有什么方法可以优先处理高优先级的消息吗?
我尝试创建三个主题“高”、“中”和“低”,并使用一个消费者订阅所有三个主题,如果“高”主题中有未处理的消息,它将暂停另外两个。有没有更好的方法来实现消息优先级?
我尝试使用下面给出的逻辑。
topics = ['high', 'medium', 'low']
consumer.subscribe(topics)
high_topic_partition = TopicPartition(priority['high'], 0)
medium_topic_partition = TopicPartition(priority['medium'], 0)
low_topic_partition = TopicPartition(priority['low'], 0)
while True:
messages = consumer.poll()
high_priotity_unprocessed_msg = consumer.end_offsets([high_topic_partition])[high_topic_partition] - consumer.position(high_topic_partition)
medium_priotity_unprocessed_msg = consumer.end_offsets([medium_topic_partition])[medium_topic_partition] - consumer.position(medium_topic_partition)
low_priotity_unprocessed_msg = consumer.end_offsets([low_topic_partition])[low_topic_partition] - consumer.position(low_topic_partition)
if high_priotity_unprocessed_msg >0:
consumer.pause(medium_topic_partition)
consumer.pause(low_topic_partition)
else:
consumer.resume(medium_topic_partition)
if medium_priotity_unprocessed_msg >0:
consumer.pause(low_topic_partition)
else:
consumer.resume(low_topic_partition)
if messages:
process(messages)