我们最近从 faust 1.10.4 切换到 faust-streaming(0.6.9)。发布此消息后,我们看到应用程序崩溃并出现以下异常。该应用程序具有多个层,在每个阶段都对数据进行聚合和过滤。在每个阶段,处理器将消息发送到 Kafka 主题,相应的 faust 应用程序代理会使用该消息。但是对于 Kafka 主题,我们在每一层都保持分区计数相同。
- 集群大小 = 12
- 主题和表分区计数 = 36
- 浮士德流媒体版本 = 0.6.9
- kafka-python 版本 = 2.0.2
[2021-07-29 10:05:23,761] [18808] [ERROR] [^---Fetcher]: Crashed reason=AssertionError(‘Partition is not assigned’)
Traceback (most recent call last):
File “/usr/local/lib/python3.8/site-packages/mode/services.py”, line 802, in _execute_task
await task
File “/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py”, line 176, in _fetcher
await consumer._drain_messages(self)
File “/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py”, line 1104, in _drain_messages
async for tp, message in ait:
File “/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py”, line 714, in getmany
highwater_mark = self.highwater(tp)
File “/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py”, line 1367, in highwater
return self._thread.highwater(tp)
File “/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py”, line 923, in highwater
return self._ensure_consumer().highwater(tp)
File “/usr/local/lib/python3.8/site-packages/aiokafka/consumer/consumer.py”, line 673, in highwater
assert self._subscription.is_assigned(partition), \
AssertionError: Partition is not assigned
[2021-07-29 10:05:23,764] [18808] [INFO] [^Worker]: Stopping...
[2021-07-29 10:05:23,765] [18808] [INFO] [^-App]: Stopping...
请在这里帮助我们。