我正在尝试在单个函数中运行 Producer 和 Consumer。
流程如下:
消息到达 --> 从主题 1 消费 --> 转换 --> 生产到主题 2
如果我以这种模式实例化生产者和消费者(先是生产者,然后是消费者),它可以正常工作。
def __init__(self, producer_config, consumer_config)
self.producer = Producer(**producer_config)
self.consumer_config = consumer_config
self.consumer = Consumer(**consumer_config)
self.consumer.subscribe([topic1])
但是如果我颠倒顺序并先实例化消费者,然后实例化生产者,
def __init__(self, producer_config, consumer_config)
self.consumer_config = consumer_config
self.consumer = Consumer(**consumer_config)
self.consumer.subscribe([topic1])
self.producer = Producer(**producer_config)
我得到错误:
{"level": "ERROR", "message": "Kafka 消费者错误:KafkaError{code=GROUP_AUTHORIZATION_FAILED,val=30,str="JoinGroup failed: Broker: Group authorization failed"}"}
我只是想知道这是否按预期和可能的原因起作用。