我正在建立一个死信队列。我感兴趣的实现是将消息全部转发到另一个队列的能力(例如,当出现错误时)。换句话说,我对保留消息的元数据及其内容很感兴趣。
这里有一些代码来突出这个问题:
from confluent_kafka import Producer
producer = Producer({'bootstrap.servers': "host1:9092",'client.id': '0', 'auto.offset.reset': 'smallest'})
while True:
msg = consumer.poll(timeout=1)
if msg is None:
continue
else:
producer.produce(topic='test_topic', value=msg )
但是,我无法发送此消息。生成消息时,将消息转换为 json 会返回以下内容:
TypeError: Object of type Message is not JSON serializable
. 如果我尝试序列化为 JSON - 那么我似乎无法使用 json 获得消息的完整表示。
我正在使用融合的 kafka pytrhon。这是消费者 - https://github.com/confluentinc/confluent-kafka-python/blob/a5663da7ea76e58d02b13e4e6703ea6a9c52ec11/src/confluent_kafka/src/Consumer.c。生产者 - https://github.com/confluentinc/confluent-kafka-python/blob/a5663da7ea76e58d02b13e4e6703ea6a9c52ec11/src/confluent_kafka/src/Producer.c
在包含消息元数据的同时,我如何能够将消息从一个队列转发到另一个队列?