问题标签 [faust]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - FAUST:AttributeError:“NoneType”对象没有属性“topic_partition”
我在用 python faust 做一个简单的任务时遇到了一些麻烦,请看看这个问题,看看你是否能帮助我。
重现步骤
我使用了这段代码:
并得到标题上描述的错误
预期行为
填充了一张卡夫卡表
实际行为
版本
- Python 3.8 版
- Faust-streaming 版本 0.6.4
- 操作系统 windows 10、Ubuntu 20.04(通过 wsl)
- 卡夫卡版本 2.0.0 (msk)
- RocksDB 版本:5.0
难道我做错了什么?
faust - 浮士德应用程序在翻滚窗口中看到消息
我正在运行一个 faust 应用程序,它通过 Kafka 主题接收 protobuf 消息,并将它们插入到滚动窗口表中,其中用户 ID 作为键,值作为包含计数、用户 ID 和记录时间(以秒为单位)的浮士德记录。在随后的更新中,我将传入的计数对象与现有对象合并并更新表。从翻转窗口发出的过期记录被插入到表中。
这里的问题是我看到一些写入表中的记录在到期时没有发出。由于表是relative_to_field,所以我检查了faust记录中的日志时间,它是准时的。我只在生产中看到这个问题。在当地一切正常。
请注意:
- 有关该主题的传入消息采用 protobuf 格式。使用编解码器 (CountsProtoCodec),我将传入的 protobuf 转换为浮士德记录。
- 商店是rocksdb。
- 浮士德版本是 1.10.4
请在这里帮助我。下面是我使用的示例代码片段。
python - 浮士德找不到话题
我正在使用:python 3.8 faust==1.10.4 kafka==2.13-2.8.0
但是,faust 的日志只显示而不显示其他主题
我已经用 Kafka API 检查了主题“test_read”的存在。这是结果
因此,这个话题肯定存在,而浮士德没有发现它。你有什么建议吗?为什么会这样?
python - 更新变更日志时,浮士德如何增加 RocksDB 中的偏移量?
我只是好奇当我们使用 RocksDB 作为状态后端时,faust 是如何在内部更新变更日志的。
据我了解,我们将在表更新期间出现下一个行为:
将新的变更日志发送到带有回调的 kafka 变更日志主题_on_changelog_sent
(来自有关此回调的文档:这是在 RocksDB 中保留偏移量的原因,以便在启动时我们知道我们在数据库中已有数据的偏移量。)
但是最重要的一个问题:在kafka log中成功存储changelog消息后会不会调用这个回调?或者我们可以有这样的情况,当我们向 kafka 发送 changelog 消息,通过回调更新 Rocksdb 中的偏移量,但最后发送到 kafka 会失败?(因为一些kafka集群问题)
在这种情况下,rocksdb 中 changelog 主题的最后偏移量和 kafka 中的实际高水位偏移量不一致。
如果 Faust 现在尝试重新启动,那么由于这种不一致,我们将失败。
我问这个是因为当rocksdb中的偏移量大于kafka changelog主题中的最后一条消息偏移量时,我在生产中遇到了这样的问题。
我相信 kafka 生产者应该等待确认更改日志事件已保存到 kafka 日志,然后才运行回调以更新 RocksDB。
apache-kafka - 使用 Faust 批处理 Kafka 事件
我有一个 Kafka 主题,我们称之为摄取,它每秒接收一个条目x
。我有一个要在此数据上运行的进程,但它必须一次运行 100 个事件。因此,我想将这些条目批处理在一起并将它们发送到一个名为batched-ingest的新主题。这两个主题将如下所示...
ingest = [entry, entry, entry, ...]
batched-ingest = [[entry_0, entry_1, ..., entry_99]]
使用浮士德的正确方法是什么?我现在的解决方案是这个......
我不确定这是否是《浮士德》中这样做的正确方法。如果是这样,我应该设置什么within
以使其始终等到len(values) = 100
?
apache-kafka - 如何在 Faust 代理中运行 concurrent.futures.ProcessPoolExecutor?
成功的浮士德消费者和生产者(没有 CPU 与 concurrent.futures.ProcessPoolExecutor 绑定)
消费者
制片人
faust.readthedocs.io/en/latest/userguide/agents.html 中的示例效果很好。
但是,如果我试图将 CPU 绑定代码(来自 docs.python.org/3/library/asyncio-eventloop.html)添加到我的消费者,那么它看起来像:
使用 run_in_executor 的消费者
我收到了工人崩溃的警告:
它是否尝试在进程初始化期间使用相同的端口(正如我在 PyCharm 中设置的那样)再启动一个工作程序,或者我在 faust 消费者中的 cpu_bound 代码做错了什么?
python - 浮士德:TypeError:produce()得到了一个意外的关键字参数'timestamp'
在尝试重现浮士德文档和 Kafka 中显示的示例时,我得到了以下堆栈:
错误显示:self._quick_produce( TypeError: produce() got an unexpected keyword argument 'timestamp'
工人的代码用户是:
对于发件人,代码是:
我尝试使用其他传输编解码器,在记录中添加时间戳,但它不起作用
python - ModuleNotFoundError("'kafka' 不是一个有效的名称。你的意思是 aiokafka,kafka 之一吗?")
我正在使用 Celery 和 Kafka 运行一些作业,以便将数据推送到 Kafka。我还使用浮士德来连接工人。faust -A project.streams.app worker -l info
但不幸的是,为了运行管道,我在运行后遇到了错误。我想知道是否有人可以帮助我。
faust - 浮士德:当消息发送到不同分区时,看不到从 Tumbling Window 发出的消息
请注意当我们将消息发送到单个分区时。我没有看到任何问题。两条消息都可以正常发出。只有当消息被发送到不同的分区时,我才会看到这个问题。
为了解释这一点,我粘贴了下面我的应用程序的日志和下面的代码片段。
从日志中,每 5 秒将分别具有键 location_11 和 location_10 的两种类型的消息发送到主题到预定义的分区(0 和 1)。这些消息正在被有趣的 print_windowed_events 使用,并且还在更新表。从日志中可以清楚地看出,使用这些消息时没有丢包。但是关闭时回调函数仅记录每个窗口的一个键。
请指教
python - 更改主题时如何为浮士德设计升级路径=
在浮士德文档中,它指出当更改主题的键/值模型时,它是不兼容的更改。它声明所有浮士德实例都必须重新启动。
如何在不停机的情况下更改键/值?由另一个库(例如生成的 Protobuf 代码)执行反序列化是否更有意义?当以向后兼容的方式更新生成的 Protobuf 代码时,以下代码是否会规避停机时间要求(proto/greetings_pb2.py
生成的代码)?