1

我们最近从 faust 1.10.4 切换到 faust-streaming(0.6.9)。发布此消息后,我们看到应用程序崩溃并出现以下异常。该应用程序具有多个层,在每个阶段都对数据进行聚合和过滤。在每个阶段,处理器将消息发送到 Kafka 主题,相应的 faust 应用程序代理会使用该消息。但是对于 Kafka 主题,我们在每一层都保持分区计数相同。

  • 集群大小 = 12
  • 主题和表分区计数 = 36
  • 浮士德流媒体版本 = 0.6.9
  • kafka-python 版本 = 2.0.2
[2021-07-29 10:05:23,761] [18808] [ERROR] [^---Fetcher]: Crashed reason=AssertionError(‘Partition is not assigned’)
Traceback (most recent call last):
  File “/usr/local/lib/python3.8/site-packages/mode/services.py”, line 802, in _execute_task
    await task
  File “/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py”, line 176, in _fetcher
    await consumer._drain_messages(self)
  File “/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py”, line 1104, in _drain_messages
    async for tp, message in ait:
  File “/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py”, line 714, in getmany
    highwater_mark = self.highwater(tp)
  File “/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py”, line 1367, in highwater
    return self._thread.highwater(tp)
  File “/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py”, line 923, in highwater
    return self._ensure_consumer().highwater(tp)
  File “/usr/local/lib/python3.8/site-packages/aiokafka/consumer/consumer.py”, line 673, in highwater
    assert self._subscription.is_assigned(partition), \
AssertionError: Partition is not assigned
[2021-07-29 10:05:23,764] [18808] [INFO] [^Worker]: Stopping...
[2021-07-29 10:05:23,765] [18808] [INFO] [^-App]: Stopping...

请在这里帮助我们。

4

0 回答 0