如何通过Java API重新索引ElasticSearch

喜欢标题说……

我读了这篇文章( https://www.elastic.co/blog/changing-mapping-with-zero-downtime ),这个概念很棒,但我很难找到关于如何通过JAVA API做到这一点的合适参考。

我找到了这个插件: https : //github.com/karussell/elasticsearch-reindex ,但看起来像我试图做的有点过分

经过当地星巴克的一些研究,我想到了:

让我们假设我们已经有了索引(“old_index”)并且它有数据……现在让我们将数据移动到我们创建的新索引(“new_index”)(对于某个字段,可能使用不同的模式STRING vs INT,或者现在您决定不再希望分析或存储某些字段等)。

这里的基本思想是从已存在的索引(“old_index”)中检索所有数据并将其摄取到新索引(“new_index”)中。 但是,您必须做的事情很少:

步骤1.您需要执行搜索滚动https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html

它所做的一切都比常规搜索更有效地检索结果。 没有评分等。以下是文档所说的内容:“滚动不是针对实时用户请求,而是针对处理大量数据,例如为了将一个索引的内容重新索引到新索引中配置不同。“

以下是Java API如何使用它的链接: https : //www.elastic.co/guide/en/elasticsearch/client/java-api/current/scrolling.html

步骤2.进行插入时,必须使用批量摄取。 再一次,它是出于性能原因而完成的。 以下是Bulk Ingest Java API的链接: https : //www.elastic.co/guide/en/elasticsearch/client/java-api/current/bulk.html#_using_bulk_processor

现在去实际做它…

步骤1.设置滚动搜索,从旧索引“加载”数据

SearchResponse scrollResp = client.prepareSearch("old_index") // Specify index .setSearchType(SearchType.SCAN) .setScroll(new TimeValue(60000)) .setQuery(QueryBuilders.matchAllQuery()) // Match all query .setSize(100).execute().actionGet(); //100 hits per shard will be returned for each scroll 

步骤2.设置批量处理器。

 int BULK_ACTIONS_THRESHOLD = 1000; int BULK_CONCURRENT_REQUESTS = 1; BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { logger.info("Bulk Going to execute new bulk composed of {} actions", request.numberOfActions()); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { logger.info("Executed bulk composed of {} actions", request.numberOfActions()); } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { logger.warn("Error executing bulk", failure); } }).setBulkActions(BULK_ACTIONS_THRESHOLD).setConcurrentRequests(BULK_CONCURRENT_REQUESTS).setFlushInterval(TimeValue.timeValueMillis(5)).build(); 

步骤3.在步骤1中通过创建的滚动搜索器从旧索引读取,直到剩下mo记录并插入新索引

 //Scroll until no hits are returned while (true) { scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); //Break condition: No hits are returned if (scrollResp.getHits().getHits().length == 0) { logger.info("Closing the bulk processor"); bulkProcessor.close(); break; } // Get results from a scan search and add it to bulk ingest for (SearchHit hit: scrollResp.getHits()) { IndexRequest request = new IndexRequest("new_index", hit.type(), hit.id()); Map source = ((Map) ((Map) hit.getSource())); request.source(source); bulkProcessor.add(request); } } 

步骤4.现在是时候将指向旧索引的现有别名分配给新索引。 然后删除旧索引的别名引用,然后删除旧索引本身。 要了解如何确定分配给现有旧索引的别名,请参阅以下文章: ElasticSeach JAVA API以查找给定索引的别名

为新索引分配别名

 client.admin().indices().prepareAliases().addAlias("new_index", "alias_name").get(); 

从旧索引中删除别名,然后删除旧索引

 client.admin().indices().prepareAliases().removeAlias("old_index", "alias_name").execute().actionGet(); client.admin().indices().prepareDelete("old_index").execute().actionGet(); 

从ES 2.0开始,您可以使用reindex API。 由于没有关于如何使用Java API执行此操作的文档,因此步骤如下:

  1. 根据您的ES版本添加Maven依赖项
  2. 将插件添加到您的客户端:

client = TransportClient.builder().settings(elaSettings).addPlugin(ReindexPlugin.class).build();

  1. 调用reindex api

ReindexRequestBuilder builder = ReindexAction.INSTANCE.newRequestBuilder(client).source(oldIndex).destination(newIndex); builder.destination().setOpType(opType); builder.abortOnVersionConflict(false); builder.get();

如果使用Jest,您可以使用Reindex.Builder(io.searchbox.indices.reindex.Reindex)。 截至本文最新的Jest 5.3.2有它。