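I am using the Kafka consumer here (version 1.3.1).

What I want to achieve:

  • There are 10 partitions. Each partition starts at offset 0.

  • There is a group of consumers (for example, 1, 2, 3).

  • Sometimes a consumer goes down or comes back up.

  • So the group membership may change. However, I want every message in every partition to be consumed exactly once by the group (by consumer 1 OR 2 OR 3).

My code is: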

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic',
                         bootstrap_servers=['ip:9092'],
                         auto_offset_reset='earliest',
                         max_partition_fetch_bytes=131072,
                         group_id='writer.test')

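Is the configuration above enough? Any comments are welcome. Thanks.

Update

I tried the following code. Every time, each message in partition 670 may be consumed twice by two consumers in the same group. Why? Is something wrong?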

import time

from kafka import KafkaConsumer


def test():
    # Pull from Kafka; `ip` is the broker address (defined elsewhere).
    consumer = KafkaConsumer(
        'topic',
        bootstrap_servers=[ip],
        auto_offset_reset='latest',
        max_partition_fetch_bytes=131072,
        auto_commit_interval_ms=500,
        group_id='writer2.test')

    print(consumer.poll())
    for i in range(10000):
        msg = next(consumer)
        # ConsumerRecord fields: msg.partition is msg[1], msg.offset is msg[2]
        if msg.partition == 670:
            print('partition= %s, offset= %s' % (msg.partition, msg.offset))
    consumer.unsubscribe()


if __name__ == "__main__":
    for i in range(10):
        time.sleep(5)
        test()

Output 1:

{}
partition= 670, offset= 224
partition= 670, offset= 225
partition= 670, offset= 226
partition= 670, offset= 227
partition= 670, offset= 228
partition= 670, offset= 229
partition= 670, offset= 230
partition= 670, offset= 231
partition= 670, offset= 232
partition= 670, offset= 233
partition= 670, offset= 234
partition= 670, offset= 235
partition= 670, offset= 236
partition= 670, offset= 237
partition= 670, offset= 238
partition= 670, offset= 239
partition= 670, offset= 240
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259

Running the same file in another window gives this output:

{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
1 Answer

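If you use consumer groups, Kafka provides an at-least-once delivery guarantee. So when a consumer fails and its partitions are reassigned to other consumers, some messages may be delivered a second time: the new owner resumes from the last committed offset, and anything consumed after that commit is fetched again.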
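To make the redelivery concrete, here is a minimal broker-free sketch in plain Python. No Kafka is involved, and `simulate_redelivery` and its parameters are hypothetical names chosen for illustration. It models a consumer that commits only every few messages and then crashes, after which a second consumer resumes from the last committed offset.

```python
def simulate_redelivery(num_messages, commit_every, crash_after):
    """Return (seen_by_first, seen_by_second) for a single partition.

    The first consumer processes offsets in order, commits after every
    `commit_every` messages, and crashes once it has processed
    `crash_after` messages. The second consumer resumes from the last
    committed offset.
    """
    committed = 0  # next offset to read, as stored for the group
    seen_by_first = []
    for offset in range(num_messages):
        if len(seen_by_first) == crash_after:
            break  # crash: progress since the last commit is lost
        seen_by_first.append(offset)
        if len(seen_by_first) % commit_every == 0:
            committed = offset + 1
    seen_by_second = list(range(committed, num_messages))
    return seen_by_first, seen_by_second


first, second = simulate_redelivery(num_messages=10, commit_every=4, crash_after=6)
duplicates = sorted(set(first) & set(second))
print(duplicates)  # [4, 5]
```

Offsets 4 and 5 were processed by the first consumer but never committed, so the second consumer receives them again. The shorter the commit interval, the smaller this window, but with commit-after-consume it never shrinks to zero.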

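If you want to be sure that no message is processed twice, you can switch to an at-most-once delivery guarantee. The trade-off is that you may lose some messages on failure (that is, they are never processed).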

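To enable at-most-once, you need to disable auto-commit and commit manually right after poll, i.e., before you start processing the messages received via poll.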
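The pattern described above might look roughly like this with kafka-python. It is a sketch under stated assumptions, not a definitive implementation: `consume_at_most_once`, `handle`, and `max_polls` are hypothetical names, and the function accepts any object that exposes `poll()` and `commit()` the way `KafkaConsumer` does, so it can be exercised without a broker.

```python
def consume_at_most_once(consumer, handle, max_polls, timeout_ms=1000):
    """Commit offsets right after poll(), before processing (at-most-once).

    `consumer` is expected to behave like a kafka-python KafkaConsumer
    created with enable_auto_commit=False: poll() returns a dict mapping
    each partition to a list of records, and commit() commits the
    consumer's current position.
    """
    processed = 0
    for _ in range(max_polls):
        batch = consumer.poll(timeout_ms=timeout_ms)
        if not batch:
            continue
        # Commit first: if processing crashes below, these records are
        # skipped on restart rather than delivered a second time.
        consumer.commit()
        for records in batch.values():
            for record in records:
                handle(record)
                processed += 1
    return processed


# Usage with a real broker (assumes kafka-python is installed; the topic,
# address, and group id are the placeholders from the question):
#
#   from kafka import KafkaConsumer
#   consumer = KafkaConsumer('my_topic', bootstrap_servers=['ip:9092'],
#                            group_id='writer.test',
#                            enable_auto_commit=False)
#   consume_at_most_once(consumer, handle=print, max_polls=10)
```

Committing before processing is what trades duplicates for possible loss: a crash inside `handle` leaves the rest of the batch unprocessed but already committed.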

For more details, see http://docs.confluent.io/3.0.0/clients/consumer.html#detailed-examples (the examples are not in Python, but the general pattern is the same).

Answered 2016-08-31T11:08:51.543