问题标签 [faust]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
246 浏览

pycharm - PyCharm 中 Faust worker 的调试模式

我正在尝试在调试模式下运行 faust worker ( https://faust.readthedocs.io )。此刻我这样做了:

在此处输入图像描述

如您所见,这不是实际的调试。我期望这个:

在此处输入图像描述

你能说我做错了什么吗?

0 投票
3 回答
365 浏览

python - 使用 Python Faust 转发事件

我正在尝试将消息转发到浮士德的内部主题。正如本例中 faust 所建议的:

https://faust.readthedocs.io/en/latest/playbooks/vskafka.html

我有以下代码

但是我在运行我的 pytest 时总是收到以下错误:

是否删除了此功能,或者我是否需要以不同的方式指定主题以获取事件,或者这甚至是 pytest 的问题?

0 投票
1 回答
283 浏览

python - 以编程方式构建代理时,无法使用 webview 向浮士德代理/主题发送消息

我正在尝试以编程方式构建代理,并且一直在关注这个示例。当我查看在faust -A <> agents脚本开始时生成的主题和代理时,我可以看到它们。代理名称类似于{topic}_agent示例。所有这些代理分别接触到一些 rest api。这是代码。

但是,当我发布一些东西时,我收到了一个错误 - NameError: name 'gdelt_agent' is not defined。这里的任何帮助将不胜感激。

0 投票
1 回答
116 浏览

python - 滚动窗口“过期”参数小于“大小”不起作用

重现步骤

我试图在翻滚窗口中聚合一些数据,然后将处理函数应用于窗口中的数据。我正在使用expires 参数来处理迟到的事件 (假设我们可以在n+1分钟的前 10 秒内获得属于n分钟的事件)。

预期行为

我希望仅当传递n+1窗口的 10 秒以处理延迟事件时才调用n窗口的 process_window_function

实际行为

如果 Table 的expires参数小于size参数,则窗口n的 process_window_function将在窗口n+1的第一个事件之后立即调用。看起来浮士德只是忽略了expires。对于这种行为 ,可能会稍晚到达的迟到的事件将被跳过。

如果 expires 参数等于或大于大小,迟到的事件将得到正确处理,但我不希望延迟超过 10 秒。

卡夫卡输入

日志

版本

  • 蟒蛇版本3.7.9
  • 浮士德版本faust-streaming==0.6.1
  • RocksDB 版本python-rocksdb

我有可能在 Flink 中实现这种行为,但在 Faust 中遇到了这个问题。我究竟做错了什么?

0 投票
1 回答
784 浏览

python - 理解用浮士德表和分区连接浮士德主题

我有两个主题:

  • 1 个带有事件数据EventData的主题(假设5 个分区)——该主题的日志使用 CustomerID 作为键。
  • 1 个带有丰富数据的紧凑主题(EnrichmentKVs假设3 个分区)——该主题的日志使用相同的 CustomerID 作为键。

目标是将 EnrichmentKV 保存在 Faust 表中,当 EventData 日志流入时,它们会使用该表中的数据进行丰富并发布到新的流/主题。

所以我有两个Faust (python) 应用程序,每个应用程序都有自己运行的实例数量:

  • App1(N-instances running)使用 key=CustomerId 发布到 EventData 主题
  • App2(正在运行的 M 实例)执行以下操作:
    • 更新浮士德表 ( EnrichmentKVsTable) 以获取来自 EnrichmentKVs 主题的值
    • 从 EventData 主题流入,并将浮士德表中的数据与来自的数据流“连接”Eventdata

我的理解是,App2 的每个实例都只会有一个基于分区键的部分 EnrichmentKV 表。要使“JOIN”工作,任何日志EventData(key="1234")都必须与日志进入相同的App2实例EnrichmentKVsTable(key="1234")

当两个输入主题的分区不同,并且每个应用程序的实例数量也可能不同时,浮士德如何确保这一点?还是我处理这个问题是错误的?

0 投票
1 回答
273 浏览

python-3.x - 使用 relative_to_field 时 Python Faust Tumbling Window 不起作用

当翻滚窗口与 relative_to_field 一起使用时,它没有按预期工作。针对键本身更新的值不会反映在同一窗口中。它总是返回默认值而不是更新后的值。当我不使用“relative_to_field”时,它按预期工作。

预期行为

下面的打印语句,我们在更新后从翻滚窗口打印最新值应该返回在上一步中更新的值“更新后的值::”+ str(tumbling_table[key].value())“

实际行为

下面的打印语句,我们在更新后从翻滚窗口打印最新值给出默认值而不是更新值“更新后的值::”+ str(tumbling_table[key].value())“

版本

  • Python版本:3.8.8
  • 浮士德版本:v1.10.4
  • 操作系统:OSX Big Sur 11.2.3
  • 卡夫卡版本:kafka_2.12-2.3.0
  • RocksDB 版本:0.7.0

我不确定为什么在使用 relative_to_field 选项和翻转窗口时这个简单的代码不起作用。

0 投票
0 回答
249 浏览

python - 我如何运行浮士德工人并管理他们

我想从我的代码中运行浮士德工作者(从信号中运行)。我需要管理工人(暂停,杀死,重新加载)。将来,我想管理 docker 容器中的工人。也就是说,一个容器中有 3 个工人,另一个容器中有 2 个工人……或者也许有更好的方法来运行和管理工人?我已经学会了如何在容器中运行它们,每个容器手动运行一个。

我需要一个用于大量工人的守护进程,例如主管

0 投票
1 回答
181 浏览

python - 如何指定名称以数字开头的python faust.Record字段?

我通过扩展 faust.Record 类来描述浮士德模型。我的输入 json 看起来像:

所以我必须创建类字段,只用数字命名。我知道,它通常在 Python 中受到限制,但是有没有解决方法,比如我的 fieds 名称的别名?

我在 Java 中寻找类似 Jackson 注释的东西:

0 投票
1 回答
291 浏览

python - 如何捕捉 faust python json 序列化错误?

我有简单的浮士德代理。它使用来自 kafka 主题的 json,并默认将它们解析为 dicts faust 序列化器:

反序列化本身发生在我的代码之外的某个地方,它由 faust 框架管理,而不是由我管理。如何捕获和处理反序列化异常,例如在无效 json 的情况下?

0 投票
1 回答
41 浏览

python - Faust.web API 服务器挂起异常

我是 Python 和 Faust 的新手。我们正在使用 Faust.web 开发我们的 API。一切似乎都在工作,除了例外。

问题是,当我们运行 API 服务器并从 Postman 访问导致代码异常的 API 时,我们正确地得到 500 状态代码。但是当我们下次访问相同的 API 时,Postman 会等待响应,直到 API 超时。

请帮忙!