
EOS is enabled (Kafka Streams 1.1.0) in a clean local environment, and all topics have 2 partitions. A small number of messages were produced with ENABLE_IDEMPOTENCE_CONFIG=true.
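For reference, a minimal sketch of the setup described above (broker address, application id, topic name, and serializers are illustrative assumptions, not taken from the original project):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;

public class EosSetupSketch {
    public static void main(String[] args) {
        // Streams side: EOS enabled (processing.guarantee=exactly_once)
        Properties streamsProps = new Properties();
        streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "AppName");
        streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "jonathan:9092");
        streamsProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        // Producer side: a handful of records written with idempotence enabled
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "jonathan:9092");
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("bu", "some-key", "some-value"));
        }
    }
}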

After restarting the application, restoration of the global state store enters an infinite loop.

I implemented a StateRestoreListener and added logging to onRestoreStart, onRestoreEnd, and onBatchRestored.
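Roughly what that listener looks like, reconstructed from the [RESTORE:*] lines further down (the class name and the use of SLF4J are assumptions, not the original code):

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingStateRestoreListener implements StateRestoreListener {
    private static final Logger log = LoggerFactory.getLogger(LoggingStateRestoreListener.class);

    @Override
    public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) {
        log.info("[RESTORE:START] {} {}, startingOffset:{}", storeName, topicPartition, startingOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored) {
        log.info("[RESTORE:BATCH] {} {}, batchEndOffset:{}", storeName, topicPartition, batchEndOffset);
    }

    @Override
    public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {
        log.info("[RESTORE:END] {} {}, totalRestored:{}", storeName, topicPartition, totalRestored);
    }
}

// Registered on the KafkaStreams instance before start():
// kafkaStreams.setGlobalStateRestoreListener(new LoggingStateRestoreListener());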

Here is what I see repeated over and over in the logs (the topic name is bu):

DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,590 | org.apache.kafka.clients.NetworkClient | [Consumer clientId=AppName-global-restore-consumer, groupId=] Sending metadata request (type=MetadataRequest, topics=bu) to node jonathan:9092 (id: 0 rack: null)
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,591 | org.apache.kafka.clients.Metadata | Updated cluster metadata version 3 to Cluster(id = Kw31wtS9TYm__QEmSUDgdg, nodes = [jonathan:9092 (id: 0 rack: null)], partitions = [Partition(topic = bu, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = []), Partition(topic = bu, partition = 1, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])])
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,592 | org.apache.kafka.clients.consumer.internals.Fetcher | [Consumer clientId=AppName-global-restore-consumer, groupId=] Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={bu-1=-1, bu-0=-1}, isolationLevel=READ_COMMITTED) to broker jonathan:9092 (id: 0 rack: null)
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,595 | org.apache.kafka.clients.consumer.internals.Fetcher | [Consumer clientId=AppName-global-restore-consumer, groupId=] Handling ListOffsetResponse response for bu-1. Fetched offset 12, timestamp -1
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,596 | org.apache.kafka.clients.consumer.internals.Fetcher | [Consumer clientId=AppName-global-restore-consumer, groupId=] Handling ListOffsetResponse response for bu-0. Fetched offset 16, timestamp -1
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,596 | org.apache.kafka.clients.consumer.KafkaConsumer | [Consumer clientId=AppName-global-restore-consumer, groupId=] Subscribed to partition(s): bu-1
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,596 | org.apache.kafka.clients.consumer.KafkaConsumer | [Consumer clientId=AppName-global-restore-consumer, groupId=] Seeking to offset 9 for partition bu-1


{"thread":"AppName-GlobalStreamThread","message":"[RESTORE:START] bu bu-1, startingOffset:9","endOfBatch":false}


DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,640 | org.apache.kafka.common.metrics.Metrics | Added sensor with name topic.bu.bytes-fetched
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,642 | org.apache.kafka.common.metrics.Metrics | Added sensor with name topic.bu.records-fetched
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,642 | org.apache.kafka.common.metrics.Metrics | Added sensor with name bu-1.records-lag
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,643 | org.apache.kafka.clients.consumer.internals.Fetcher | [Consumer clientId=AppName-global-restore-consumer, groupId=] Added READ_COMMITTED fetch request for partition bu-1 at offset 12 to node jonathan:9092 (id: 0 rack: null)
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,643 | org.apache.kafka.clients.FetchSessionHandler | [Consumer clientId=AppName-global-restore-consumer, groupId=] Built incremental fetch (sessionId=1036463678, epoch=1) for node 0. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s)
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,643 | org.apache.kafka.clients.consumer.internals.Fetcher | [Consumer clientId=AppName-global-restore-consumer, groupId=] Sending READ_COMMITTED IncrementalFetchRequest(toSend=(bu-1), toForget=(), implied=()) to broker jonathan:9092 (id: 0 rack: null)

{"thread":"AppName-GlobalStreamThread","message":"[RESTORE:BATCH] bu bu-1, batchEndOffset:12","endOfBatch":false}
{"thread":"AppName-GlobalStreamThread","message":"[RESTORE:END] bu bu-1, totalRestored:1","endOfBatch":false}


DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,649 | org.apache.kafka.clients.consumer.KafkaConsumer | [Consumer clientId=AppName-global-restore-consumer, groupId=] Subscribed to partition(s): bu-0
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,649 | org.apache.kafka.common.metrics.Metrics | Removed sensor with name bu-1.records-lag
DEBUG | AppName-GlobalStreamThread | 2018-06-17 20:07:21,649 | org.apache.kafka.clients.consumer.KafkaConsumer | [Consumer clientId=AppName-global-restore-consumer, groupId=] Seeking to offset 15 for partition bu-0

{"thread":"AppName-GlobalStreamThread","message":"[RESTORE:START] bu bu-0, startingOffset:15","endOfBatch":false}
{"thread":"AppName-GlobalStreamThread","message":"[RESTORE:BATCH] bu bu-0, batchEndOffset:15","endOfBatch":false}
{"thread":"AppName-GlobalStreamThread","message":"[RESTORE:BATCH] bu bu-0, batchEndOffset:15","endOfBatch":false}
{"thread":"AppName-GlobalStreamThread","message":"[RESTORE:BATCH] bu bu-0, batchEndOffset:15","endOfBatch":false}
...
...

...and so on, indefinitely. I could not find any known issue relating EOS and global state stores. Has anyone seen this before?


1 Answer


Found this related issue: http://mail-archives.apache.org/mod_mbox/kafka-users/201711.mbox/%3CCAG53Ff=H8e_DeAKq7BOz_LdMtf2wD_SETM9PBonwZyBNJ2HZ3w@mail.gmail.com%3E

For now, I do not enable EOS for the application that produces records into the topic backing the global table. That resolves the infinite loop during global table restoration.
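A minimal sketch of that workaround, assuming the global-table topic is fed by a plain KafkaProducer (topic name and broker address are taken from the question; everything else is illustrative): the producer runs without idempotence or a transactional.id, so only regular records land in that topic.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class GlobalTableFeedWithoutEos {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "jonathan:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // No idempotence and no transactional.id for this producer: the topic
        // backing the global table receives only plain records, and restoration
        // no longer loops on restart.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("bu", "some-key", "some-value"));
        }
    }
}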

Answered 2018-06-18T06:20:20.710