AFAIK,
kafka 中引入了分区和(消费者)组的概念来实现并行。我正在通过python使用kafka。我有一个特定的主题,它有(比如说)2 个分区。这意味着,如果我启动一个包含 2 个消费者的消费者组,他们将被映射(订阅)到不同的分区。
但是,kafka
在 python 中使用库时,我遇到了一个奇怪的问题。我启动了 2 个具有基本相同组 ID 的消费者,并启动了线程让他们消费消息。
但是,kafka-stream 中的每条消息都被他们俩消费了!!这对我来说似乎很荒谬,甚至在概念上也不正确。无论如何我可以手动将消费者映射到某些(不同的)分区(如果它们没有自动映射到不同的分区)?
这是代码:
from kafka import KafkaConsumer
import thread
def con1(consumer):
for msg in consumer:
print msg
consumer1 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092'])
consumer2 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092'])
thread.start_new_thread(con1, (consumer1,))
thread.start_new_thread(con1, (consumer2,))
这是我使用 kafka-console-producer 生成的一些消息的输出:
ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki')
ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki')
ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg')
ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg')
ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas')
ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas')
而预期是其中之一。顺便说一句,这个主题k-test
有 2 个分区。