我在用 python faust 做一个简单的任务时遇到了一些麻烦,请看看这个问题,看看你是否能帮助我。
重现步骤
我使用了这段代码:
import faust
from settings import KAFKA_SERVER
app = faust.App('streams', broker=KAFKA_SERVER, producer_acks=0, store='rocksdb://')
class ProjetoRateio(faust.Record):
codigoProjeto: str
combinacao: str
grade: str
quantidade: int
projeto_rateio_topic = app.topic(
'gua-poc-sent-rateio',
# key_type=str,
value_type=ProjetoRateio,
# value_serializer='raw',
)
grade_total = app.Table('grade_total', default=int,
partitions=1)
@app.agent(projeto_rateio_topic)
async def projeto_rateio(rateios):
async for rateio in rateios:
# grade_total[f'{rateio.codigoProjeto}.{rateio.combinacao}.{rateio.grade}'] += rateio.quantidade
grade_total[rateio.codigoProjeto] += rateio.quantidade
并得到标题上描述的错误
预期行为
填充了一张卡夫卡表
实际行为
Exception in callback Topic._on_published(message=<FutureMessag...d result=None>, state={<Monitor: running >: 7442.2543931}, producer=<Producer: running >)(<Future finished result=None>)
handle: <Handle Topic._on_published(message=<FutureMessag...d result=None>, state={<Monitor: running >: 7442.2543931}, producer=<Producer: running >)(<Future finished result=None>)>
Traceback (most recent call last):
File "/usr/lib/python3.8/asyncio/events.py", line 81, in _run
self._context.run(self._callback, *self._args)
File "/home/jhon/.cache/pypoetry/virtualenvs/gua-kafka-stream-faust-poc-VnS5Y2j1-py3.8/lib/python3.8/site-packages/faust/topics.py", line 474, in _on_published
message.message.callback(message)
File "/home/jhon/.cache/pypoetry/virtualenvs/gua-kafka-stream-faust-poc-VnS5Y2j1-py3.8/lib/python3.8/site-packages/faust/tables/base.py", line 353, in _on_changelog_sent
self.data.set_persisted_offset(res.topic_partition, res.offset)
AttributeError: 'NoneType' object has no attribute 'topic_partition'
版本
- Python 3.8 版
- Faust-streaming 版本 0.6.4
- 操作系统 windows 10、Ubuntu 20.04(通过 wsl)
- 卡夫卡版本 2.0.0 (msk)
- RocksDB 版本:5.0
难道我做错了什么?