问题标签 [faust]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
870 浏览

python - 自动发现 python 装饰器

我想知道是否有一种标准化的方法或最佳实践来扫描/自动发现装饰器,就像它在这里完成的那样,而且在 Django、Flask 等其他几个库中也是如此。通常,装饰器会在调用内部函数时提供额外/包装的功能。

在下面显示的示例中以及在 Flask/Django(路由装饰器)中,装饰器用于添加总体功能,例如最初在装饰器逻辑中生成 tcp 客户端,然后在收到要处理的消息时调用内部函数它。

Flask/ Django 注册一个 url 路由,其中​​内部函数仅在稍后请求 url 时调用。所有示例都需要对装饰器逻辑进行初始注册(扫描/发现),以便初始启动总体功能。对我来说,这似乎是装饰器的另一种用途,如果有的话,我想了解最佳实践方法。

请参见下面的浮士德示例,其中装饰器 app.agent() 在异步事件循环中自动触发侦听(kafka 流)客户端,然后传入消息由内部函数 hello()稍后处理,仅当收到消息时,需要初始在脚本开始时首先检查/扫描/发现相关的装饰器逻辑。

0 投票
1 回答
1080 浏览

python - Python中有没有像Apache Nifi这样的替代品?

我需要处理数据并发送到浮士德(流式传输),就像 Nifi 正在向 kafka 发送数据一样。

由于我无法将 Faust 与 Nifi 集成,因此在 Python 中是否有任何替代 Nifi 的方法

在 nifi 中,我可以处理 csv 并将其转换为 json 并发送到 Kafka。由于我对 python 感兴趣,所以在 Python 中是否有任何应用程序,比如用 Java 构建的 Nifi。Kafka 也可以用于流式传输,而 Python 中没有 kafka 流 api

0 投票
0 回答
332 浏览

python - 为什么我的 apache kafka 消费者随机忽略排队的消息?

这可能是一个 eisenbug,所以我不期待硬性答案,而是更多关于寻找什么能够复制该错误的提示。

我有一个由多个服务组成的事件驱动、基于 kafka 的系统。目前,它们被组织成线性管道。一个主题,一种事件类型。每个服务都可以被认为是从一种事件类型到一种或多种事件类型的转换。

每个转换都作为一个 python 进程执行,有自己的消费者和自己的生产者。它们都共享相同的代码和配置,因为这都是从服务实现中抽象出来的。

现在,有什么问题。在我们的暂存环境中,有时(假设每 50 条消息中有一条)Kafka 上有一条消息,但消费者根本没有处理它。即使您等待数小时,它也会挂起。这不会发生在本地环境中,我们无法在其他任何地方重现它。

一些更相关的信息:

  • 这些服务经常出于调试目的而重新启动,但挂起似乎与重新启动无关。
  • 当消息挂起并且您重新启动服务时,该服务将处理该消息。
  • 这些服务是完全无状态的,所以没有缓存或其他奇怪的事情发生(我希望)
  • 发生这种情况时,我有证据表明他们仍在处理旧消息(我在他们产生输出时记录,这发生在消费者循环结束之前)
  • 在当前的部署中,消费者组中只有一个消费者,因此在相同的服务中没有并行处理,也没有服务的水平扩展

我如何消费:

我使用 pykafka,这是消费者循环:

我的假设是我使用消息的方式存在一些问题,或者消费者组偏移量存在一些不一致的状态,但我无法真正解决这个问题。

我也在考虑搬到浮士德来解决这个问题。鉴于我的代码库和架构决定,过渡应该不会太难,但在开始这样的工作之前,我想确定我正在朝着正确的方向前进。现在这只是一个盲目的尝试,希望造成问题的一些细节会消失。

0 投票
1 回答
386 浏览

apache-kafka - 如何使用kafka和faust检查是否在给定时间段内发送了新记录

我正在使用包含融合平台(docker)的测试设置,并且正在处理包含以下信息的记录:传感器 ID、时间戳、值。使用 robinhood 的浮士德(类似于 Kafka Streams 但在 python 中)我正在尝试执行以下操作:

每当有传感器的新记录时,都应该有一个“计时器”,如果在给定时间内没有收到该传感器 ID 的新记录,则应该有一个错误,表明该传感器/机器可能出现故障。

我尝试过使用time.sleep(),但发生的是它只会休眠 10 秒,然后处理下一条记录。

甚至可以用我正在使用的设置做这样的事情吗?

0 投票
1 回答
1376 浏览

python-3.x - 如何从 faust 应用程序向 Websocket 发送数据

我目前正在研究一个用例,使用 Kafka 和 robinhood 的浮士德来处理来自 Kafka 的数据。我已经成功地进行了计算,并且我需要的结果正在打印到我的浮士德工作者正在运行的控制台上。

现在我想找到一种方法让我的结果不仅在控制台中而且在 HTML 页面中可见。我已经查看了 websockets 库,但我无法让它与 faust 一起工作。我得到的错误是Crashed reason=RuntimeError('This event loop is already running')我认为这是因为代码是针对正在处理的每条消息执行的。

非常感谢任何帮助

这是我正在使用的代码:

0 投票
1 回答
1379 浏览

python - 我如何在浮士德中使用并发?

我正在使用 faust 并希望利用并发功能。列出的示例并未完全演示并发的使用。

我想做的是,从 kafka 生产者和 unnest json 中读取。然后将货物发送到一个进程以计算帐单等。我应该一次将 10 个货物发送到一个进行计算的函数。为此,我使用并发,因此可以同时计算 10 批货物。

0 投票
2 回答
1421 浏览

flask - 将 Flask 与 Faust 集成

我正在尝试让一个浮士德代理在烧瓶视图/端点内投射一条消息,我找不到任何例子,我真的很挣扎。

有没有人成功尝试过这个?文档说使用 gevent 或 eventlet 作为 asyncio 的桥梁,但不明白多么不幸

0 投票
1 回答
499 浏览

asyncpg - 如何在 Faust 代理中使用 asyncpg 连接池?

我希望 Faust 代理写入 PostgreSQL 表。我想使用 asyncpg 连接池,但找不到将其注入应用程序初始化代码的干净方法。

0 投票
5 回答
5890 浏览

python - 发布到 kafka 主题的浮士德示例

我很好奇您应该如何表达您希望将消息传递到浮士德的 Kafka 主题。他们自述文件中的示例似乎没有写入主题:

我希望hello.send在上面的代码中向主题发布消息,但似乎没有。

有许多阅读主题的示例,以及许多使用 cli 推送临时消息的示例。梳理完文档后,我没有看到任何明确的在代码中发布到主题的示例。我只是疯了,上面的代码应该可以工作吗?

0 投票
0 回答
537 浏览

apache-kafka - Python Faust await agent.ask() 不返回回复并挂起调用它的函数

我是 Python 新手,玩弄一些东西,尝试使用 Faust 通过 Kafka 与 Python 服务进行通信。所以我有一个小的 PoC 项目。浮士德应用定义:

我的数据库阅读器代理:

模型定义:

测试agent.ask()

所以我reader运行了 zookeeper、kafka 服务器、mongodb 和 faust worker。一切都在使用开箱即用的配置。

当我运行时,python3 test.py我看到debug <1>了预期的打印输出,但debug <2>永远不会关闭并且执行挂在那里。

浮士德医生说 在此处输入图像描述

所以我假设我做的一切都是正确的。有人有任何线索吗?