1

我想通过分布式微服务架构实现 Kafka 进行消息传递。

我正在使用PyKafka并实现了虚拟生产者和(平衡)消费者。我将所有消费者分配到同一个消费者组我同时使用来自Python 和控制台的生产者没有问题,甚至在运行时添加它们。

但是,我对消费者有疑问。我可以创建多个 Python 消费者,甚至可以在运行时添加它们。但是,当我将控制台使用者(kafka-console-consumer)添加到具有 Python 使用者的组时,我收到互斥错误:

从消费者 ID 'b'Michals-MacBook-Pro.local:1722eea0-07d3-4be4-9d97-8b7fb15b0b30'' 提交主题 'b'michal_sample_topic'' 的偏移时出错(错误:{'pykafka.exceptions.UnknownMemberId': [0 , 1]})

此外,这两者(即使它们属于同一个消费者组)都在消费消息(Python消费者在他们自己之间平衡它并在他们之间控制台消费者)

现在,我是 Kafka 的新手,但我的第一印象是 Kafka 应该与消费者的实现无关,因此应该可以将它们结合起来。我的理解是 PyKafka 还是我对 PyKafka 的实施有问题?

制片人:

from pykafka import KafkaClient
from time import sleep

client = KafkaClient(hosts="localhost:9092")

print(client.brokers)
print(client.topics)

topic = client.topics[b'michal_sample_topic']

with topic.get_sync_producer() as producer:

while True:
    producer.produce(
        bytes(
            input('Send test message:'), 
            'utf-8'
        )
    )

消费者:

from pykafka import KafkaClient

client = KafkaClient(hosts="localhost:9092")

print(client.brokers)
print(client.topics)

topic = client.topics[b'michal_sample_topic']

balanced_consumer = topic.get_balanced_consumer(
    consumer_group=b'testing',
    auto_commit_enable=True,
    zookeeper_connect='localhost:2181'
)

for message in balanced_consumer:
    if message is not None:
        print(f'{message.offset} {message.value}')
4

0 回答 0