问题标签 [faust]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
139 浏览

python - 如何在浮士德中使用并发?

我正在尝试在我的代码中实现 Faust 库以使用来自 Kafka 的数据,异步运行一些代码并将数据返回给 Kafka。我遵循了文档,但似乎在我的代码(或我对浮士德如何工作的理解)中犯了一个错误,因为我的应用程序中的浮士德当前一次使用来自 Kafka 的消息,并且只有在将数据返回给 Kafka 之后才会它开始使用另一条消息。

我的代码:

知道我在做什么错吗?我正在使用浮士德 1.10.4

0 投票
0 回答
68 浏览

python - Faust-Streaming 崩溃并出现错误“未分配分区”

我们最近从 faust 1.10.4 切换到 faust-streaming(0.6.9)。发布此消息后,我们看到应用程序崩溃并出现以下异常。该应用程序具有多个层,在每个阶段都对数据进行聚合和过滤。在每个阶段,处理器将消息发送到 Kafka 主题,相应的 faust 应用程序代理会使用该消息。但是对于 Kafka 主题,我们在每一层都保持分区计数相同。

  • 集群大小 = 12
  • 主题和表分区计数 = 36
  • 浮士德流媒体版本 = 0.6.9
  • kafka-python 版本 = 2.0.2

请在这里帮助我们。

0 投票
0 回答
57 浏览

faust - 结合浮士德工人的输出

我有一个有 7 个分区的主题。然后,我使用 faust 对主题执行流处理,在该主题上我想计算最后 10、30、60 和 300 秒的每个键的实例。我们正在使用一个窗口表来执行此操作,该表有一个跳跃的 1s 窗口,该窗口停留 301 秒,当新结果出现时,我们只需遍历表以计算每个所需时间框架的数量。可能不是最有效的方法,但它有效,并且由于没有滑动窗口选项,它是我们拥有的最好的方法。

那么问题是,当我们想要将其扩展到可能有数千条消息时,我们计划使用多个工作人员来分割工作负载,但这会导致两个单独的流,每个工作人员只能看到 1/n 的数据,所以只报告给定分区的计数。

有没有办法组合每个工人的窗口值?此外,考虑到每个工作人员可能会根据需求在需要时停止或启动,因此它需要是动态的。

我们不能强制键在同一个分区中,因为某些键会比其他键更频繁,并且您最终可能会在 3 个月内在一个分区中收到 100 条消息,而在另一个分区中则有几百万条消息。这也不是静态的,在不同的时间,不同的键会比其他键更频繁,所以没有办法提前计划。

0 投票
0 回答
88 浏览

apache-kafka - Faust 应用程序处理消息但在连接到 kafka 代理后不提交偏移量 #736

重置与 kafka 代理的连接时,我看到了这个问题。在这种情况下,faust 应用程序继续处理消息,但无法提交偏移量,并出现以下错误:

这种情况一直持续到 faust 应用程序重新启动。

一旦应用程序重新启动,它会重新处理消息并开始提交偏移量

结果,处理了许多重复的消息。

0 投票
1 回答
141 浏览

apache-kafka-streams - 使用 Faust 的滑动窗口

有谁知道如何使用 Faust 实现滑动窗口?

这个想法是计算一个键在 10、30、60 和 300 秒窗口中的出现次数,但我们需要在 1 秒或每次更新的基础上进行计数。

我有一个狡猾的解决方法,这似乎非常低效,我有一个翻滚的 1 秒窗口,到期时间为 300 秒,然后我使用该delta()方法将表中的所有旧值与当前值相加。它似乎可以处理来自 6 个源的消息,每个源以 10 条消息/秒的速度运行,但这大约是我们看到滞后之前的限制。这显然是一种无法扩展的缓慢方法,所以问题是如何在不需要 KSQL 或设置 Spark 集群以及 Kafka 集群的情况下实现这一点。如果可以,我们会尽量保持简单。

更复杂的是,我们非常希望在过去 24 小时、1 周、1 个月和过去 3 个月内拥有相同的统计数据……所有这些都在运行中。但也许我们只是要求太多,而没有为每个输入提供专门的流程。

这是我的狡猾代码:

0 投票
0 回答
26 浏览

python - 确保消息在主题之间保持一致的顺序

我有一个浮夸的应用程序,它有两个主题。第一个接收格式的原始数据:

一个异步函数使用interpolate这个主题并在每整整 15 分钟计算一次值。为此,我使用一个表来存储device_idABC 的最后一次出现。所以可以说看到的最后一条消息是

那么我将执行以下操作:

在另一个主题中target_topic,我想按时间顺序计算值的增量。为此,我创建了另一个帮助表,它再次存储最后一次出现,分组在device_id. 我现在的问题是:如何确保事件/消息的顺序没有改变,以便我的增量计算正确?

我目前的方法如下所示:

当然,我希望主题插入的顺序应该与出现在下一个主题中的消息相同,但并非在每种情况下都是如此。

以下是我看到日志中出现的消息的方式:

0 投票
1 回答
69 浏览

python - 在 Faust 应用程序中检查 kafka 主题的存在

我是浮士德的新手。

手头的任务是确保存在一些 Kafka 主题。想法是检查应用程序的活跃度检查中是否存在所有必需的主题。

我已经浏览了浮士德文档,但没有找到办法。甚至可以在浮士德应用程序中做到这一点吗?

我们可以在浮士德中使用原生 Kafka 列表主题( http://kafka.apache.org/21/javadoc/org/apache/kafka/clients/admin/AdminClient.html#listTopics-- )吗?

0 投票
0 回答
41 浏览

faust - 浮士德 - 如何可视化表格内的数据?

我了解如何在表中插入数据,但是我该如何访问它呢?

在此处输入图像描述

如何在屏幕上打印“order_count”中的数据?

0 投票
0 回答
65 浏览

python - 在 faust-streaming 应用程序中初始化 aiohttp.ClientSession 的位置

我正在尝试aiohttp.ClientSession在我的浮士德应用程序内部创建一个,这样我就可以重用它,而无需一遍又一遍地提供身份验证标头。如果我继承faust.app

然后还定义一个测试视图(应该从该会话中定义的服务中获取一些数据):

当我尝试使用它时出现以下错误:

如何让我的 HTTP 会话在 faust 创建的视图中可用?请注意,如果它被代理或计时器调用,但不是从视图调用,它会起作用。可能视图有自己的循环,但我不确定这里的最佳做法是什么。

附加问题:构造函数是初始化该会话的最佳位置吗?我试图有一个self.on_startup_finished方法,但由于某种原因它永远不会被调用。

谢谢。

0 投票
0 回答
21 浏览

python - 如何检查 faust 应用程序是否以编程方式崩溃?

手头的任务是开发一个端点,如果 faust 应用程序正在运行,它将返回状态码 200,而当应用程序由于某种异常而崩溃时,它将返回其他状态码。目前我看到应用程序只是在某个代理中引发一些异常时挂起。但是我们如何检查代理是否在应用程序中正常工作。