我在这里使用Kafka 消费者(版本 1.3.1)。
我要实现的目标:
有10个分区。每个分区都从偏移量 0 开始。
有一组消费者(例如,1、2、3)。
有时,一个消费者下降或上升。
因此,小组成员可能会发生变化。但是我希望每个分区中的每条消息都应该只被组消费一次(1 OR 2 OR 3)。
我的代码是:
consumer = KafkaConsumer('my_topic',
bootstrap_servers=['ip:9092'],
auto_offset_reset='earliest',
max_partition_fetch_bytes=131072,
group_id='writer.test')
以上配置够吗?欢迎任何意见。谢谢
更新
我尝试了以下代码。每次在分区 760 中,每条消息可能被一组中的两个消费者消费两次。为什么?有问题?
def test():
#PULL FROM KAFKA
consumer = KafkaConsumer(
'topic',
bootstrap_servers=[ip],
auto_offset_reset='latest',
max_partition_fetch_bytes=131072,
auto_commit_interval_ms=500,
group_id='writer2.test')
print consumer.poll()
for i in range(10000):
msg = next(consumer)
if str(msg[1])=='670':
print 'partition= %s, offset= %s' % (msg[1], msg[2])
consumer.unsubscribe()
if __name__ == "__main__":
for i in range(10):
import time
time.sleep(5)
test()
输出 1:
{}
partition= 670, offset= 224
partition= 670, offset= 225
partition= 670, offset= 226
partition= 670, offset= 227
partition= 670, offset= 228
partition= 670, offset= 229
partition= 670, offset= 230
partition= 670, offset= 231
partition= 670, offset= 232
partition= 670, offset= 233
partition= 670, offset= 234
partition= 670, offset= 235
partition= 670, offset= 236
partition= 670, offset= 237
partition= 670, offset= 238
partition= 670, offset= 239
partition= 670, offset= 240
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
在另一个窗口中运行相同的文件,输出:
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259
{}
partition= 670, offset= 241
partition= 670, offset= 242
partition= 670, offset= 243
partition= 670, offset= 244
partition= 670, offset= 245
partition= 670, offset= 246
partition= 670, offset= 247
partition= 670, offset= 248
partition= 670, offset= 249
partition= 670, offset= 250
partition= 670, offset= 251
partition= 670, offset= 252
partition= 670, offset= 253
partition= 670, offset= 254
partition= 670, offset= 255
partition= 670, offset= 256
partition= 670, offset= 257
partition= 670, offset= 258
partition= 670, offset= 259