1

这可能是一个 eisenbug,所以我不期待硬性答案,而是更多关于寻找什么能够复制该错误的提示。

我有一个由多个服务组成的事件驱动、基于 kafka 的系统。目前,它们被组织成线性管道。一个主题,一种事件类型。每个服务都可以被认为是从一种事件类型到一种或多种事件类型的转换。

每个转换都作为一个 python 进程执行,有自己的消费者和自己的生产者。它们都共享相同的代码和配置,因为这都是从服务实现中抽象出来的。

现在,有什么问题。在我们的暂存环境中,有时(假设每 50 条消息中有一条)Kafka 上有一条消息,但消费者根本没有处理它。即使您等待数小时,它也会挂起。这不会发生在本地环境中,我们无法在其他任何地方重现它。

一些更相关的信息:

  • 这些服务经常出于调试目的而重新启动,但挂起似乎与重新启动无关。
  • 当消息挂起并且您重新启动服务时,该服务将处理该消息。
  • 这些服务是完全无状态的,所以没有缓存或其他奇怪的事情发生(我希望)
  • 发生这种情况时,我有证据表明他们仍在处理旧消息(我在他们产生输出时记录,这发生在消费者循环结束之前)
  • 在当前的部署中,消费者组中只有一个消费者,因此在相同的服务中没有并行处理,也没有服务的水平扩展

我如何消费:

我使用 pykafka,这是消费者循环:

def create_consumer(self):

    consumer = self.client.topics[bytes(self.input_topic, "UTF-8")].get_simple_consumer(
        consumer_group=bytes(self.consumer_group, "UTF-8"),
        auto_commit_enable=True,
        offsets_commit_max_retries=self.service_config.kafka_offsets_commit_max_retries,
    )
    return consumer

def run(self):

    consumer = self.create_consumer()
    while not self.stop_event.wait(1):
        message = consumer.consume()
        results = self._process_message(message)
        self.output_results(results)

我的假设是我使用消息的方式存在一些问题,或者消费者组偏移量存在一些不一致的状态,但我无法真正解决这个问题。

我也在考虑搬到浮士德来解决这个问题。鉴于我的代码库和架构决定,过渡应该不会太难,但在开始这样的工作之前,我想确定我正在朝着正确的方向前进。现在这只是一个盲目的尝试,希望造成问题的一些细节会消失。

4

0 回答 0