0

我正在使用 confluent-3.0.1 平台并构建 Kafka-Elasticsearch 连接器。为此,我正在扩展 SinkConnector 和 SinkTask(Kafka 连接 API)以从 Kafka 获取数据。

作为此代码的一部分,我正在扩展 SinkConnector 的 taskConfigs 方法以返回“max.poll.records”以一次仅获取 100 条记录。但它不起作用,我同时获得所有记录,我未能在规定的时间内提交偏移量。请任何人帮我配置“max.poll.records”

 public List<Map<String, String>> taskConfigs(int maxTasks) {
    ArrayList<Map<String, String>> configs = new ArrayList<Map<String, String>>();
    for (int i = 0; i < maxTasks; i++) {
      Map<String, String> config = new HashMap<String, String>();
      config.put(ConfigurationConstants.CLUSTER_NAME, clusterName);
      config.put(ConfigurationConstants.HOSTS, hosts);
      config.put(ConfigurationConstants.BULK_SIZE, bulkSize);
      config.put(ConfigurationConstants.IDS, elasticSearchIds);
      config.put(ConfigurationConstants.TOPICS_SATELLITE_DATA, topics);
      config.put(ConfigurationConstants.PUBLISH_TOPIC, topicTopublish);
      config.put(ConfigurationConstants.TYPES, elasticSearchTypes);
      config.put("max.poll.records", "100");

      configs.add(config);
    }
    return configs;
  }
4

2 回答 2

8

max.poll.records您不能像在连接器配置中那样覆盖大多数 Kafka 消费者配置。不过,您可以在 Connect 工作程序配置中使用consumer.前缀来执行此操作。

于 2016-11-10T19:39:36.157 回答
2

解决了。我在 connect-avro-standalone.properties 中添加了以下配置

 group.id=mygroup
 consumer.max.poll.records=1000

并运行下面的命令来运行我的连接器。

sh ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-elasticsearch/connect-elasticsearch-sink.properties
于 2016-11-14T06:48:47.347 回答