0

我在用 python faust 做一个简单的任务时遇到了一些麻烦,请看看这个问题,看看你是否能帮助我。

重现步骤

我使用了这段代码:

import faust

from settings import KAFKA_SERVER

app = faust.App('streams', broker=KAFKA_SERVER, producer_acks=0, store='rocksdb://')


class ProjetoRateio(faust.Record):
    codigoProjeto: str
    combinacao: str
    grade: str
    quantidade: int


projeto_rateio_topic = app.topic(
    'gua-poc-sent-rateio',
    # key_type=str,
    value_type=ProjetoRateio,
    # value_serializer='raw',
)

grade_total = app.Table('grade_total', default=int,
                        partitions=1)


@app.agent(projeto_rateio_topic)
async def projeto_rateio(rateios):
    async for rateio in rateios:
        # grade_total[f'{rateio.codigoProjeto}.{rateio.combinacao}.{rateio.grade}'] += rateio.quantidade
        grade_total[rateio.codigoProjeto] += rateio.quantidade

并得到标题上描述的错误

预期行为

填充了一张卡夫卡表

实际行为

Exception in callback Topic._on_published(message=<FutureMessag...d result=None>, state={<Monitor: running >: 7442.2543931}, producer=<Producer: running >)(<Future finished result=None>)
handle: <Handle Topic._on_published(message=<FutureMessag...d result=None>, state={<Monitor: running >: 7442.2543931}, producer=<Producer: running >)(<Future finished result=None>)>
Traceback (most recent call last):
  File "/usr/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "/home/jhon/.cache/pypoetry/virtualenvs/gua-kafka-stream-faust-poc-VnS5Y2j1-py3.8/lib/python3.8/site-packages/faust/topics.py", line 474, in _on_published
    message.message.callback(message)
  File "/home/jhon/.cache/pypoetry/virtualenvs/gua-kafka-stream-faust-poc-VnS5Y2j1-py3.8/lib/python3.8/site-packages/faust/tables/base.py", line 353, in _on_changelog_sent
    self.data.set_persisted_offset(res.topic_partition, res.offset)
AttributeError: 'NoneType' object has no attribute 'topic_partition'

版本

  • Python 3.8 版
  • Faust-streaming 版本 0.6.4
  • 操作系统 windows 10、Ubuntu 20.04(通过 wsl)
  • 卡夫卡版本 2.0.0 (msk)
  • RocksDB 版本:5.0

难道我做错了什么?

4

1 回答 1

0

遇到类似问题,有问题的配置是producer_acks=0,尝试将其设置为1-1基于您想要的行为:

生产者要求领导者在考虑完成请求之前收到的确认数量。这控制了发送的记录的持久性。以下设置是常见的:

  • 0:生产者根本不会等待服务器的任何确认。该消息将立即被视为已发送(不推荐)。
  • 1:代理领导者将记录写入其本地日志,但将响应而不等待所有追随者的完全确认。在这种情况下,如果领导者在确认记录后但在追随者复制它之前立即失败,那么记录将丢失。
  • -1:broker 领导者将等待完整的同步副本集来确认记录。这保证了只要至少一个同步副本保持活动状态,记录就不会丢失。这是最有力的保证。

这个块取自 faust-streaming github here

于 2021-12-10T19:19:03.373 回答