1

有什么方法可以优先处理高优先级的消息吗?

我尝试创建三个主题“高”、“中”和“低”,并使用一个消费者订阅所有三个主题,如果“高”主题中有未处理的消息,它将暂停另外两个。有没有更好的方法来实现消息优先级?

我尝试使用下面给出的逻辑。

topics = ['high', 'medium', 'low']
consumer.subscribe(topics)
high_topic_partition = TopicPartition(priority['high'], 0)
medium_topic_partition = TopicPartition(priority['medium'], 0)
low_topic_partition = TopicPartition(priority['low'], 0)

while True:

    messages = consumer.poll() 
    high_priotity_unprocessed_msg = consumer.end_offsets([high_topic_partition])[high_topic_partition] - consumer.position(high_topic_partition)
    medium_priotity_unprocessed_msg = consumer.end_offsets([medium_topic_partition])[medium_topic_partition] - consumer.position(medium_topic_partition)
    low_priotity_unprocessed_msg = consumer.end_offsets([low_topic_partition])[low_topic_partition] - consumer.position(low_topic_partition)

    if high_priotity_unprocessed_msg >0:  
     consumer.pause(medium_topic_partition)
            consumer.pause(low_topic_partition)

        else:
            consumer.resume(medium_topic_partition)

            if medium_priotity_unprocessed_msg >0:
                consumer.pause(low_topic_partition)
            else:
                consumer.resume(low_topic_partition)
        if messages:
            process(messages)
4

1 回答 1

2

您可以评估的一种选择基本上只是在更高优先级的消息上具有更多的并行性......

例如:

Topic1 (Priority Low):    1 partitions
Topic2 (Priority medium): 5 partitions
Topic3 (Priority High):  20 partitions

然后基本上有:

  • 1个消费者从topic1获取数据
  • 来自topic2的 5 个消费者
  • 来自topic3的 20 个消费者

现在,我建议您执行此操作的最简单方法基本上是编写一次代码...但是将“主题名称”的配置外部化...然后将其放大(当然使用容器)...请参考这篇文章:

例如,代码可以很简单:

SuperAwesomeAppBinaryCode:

topic = %MY_TOPIC_NAME_INJECTED_BY_ENV_VAR%
consumer.subscribe(topic)

while True:

    messages = consumer.poll() 
    if messages:
        process(messages)

现在,如果我们在 K8s 上部署了该代码,您可以有 3 个不同的部署,运行相同的代码,但为每种情况注入正确的主题,例如:

低优先级消息

apiVersion: apps/v1
kind: Deployment
metadata:
  name: LowPriorityProcessor
  labels:
    app: LowPriorityProcessor
spec:
  replicas: 1
  selector:
    matchLabels:
      app: LowPriorityProcessor
  template:
    metadata:
      labels:
        app: LowPriorityProcessor
    spec:
      containers:
      - name: LowPriorityProcessor
        image: SuperAwesomeAppBinaryCode:1.0.0
        env:
        - name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
          value: topic1
        ports:
        - containerPort: 80

中等优先级消息

apiVersion: apps/v1
kind: Deployment
metadata:
  name: MediumPriorityProcessor
  labels:
    app: MediumPriorityProcessor
spec:
  replicas: 5
  selector:
    matchLabels:
      app: MediumPriorityProcessor
  template:
    metadata:
      labels:
        app: MediumPriorityProcessor
    spec:
      containers:
      - name: MediumPriorityProcessor
        image: SuperAwesomeAppBinaryCode:1.0.0
        env:
        - name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
          value: topic2
        ports:
        - containerPort: 80

高优先级消息

apiVersion: apps/v1
kind: Deployment
metadata:
  name: HighPriorityProcessor
  labels:
    app: HighPriorityProcessor
spec:
  replicas: 20
  selector:
    matchLabels:
      app: HighPriorityProcessor
  template:
    metadata:
      labels:
        app: HighPriorityProcessor
    spec:
      containers:
      - name: HighPriorityProcessor
        image: SuperAwesomeAppBinaryCode:1.0.0
        env:
        - name: MY_TOPIC_NAME_INJECTED_BY_ENV_VAR
          value: topic3
        ports:
        - containerPort: 80

然后让并行性发挥它的魔力 如果你仔细检查,从一个“k8s 部署”到另一个的唯一变化就是主题和副本的数量。

笔记:

  • 你可以在没有 K8s 的情况下实现这一点......使用 Docker Swarm 甚至只是 docker -compose或手动运行实例 ‍♂️,但你为什么要重新发明轮子,但可以肯定的是,在某些边缘情况下,没有太多选择...
  • 可以在此处找到有关此主题的精彩阅读
于 2021-02-02T16:01:48.300 回答