谢谢你的耐心。
- 在线为某个主题添加分区后,kafka 消费者停止读取消息,不抛出异常。消费者只是阻止。每次我们都要重启消费者。我认为这是不合理的,我找不到任何关于它的文档。
此外,当处理消息发生错误时,消费者线程将不会恢复。我们的消费者读取消息并将其插入 MySql。一旦网络出现故障,消费者无法连接到 MySql,然后它会阻塞并停止读取消息,直到我们重新启动它。
- 添加分区时,旧数据和新数据会发生什么变化?文档(<a href="https://kafka.apache.org/documentation.html#basic_ops_modify_topic" rel="nofollow">https://kafka.apache.org/documentation.html#basic_ops_modify_topic)说:
“请注意,分区的一个用例是对数据进行语义分区,并且添加分区不会更改现有数据的分区,因此如果消费者依赖该分区,这可能会打扰消费者。也就是说,如果数据按哈希(键)分区% number_of_partitions 那么这个分区可能会通过添加分区而被打乱,但 Kafka 不会尝试以任何方式自动重新分配数据。”
“不尝试自动重新分配数据”是什么意思?旧数据不变,新数据不会发送到添加的分区?
- 当代理关闭时,kafka 生产者无法发送消息。
我们有一个包含 3 个分区和 2 个副本的主题。kafka 集群有 3 个代理。但是当一个broker宕机时,就会出现异常:
kafka.producer.async.ProducerSendThread.error():103: - Error in handling batch of 65 events
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) ~[kafka_2.9.2-0.8.2.0.jar:na]
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) [kafka_2.9.2-0.8.2.0.jar:na]
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) [kafka_2.9.2-0.8.2.0.jar:na]
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) [kafka_2.9.2-0.8.2.0.jar:na]
at scala.collection.immutable.Stream.foreach(Stream.scala:526) [scala-library-2.9.2.jar:na]
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) [kafka_2.9.2-0.8.2.0.jar:na]
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [kafka_2.9.2-0.8.2.0.jar:na]
kafka.producer.async.DefaultEventHandler.error():97: - Failed to send requests for topics risk_acts with correlation ids in [433266,433395]
添加新代理时也会出现同样的问题。我们必须在生产者的“metadata.broker.list”配置中添加新的代理主机名和端口并重新启动它。
我们使用的是高级api,kafka的版本是:
<dependency>
<groupId> org.apache.kafka</groupId >
<artifactId> kafka_2.9.2</artifactId >
<version> 0.8.2.0</version >
</dependency>
生产者配置:
<entry key="metadata.broker.list" value="${metadata.broker.list}" />
<entry key="serializer.class" value="kafka.serializer.StringEncoder" />
<entry key="key.serializer.class" value="kafka.serializer.StringEncoder" />
<entry key="request.required.acks" value="-1" />
<entry key="producer.type" value="async" />
<entry key="queue.enqueue.timeout.ms" value="-1" />
<entry key="compression.codec" value="1" />
消费者配置:
<entry key="zookeeper.connect" value="${zookeeper.connect}" />
<entry key="group.id" value="${kafka.consumer.group.id}" />
<entry key="zookeeper.session.timeout.ms" value="40000" />
<entry key="rebalance.backoff.ms" value="10000" />
<entry key="zookeeper.sync.time.ms" value="2000" />
<entry key="auto.commit.interval.ms" value="1000" />
<entry key="auto.offset.reset" value="smallest" />
生产者代码和消费者代码如下: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group +示例