0

我正在尝试在单个函数中运行 Producer 和 Consumer。

流程如下:

消息到达 --> 从主题 1 消费 --> 转换 --> 生产到主题 2

如果我以这种模式实例化生产者和消费者(先是生产者,然后是消费者),它可以正常工作。

def __init__(self, producer_config, consumer_config)
  self.producer = Producer(**producer_config)

  self.consumer_config = consumer_config
  self.consumer = Consumer(**consumer_config)
  self.consumer.subscribe([topic1])

但是如果我颠倒顺序并先实例化消费者,然后实例化生产者,

def __init__(self, producer_config, consumer_config)    
  self.consumer_config = consumer_config
  self.consumer = Consumer(**consumer_config)
  self.consumer.subscribe([topic1])

  self.producer = Producer(**producer_config)

我得到错误:

{"level": "ERROR", "message": "Kafka 消费者错误:KafkaError{code=GROUP_AUTHORIZATION_FAILED,val=30,str="JoinGroup failed: Broker: Group authorization failed"}"}

我只是想知道这是否按预期和可能的原因起作用。

4

0 回答 0