
I want to use the Elasticsearch bulk API from Java and would like to know how to set the bulk size.

Currently I'm using it like this:

BulkRequestBuilder bulkRequest = getClient().prepareBulk();
while(hasMore) {
    bulkRequest.add(getClient().prepareIndex(indexName, indexType, artist.getDocId()).setSource(json));
    hasMore = checkHasMore();
}
BulkResponse bResp = bulkRequest.execute().actionGet();
// Check the response for failures
log.info("Has failures? {}", bResp.hasFailures());

Any idea how to set the bulk/batch size?


2 Answers


It mainly depends on the size of your documents, the resources available on the client, and the type of client (transport client or node client).

A node client is aware of the shards in the cluster and sends documents straight to the nodes that hold the shards where they should be indexed. The transport client, on the other hand, is an ordinary client that sends its requests to a list of nodes in round-robin fashion; the bulk request is then sent to a single node, which acts as a gateway during indexing.
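For reference, here is a minimal sketch of how the two client types are created with the Java API of that era (the cluster name and address are placeholders, not values from the question):

// Node client: joins the cluster as a client-only node and routes documents itself
Node node = NodeBuilder.nodeBuilder()
        .clusterName("mycluster")  // placeholder
        .client(true)              // holds no data, not master-eligible
        .node();
Client nodeClient = node.client();

// Transport client: a plain remote client that round-robins over the listed nodes
Settings settings = ImmutableSettings.settingsBuilder()
        .put("cluster.name", "mycluster").build();
Client transportClient = new TransportClient(settings)
        .addTransportAddress(new InetSocketTransportAddress("localhost", 9300));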

Since you're using the Java API, I'd suggest having a look at the BulkProcessor, which makes bulk indexing much easier and more flexible. You can define a maximum number of actions, a maximum size, and a maximum time interval since the last bulk execution, and it executes the bulk automatically for you whenever needed. You can also set a maximum number of concurrent bulk requests.

Once you've created the BulkProcessor like this:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        logger.info("Going to execute new bulk composed of {} actions", request.numberOfActions());
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        logger.info("Executed bulk composed of {} actions", request.numberOfActions());
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        logger.warn("Error executing bulk", failure);
    }
}).setBulkActions(bulkSize).setConcurrentRequests(maxConcurrentBulk).build();
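The size and time limits mentioned above map to setBulkSize and setFlushInterval on the same builder. A sketch with example values (listener stands for the BulkProcessor.Listener defined above, and the 5 MB / 5 second figures are arbitrary):

BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener)
        .setBulkActions(1000)                                // flush after 1000 actions...
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))  // ...or after 5 MB of data...
        .setFlushInterval(TimeValue.timeValueSeconds(5))     // ...or every 5 seconds
        .setConcurrentRequests(1)                            // allow one bulk in flight
        .build();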

you just add your requests to it:

bulkProcessor.add(indexRequest);
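where indexRequest is an ordinary index request. Mapped onto the code from the question, it would look roughly like this (indexName, indexType, artist and json as in the question):

IndexRequest indexRequest = new IndexRequest(indexName, indexType, artist.getDocId())
        .source(json);
bulkProcessor.add(indexRequest);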

and close it at the end to flush any remaining requests that might not have been executed yet:

bulkProcessor.close();
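With concurrent requests enabled, close() does not wait for bulks that are still in flight. Depending on your client version, BulkProcessor may also offer awaitClose, which blocks until they finish or a timeout elapses (a sketch; note it throws InterruptedException):

// returns false if the timeout was hit before all in-flight bulks completed
boolean terminated = bulkProcessor.awaitClose(10, TimeUnit.SECONDS);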

Finally, to answer your question: the nice thing about the BulkProcessor is also that it has sensible defaults: a bulk size of 5 MB, 1000 actions, 1 concurrent request, and no flush interval (which might be worth setting).

Answered on 2013-06-04T20:36:03.583

You need to count the actions added to the bulk request builder; when the count reaches your bulk size limit, execute the bulk and start a fresh builder. Here is a code example:

Settings settings = ImmutableSettings.settingsBuilder()
   .put("cluster.name", "MyClusterName").build();

TransportClient client = new TransportClient(settings);
String hostname = "myhost ip";
int port = 9300; 
client.addTransportAddress(new InetSocketTransportAddress(hostname, port));

BulkRequestBuilder bulkBuilder = client.prepareBulk();
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("my_file_path")));
long bulkBuilderLength = 0;
String readLine = "";
String index = "my_index_name";
String type = "my_type_name";
String id = "";

while((readLine = br.readLine()) != null){
   id = somefunction(readLine);
   String json = new ObjectMapper().writeValueAsString(readLine);
   bulkBuilder.add(client.prepareIndex(index, type, id).setSource(json));
   bulkBuilderLength++;
   if(bulkBuilderLength % 1000== 0){
      logger.info("##### " + bulkBuilderLength + " data indexed.");
      BulkResponse bulkRes = bulkBuilder.execute().actionGet();
      if(bulkRes.hasFailures()){
         logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage());
      }
      bulkBuilder = client.prepareBulk();
   }
}

br.close();

// flush whatever is left over after the loop
if(bulkBuilder.numberOfActions() > 0){
   logger.info("##### " + bulkBuilderLength + " data indexed.");
   BulkResponse bulkRes = bulkBuilder.execute().actionGet();
   if(bulkRes.hasFailures()){
      logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage());
   }
}
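One thing the snippet leaves out is shutting down the client; the transport client holds sockets and threads, so it is worth closing once indexing is done (assuming the client variable from above):

client.close();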

Hope this helps. Thanks!

Answered on 2014-11-20T10:02:26.557