问题标签 [faust]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
1005 浏览

python-3.x - 如何在多个代理或浮士德计时器之间共享浮士德表?

我正在尝试在一段时间后将浮士德表的数据(计数)发布到 kafka 主题。当我发布一些简单的字符串时,计时器正在工作,但它无法以某种方式访问​​表的数据。以下是定时器的代码:

代理工作正常。我能够正确地看到这些值。

0 投票
2 回答
984 浏览

python-3.x - Faust 框架(或其他框架)中是否有 celery countdown 和 eta 的替代方案?

我想通过用户设置在特定时间触发一些任务。例如,如果用户设置为下午 4:00,那么我将在下午 4:00 运行任务。它可以在 Celery 中使用倒计时和 eta 进行处理。但我的经纪人更喜欢卡夫卡。有没有 Celery 倒计时和 eta 的替代品?

Celery 中的代码如下:

我希望不要使用 Celery,而必须使用 Kafka

0 投票
2 回答
367 浏览

faust - 使用 faust 流式访问 rpc 回复中的 kafka 标头

是否可以在 faust rpc 回复中访问 kafka 标头?这是两个浮士德代理的示例。一个 (the pow) 调用另一个 ( mul) 并接收一个结果作为值。但是如何知道reply topic中的kafka headers呢?

0 投票
1 回答
182 浏览

faust - 让浮士德代理等待其他代理完成

当其他一些代理完成时,有没有办法让浮士德代理运行?例如

Agent_final应该在全部Agent1,Agent2,Agent3完成后开始。并且Agent_final应该可以访问所有 3 个代理输出。在文档中没有找到这种常见的方法。

0 投票
1 回答
144 浏览

redis - 在给定的时间窗口内,处理主题中每个消费者的最大任务数?

n我的生产者从单个输入消息生成任务并将它们发布到topic.

要求是,在 的消费者组中的所有个人消费者中,任何人都不能在 1 小时内topic处理超过 3 个这些任务中的任何一个。n

这意味着如果我想立即处理所有这些消息,我至少需要ceil(n/3)消费者。如果消费者少于ceil(n/3)消费者,那么我需要某种方式将消息推迟到num_processed < 3最后一小时。

就实施此解决方案的实用性而言,我希望将 Kafka 与 Faust [1] 一起使用,但如有必要,我也可以访问 Redis。

到目前为止,我的想法是确保ceil(n/3)在生产时至少有消费者,然后只使用生产者的循环分配任务topic。无论如何,这是最佳解决方案,因为它无需等待长达 1 小时来处理消息。然而,这只会在足够多的消费者死亡之后才会起作用,因此同一消费者很可能会在 1 小时内处理超过 3 个。这是无法接受的。

另一个想法可能是让消费者在每次收到消息时检查他们是否已经执行了 3 个n任务,如果是,则以某种方式请求另一个消费者处理它 - 但我在 Kafka 中找不到任何合适的机制来启用此功能.

[1] https://faust.readthedocs.io/

0 投票
1 回答
1889 浏览

python - 无法连接到 id 为 1 的节点:[Worker]:错误:ConnectionError('No connection to node with id')

我正在尝试使用 robinhood / faust 但没有成功!

我已经成功地在我的 confluent-kafka localhost 实例中创建了一个插入原始主题的生产者!

但浮士德无法连接到本地主机。

我的应用程序.py:

错误

我正在运行:faust -A app worker -l debug

0 投票
1 回答
1087 浏览

python - 两个代理在一个 kafka 主题上具有不同的过滤器。浮士德流中的确认

我想让两个浮士德代理监听同一个 kafka 主题,但是每个代理在处理事件之前都使用自己的过滤器,并且它们的事件集不会相交。

在文档中,我们有一个示例: https ://faust.readthedocs.io/en/latest/userguide/streams.html#id4

如果两个代理使用订阅同一主题的流:

Conductor 会将收到的关于“orders”主题的每条消息转发给两个代理,每当它进入代理流时都会增加引用计数。

当事件被确认时,引用计数减少,当它达到零时,消费者将认为该偏移量“完成”并可以提交它。

下面是过滤器https://faust.readthedocs.io/en/latest/userguide/streams.html#id13

我使用了一些复杂的过滤器,但结果将流分成两部分,用于具有完全不同逻辑的两个代理。(我不使用 group_by)

如果两个代理一起工作,一切正常。但是,如果我停止它们并重新启动它们,它们将从头开始处理流。因为每一个事件都没有得到代理人之一的承认。如果我确认每个代理中的所有事件,而如果其中一个代理不会启动,那么第二个代理将清除该主题。(如果一个被粉碎并重新启动,指挥将看到三个订阅者,因为它正在等待粉碎的代理响应 20 分钟)。

我只想将事件分为两部分。在这种情况下如何进行适当的同步?

0 投票
0 回答
318 浏览

faust - 浮士德。如何创建压缩主题

我想创建一个由kafka自动压缩的主题,使用浮士德流。我使用如下代码:

我也尝试了 options internal, retention,deleting创建主题,并且每次更改主题名称时,都应该创建新主题。但我没有看到卡夫卡实际上压缩了这个话题。每次阅读该主题时,我都会看到完整的事件历史记录。

如何用浮士德创建一个紧凑的主题?

0 投票
2 回答
1797 浏览

faust - 如何将浮士德中的消费者设置为特定的偏移量

从浮士德文档中,我无法找到如何将消费者设置为特定的偏移量。

使用 confluent-kafka 我使用 consumer.offsets_for_times 来查找 start_offset,然后将 TopicPartition 分配给该特定偏移量,例如:

对于浮士德,我找不到更多的东西:

这只让你设置最早或最晚。我将如何从特定时间或一天的开始开始阅读?

0 投票
0 回答
823 浏览

kafka-python - Faust 表查找性能

我正在尝试对到达 Kafka 主题(比如“datatopic”)的消息执行查找。查找源是另一个 Kafka 主题(比如“lookuptopic”)。为此,我使用 faust 创建了一个表,并创建了一个代理来使用新消息更新此表。在同一个应用程序中,我创建了另一个代理来执行查找(基于公共 ID 属性)。查找表中只有几千条记录,性能有点慢 - 每条记录 8 毫秒,相当于每秒 125 条记录。我只是想验证我是否以正确的方式做事(很抱歉我无法阅读整个浮士德文档)。

这是代码