问题标签 [faust]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
234 浏览

python - FAUST:AttributeError:“NoneType”对象没有属性“topic_partition”

我在用 python faust 做一个简单的任务时遇到了一些麻烦,请看看这个问题,看看你是否能帮助我。

重现步骤

我使用了这段代码:

并得到标题上描述的错误

预期行为

填充了一张卡夫卡表

实际行为

版本

  • Python 3.8 版
  • Faust-streaming 版本 0.6.4
  • 操作系统 windows 10、Ubuntu 20.04(通过 wsl)
  • 卡夫卡版本 2.0.0 (msk)
  • RocksDB 版本:5.0

难道我做错了什么?

0 投票
0 回答
64 浏览

faust - 浮士德应用程序在翻滚窗口中看到消息

我正在运行一个 faust 应用程序,它通过 Kafka 主题接收 protobuf 消息,并将它们插入到滚动窗口表中,其中用户 ID 作为键,值作为包含计数、用户 ID 和记录时间(以秒为单位)的浮士德记录。在随后的更新中,我将传入的计数对象与现有对象合并并更新表。从翻转窗口发出的过期记录被插入到表中。

这里的问题是我看到一些写入表中的记录在到期时没有发出。由于表是relative_to_field,所以我检查了faust记录中的日志时间,它是准时的。我只在生产中看到这个问题。在当地一切正常。

请注意:

  1. 有关该主题的传入消息采用 protobuf 格式。使用编解码器 (CountsProtoCodec),我将传入的 protobuf 转换为浮士德记录。
  2. 商店是rocksdb。
  3. 浮士德版本是 1.10.4

请在这里帮助我。下面是我使用的示例代码片段。

0 投票
0 回答
104 浏览

python - 浮士德找不到话题

我正在使用:python 3.8 faust==1.10.4 kafka==2.13-2.8.0

但是,faust 的日志只显示而不显示其他主题

我已经用 Kafka API 检查了主题“test_read”的存在。这是结果

因此,这个话题肯定存在,而浮士德没有发现它。你有什么建议吗?为什么会这样?

0 投票
0 回答
93 浏览

python - 更新变更日志时,浮士德如何增加 RocksDB 中的偏移量?

我只是好奇当我们使用 RocksDB 作为状态后端时,faust 是如何在内部更新变更日志的。

据我了解,我们将在表更新期间出现下一个行为:

将新的变更日志发送到带有回调的 kafka 变更日志主题_on_changelog_sent(来自有关此回调的文档:这是在 RocksDB 中保留偏移量的原因,以便在启动时我们知道我们在数据库中已有数据的偏移量。

但是最重​​要的一个问题:在kafka log中成功存储changelog消息后会不会调用这个回调?或者我们可以有这样的情况,当我们向 kafka 发送 changelog 消息,通过回调更新 Rocksdb 中的偏移量,但最后发送到 kafka 会失败?(因为一些kafka集群问题)

在这种情况下,rocksdb 中 changelog 主题的最后偏移量和 kafka 中的实际高水位偏移量不一致。

如果 Faust 现在尝试重新启动,那么由于这种不一致,我们将失败。

我问这个是因为当rocksdb中的偏移量大于kafka changelog主题中的最后一条消息偏移量时,我在生产中遇到了这样的问题。

我相信 kafka 生产者应该等待确认更改日志事件已保存到 kafka 日志,然后才运行回调以更新 RocksDB。

0 投票
1 回答
200 浏览

apache-kafka - 使用 Faust 批处理 Kafka 事件

我有一个 Kafka 主题,我们称之为摄取,它每秒接收一个条目x。我有一个要在此数据上运行的进程,但它必须一次运行 100 个事件。因此,我想将这些条目批处理在一起并将它们发送到一个名为batched-ingest的新主题。这两个主题将如下所示...

ingest = [entry, entry, entry, ...]

batched-ingest = [[entry_0, entry_1, ..., entry_99]]

使用浮士德的正确方法是什么?我现在的解决方案是这个......

我不确定这是否是《浮士德》中这样做的正确方法。如果是这样,我应该设置什么within以使其始终等到len(values) = 100

0 投票
0 回答
107 浏览

apache-kafka - 如何在 Faust 代理中运行 concurrent.futures.ProcessPoolExecutor?

成功的浮士德消费者和生产者(没有 CPU 与 concurrent.futures.ProcessPoolExecutor 绑定)

消费者

制片人

faust.readthedocs.io/en/latest/userguide/agents.html 中的示例效果很好。

但是,如果我试图将 CPU 绑定代码(来自 docs.python.org/3/library/asyncio-eventloop.html)添加到我的消费者,那么它看起来像:

使用 run_in_executor 的消费者

我收到了工人崩溃的警告:

它是否尝试在进程初始化期间使用相同的端口(正如我在 PyCharm 中设置的那样)再启动一个工作程序,或者我在 faust 消费者中的 cpu_bound 代码做错了什么?

0 投票
0 回答
118 浏览

python - 浮士德:TypeError:produce()得到了一个意外的关键字参数'timestamp'

在尝试重现浮士德文档和 Kafka 中显示的示例时,我得到了以下堆栈:

错误显示:self._quick_produce( TypeError: produce() got an unexpected keyword argument 'timestamp'

工人的代码用户是:

对于发件人,代码是:

我尝试使用其他传输编解码器,在记录中添加时间戳,但它不起作用

0 投票
1 回答
116 浏览

python - ModuleNotFoundError("'kafka' 不是一个有效的名称。你的意思是 aiokafka,kafka 之一吗?")

我正在使用 Celery 和 Kafka 运行一些作业,以便将数据推送到 Kafka。我还使用浮士德来连接工人。faust -A project.streams.app worker -l info但不幸的是,为了运行管道,我在运行后遇到了错误。我想知道是否有人可以帮助我。

0 投票
0 回答
70 浏览

faust - 浮士德:当消息发送到不同分区时,看不到从 Tumbling Window 发出的消息

请注意当我们将消息发送到单个分区时。我没有看到任何问题。两条消息都可以正常发出。只有当消息被发送到不同的分区时,我才会看到这个问题。
为了解释这一点,我粘贴了下面我的应用程序的日志和下面的代码片段。

从日志中,每 5 秒将分别具有键 location_11 和 location_10 的两种类型的消息发送到主题到预定义的分区(0 和 1)。这些消息正在被有趣的 print_windowed_events 使用,并且还在更新表。从日志中可以清楚地看出,使用这些消息时没有丢包。但是关闭时回调函数仅记录每个窗口的一个键。

请指教

0 投票
0 回答
19 浏览

python - 更改主题时如何为浮士德设计升级路径=

浮士德文档中,它指出当更改主题的键/值模型时,它是不兼容的更改。它声明所有浮士德实例都必须重新启动。

如何在不停机的情况下更改键/值?由另一个库(例如生成的 Protobuf 代码)执行反序列化是否更有意义?当以向后兼容的方式更新生成的 Protobuf 代码时,以下代码是否会规避停机时间要求(proto/greetings_pb2.py生成的代码)?