8

我正在尝试使用<KStream>.process()aTimeWindows.of("name", 30000)来批量处理一些KTable值并将它们发送出去。似乎 30 秒超过了消费者超时间隔,之后 Kafka 认为所述消费者已失效并释放分区。

我尝试提高轮询频率和提交间隔以避免这种情况:

config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");
config.put(StreamsConfig.POLL_MS_CONFIG, "5000");

不幸的是,这些错误仍在发生:

(很多)

ERROR  o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog 
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0

其次是这些:

INFO   o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1
WARN   o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]: 
  org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)

显然,我需要更频繁地将心跳发送回服务器。如何?

我的拓扑是:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
KTable<Windowed<String>, String>  kt = lines.aggregateByKey(
            new DBAggregateInit(),
            new DBAggregate(),
            TimeWindows.of("write_aggregate2", 30000));

DBProcessorSupplier dbProcessorSupplier = new DBProcessorSupplier();

kt.toStream().process(dbProcessorSupplier);
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

KTable每 30 秒按键对值进行分组。在Processor.init()我调用context.schedule(30000).

DBProcessorSupplier提供了一个DBProcessor的实例。这是AbstractProcessor的一个实现,其中已经提供了所有覆盖。他们所做的只是记录,所以我知道每个人什么时候被击中。

这是一个非常简单的拓扑,但很明显我在某处遗漏了一步。


编辑:

我知道我可以在服务器端进行调整,但我希望有一个客户端解决方案。我喜欢当客户端退出/死亡时很快就可以使用分区的概念。


编辑:

为了简化问题,我从图中删除了聚合步骤。现在只是消费者->处理器()。(如果我将消费者直接发送给.print()它会很快工作,所以我知道没关系)。(同样,如果我通过它输出聚合(KTable).print()似乎也可以)。

我发现应该每 30 秒调用一次的.process()-实际上阻塞了可变时间长度并且输出有点随机(如果有的话)。.punctuate()

更远:

我将调试级别设置为“调试”并重新运行。我看到很多消息:

DEBUG  o.a.k.s.p.internals.StreamTask - Start processing one record [ConsumerRecord <info>

但是.punctuate()函数中的断点没有被命中。所以它做了很多工作,但没有给我使用它的机会。

4

1 回答 1

10

一些澄清:

  • StreamsConfig.COMMIT_INTERVAL_MS_CONFIG是提交间隔的下限,即,在提交之后,下一次提交不会在此时间过去之前发生。基本上,Kafka Stream 会在这段时间过后尝试尽快提交,但无法保证下一次提交实际需要多长时间。
  • StreamsConfig.POLL_MS_CONFIG用于内部KafkaConsumer#poll()呼叫,指定呼叫的最大阻塞时间poll()

因此,这两个值都对心跳频率没有帮助。

Kafka Streams 在处理记录时遵循“深度优先”的策略。这意味着,poll()对于每条记录,拓扑的所有运算符都将被执行。假设您有三个连续的地图,在处理下一条/第二条记录之前,将为第一条记录调用所有三个地图。

因此,在第一个记录得到完全处理poll()之后,将进行下一个调用。poll()如果您想更频繁地检测心跳,则需要确保单个poll()调用获取的记录更少,这样处理所有记录所需的时间更短,并且下一个调用poll()会更早触发。

KafkaConsumer您可以使用可以通过指定的配置参数StreamsConfig来完成此操作(请参阅https://kafka.apache.org/documentation.html#consumerconfigs):

streamConfig.put(ConsumerConfig.XXX, VALUE);

  • max.poll.records:如果你减少这个值,更少的记录将被轮询
  • session.timeout.ms:如果你增加这个值,就有更多的时间来处理数据(添加这个是为了完整性,因为它实际上是一个客户端设置而不是一个服务器/代理端配置——即使你知道这个解决方案并且不喜欢它: ))

编辑

从 Kafka0.10.1开始,可以(并且推荐)在流配置中为消费者和生产者配置添加前缀。这避免了参数冲突,因为某些参数名称用于消费者和生产者,否则无法区分(并且将同时应用于消费者生产者)。要为参数添加前缀,您可以分别使用StreamsConfig#consumerPrefix()StreamsConfig#producerPrefix()。例如: streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARAMETER), VALUE);

还要补充一点:这个问题中描述的场景是一个已知问题,并且已经有KIP-62引入了一个用于KafkaConsumer发送心跳的后台线程,从而将心跳与poll()调用分离。Kafka Streams 将在即将发布的版本中利用这一新功能。

于 2016-08-30T21:06:06.103 回答