elasticsearch java批量大小

我想使用java的elasticsearch bulk api,并想知道如何设置批量大小。

目前我正在使用它:

BulkRequestBuilder bulkRequest = getClient().prepareBulk(); while(hasMore) { bulkRequest.add(getClient().prepareIndex(indexName, indexType, artist.getDocId()).setSource(json)); hasMore = checkHasMore(); } BulkResponse bResp = bulkRequest.execute().actionGet(); //To check failures log.info("Has failures? {}", bResp.hasFailures()); 

知道如何设置批量/批量大小吗?

它主要取决于文档的大小,客户端上的可用资源以及客户端的类型(传输客户端或节点客户端)。

节点客户端知道群集上的分片,并将文档直接发送到保存应该被索引的分片的节点。 另一方面,传输客户端是普通客户端,它以循环方式将其请求发送到节点列表。 然后,批量请求将被发送到一个节点,这将成为索引时的网关。

由于您使用的是Java API,我建议您查看BulkProcessor ,这样可以更轻松,更灵活地批量索引。 您可以定义自上次批量执行以来的最大操作数,最大大小和最大时间间隔。 它会在需要时自动为您执行批量处理。 您还可以设置最大并发批量请求数。

在您创建BulkProcessor之后:

 BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { logger.info("Going to execute new bulk composed of {} actions", request.numberOfActions()); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { logger.info("Executed bulk composed of {} actions", request.numberOfActions()); } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { logger.warn("Error executing bulk", failure); } }).setBulkActions(bulkSize).setConcurrentRequests(maxConcurrentBulk).build(); 

您只需要向其添加请求:

 bulkProcessor.add(indexRequest); 

并在最后关闭它以刷新可能尚未执行的任何最终请求:

 bulkProcessor.close(); 

最后回答你的问题:关于BulkProcessor还在于它具有合理的默认值:5 MB大小,1000个动作,1个并发请求,没有刷新间隔(可能对设置有用)。

您需要在批量请求构建器达到批量大小限制时对其进行计数,然后对其进行索引并刷新旧的批量构建。 这是代码的例子

 Settings settings = ImmutableSettings.settingsBuilder() .put("cluster.name", "MyClusterName").build(); TransportClient client = new TransportClient(settings); String hostname = "myhost ip"; int port = 9300; client.addTransportAddress(new InetSocketTransportAddress(hostname, port)); BulkRequestBuilder bulkBuilder = client.prepareBulk(); BufferedReader br = new BufferedReader(new InputStreamReader(new DataInputStream(new FileInputStream("my_file_path")))); long bulkBuilderLength = 0; String readLine = ""; String index = "my_index_name"; String type = "my_type_name"; String id = ""; while((readLine = br.readLine()) != null){ id = somefunction(readLine); String json = new ObjectMapper().writeValueAsString(readLine); bulkBuilder.add(client.prepareIndex(index, type, id).setSource(json)); bulkBuilderLength++; if(bulkBuilderLength % 1000== 0){ logger.info("##### " + bulkBuilderLength + " data indexed."); BulkResponse bulkRes = bulkBuilder.execute().actionGet(); if(bulkRes.hasFailures()){ logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage()); } bulkBuilder = client.prepareBulk(); } } br.close(); if(bulkBuilder.numberOfActions() > 0){ logger.info("##### " + bulkBuilderLength + " data indexed."); BulkResponse bulkRes = bulkBuilder.execute().actionGet(); if(bulkRes.hasFailures()){ logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage()); } bulkBuilder = client.prepareBulk(); } 

希望这能帮到你谢谢