问题标签 [faust]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
pycharm - PyCharm 中 Faust worker 的调试模式
我正在尝试在调试模式下运行 faust worker ( https://faust.readthedocs.io )。此刻我这样做了:
如您所见,这不是实际的调试。我期望这个:
你能说我做错了什么吗?
python - 使用 Python Faust 转发事件
我正在尝试将消息转发到浮士德的内部主题。正如本例中 faust 所建议的:
https://faust.readthedocs.io/en/latest/playbooks/vskafka.html
我有以下代码
但是我在运行我的 pytest 时总是收到以下错误:
是否删除了此功能,或者我是否需要以不同的方式指定主题以获取事件,或者这甚至是 pytest 的问题?
python - 以编程方式构建代理时,无法使用 webview 向浮士德代理/主题发送消息
我正在尝试以编程方式构建代理,并且一直在关注这个示例。当我查看在faust -A <> agents
脚本开始时生成的主题和代理时,我可以看到它们。代理名称类似于{topic}_agent
示例。所有这些代理分别接触到一些 rest api。这是代码。
但是,当我发布一些东西时,我收到了一个错误 - NameError: name 'gdelt_agent' is not defined
。这里的任何帮助将不胜感激。
python - 滚动窗口“过期”参数小于“大小”不起作用
重现步骤
我试图在翻滚窗口中聚合一些数据,然后将处理函数应用于窗口中的数据。我正在使用expires 参数来处理迟到的事件 (假设我们可以在n+1分钟的前 10 秒内获得属于n分钟的事件)。
预期行为
我希望仅当传递n+1窗口的 10 秒以处理延迟事件时才调用n窗口的 process_window_function
实际行为
如果 Table 的expires参数小于size参数,则窗口n的 process_window_function将在窗口n+1的第一个事件之后立即调用。看起来浮士德只是忽略了expires。对于这种行为 ,可能会稍晚到达的迟到的事件将被跳过。
如果 expires 参数等于或大于大小,迟到的事件将得到正确处理,但我不希望延迟超过 10 秒。
卡夫卡输入
日志
版本
- 蟒蛇版本
3.7.9
- 浮士德版本
faust-streaming==0.6.1
- RocksDB 版本
python-rocksdb
我有可能在 Flink 中实现这种行为,但在 Faust 中遇到了这个问题。我究竟做错了什么?
python - 理解用浮士德表和分区连接浮士德主题
我有两个主题:
- 1 个带有事件数据
EventData
的主题(假设5 个分区)——该主题的日志使用 CustomerID 作为键。 - 1 个带有丰富数据的紧凑主题(
EnrichmentKVs
假设3 个分区)——该主题的日志使用相同的 CustomerID 作为键。
目标是将 EnrichmentKV 保存在 Faust 表中,当 EventData 日志流入时,它们会使用该表中的数据进行丰富并发布到新的流/主题。
所以我有两个Faust (python) 应用程序,每个应用程序都有自己运行的实例数量:
- App1(N-instances running)使用 key=CustomerId 发布到 EventData 主题
- App2(正在运行的 M 实例)执行以下操作:
- 更新浮士德表 (
EnrichmentKVsTable
) 以获取来自 EnrichmentKVs 主题的值 - 从 EventData 主题流入,并将浮士德表中的数据与来自的数据流“连接”
Eventdata
- 更新浮士德表 (
我的理解是,App2 的每个实例都只会有一个基于分区键的部分 EnrichmentKV 表。要使“JOIN”工作,任何日志EventData(key="1234")
都必须与日志进入相同的App2实例EnrichmentKVsTable(key="1234")
当两个输入主题的分区不同,并且每个应用程序的实例数量也可能不同时,浮士德如何确保这一点?还是我处理这个问题是错误的?
python-3.x - 使用 relative_to_field 时 Python Faust Tumbling Window 不起作用
当翻滚窗口与 relative_to_field 一起使用时,它没有按预期工作。针对键本身更新的值不会反映在同一窗口中。它总是返回默认值而不是更新后的值。当我不使用“relative_to_field”时,它按预期工作。
预期行为
下面的打印语句,我们在更新后从翻滚窗口打印最新值应该返回在上一步中更新的值“更新后的值::”+ str(tumbling_table[key].value())“
实际行为
下面的打印语句,我们在更新后从翻滚窗口打印最新值给出默认值而不是更新值“更新后的值::”+ str(tumbling_table[key].value())“
版本
- Python版本:3.8.8
- 浮士德版本:v1.10.4
- 操作系统:OSX Big Sur 11.2.3
- 卡夫卡版本:kafka_2.12-2.3.0
- RocksDB 版本:0.7.0
我不确定为什么在使用 relative_to_field 选项和翻转窗口时这个简单的代码不起作用。
python - 我如何运行浮士德工人并管理他们
我想从我的代码中运行浮士德工作者(从信号中运行)。我需要管理工人(暂停,杀死,重新加载)。将来,我想管理 docker 容器中的工人。也就是说,一个容器中有 3 个工人,另一个容器中有 2 个工人……或者也许有更好的方法来运行和管理工人?我已经学会了如何在容器中运行它们,每个容器手动运行一个。
我需要一个用于大量工人的守护进程,例如主管
python - 如何指定名称以数字开头的python faust.Record字段?
我通过扩展 faust.Record 类来描述浮士德模型。我的输入 json 看起来像:
所以我必须创建类字段,只用数字命名。我知道,它通常在 Python 中受到限制,但是有没有解决方法,比如我的 fieds 名称的别名?
我在 Java 中寻找类似 Jackson 注释的东西:
python - 如何捕捉 faust python json 序列化错误?
我有简单的浮士德代理。它使用来自 kafka 主题的 json,并默认将它们解析为 dicts faust 序列化器:
反序列化本身发生在我的代码之外的某个地方,它由 faust 框架管理,而不是由我管理。如何捕获和处理反序列化异常,例如在无效 json 的情况下?
python - Faust.web API 服务器挂起异常
我是 Python 和 Faust 的新手。我们正在使用 Faust.web 开发我们的 API。一切似乎都在工作,除了例外。
问题是,当我们运行 API 服务器并从 Postman 访问导致代码异常的 API 时,我们正确地得到 500 状态代码。但是当我们下次访问相同的 API 时,Postman 会等待响应,直到 API 超时。
请帮忙!