0

我正在建立一个死信队列。我感兴趣的实现是将消息全部转发到另一个队列的能力(例如,当出现错误时)。换句话说,我对保留消息的元数据及其内容很感兴趣。

这里有一些代码来突出这个问题:

from confluent_kafka import Producer
producer = Producer({'bootstrap.servers': "host1:9092",'client.id': '0', 'auto.offset.reset': 'smallest'})
while True:
  msg = consumer.poll(timeout=1)
  if msg is None:
    continue
  else:
    producer.produce(topic='test_topic', value=msg )

但是,我无法发送此消息。生成消息时,将消息转换为 json 会返回以下内容: TypeError: Object of type Message is not JSON serializable. 如果我尝试序列化为 JSON - 那么我似乎无法使用 json 获得消息的完整表示。

我正在使用融合的 kafka pytrhon。这是消费者 - https://github.com/confluentinc/confluent-kafka-python/blob/a5663da7ea76e58d02b13e4e6703ea6a9c52ec11/src/confluent_kafka/src/Consumer.c。生产者 - https://github.com/confluentinc/confluent-kafka-python/blob/a5663da7ea76e58d02b13e4e6703ea6a9c52ec11/src/confluent_kafka/src/Producer.c

在包含消息元数据的同时,我如何能够将消息从一个队列转发到另一个队列?

4

0 回答 0