问题标签 [faust]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - 如何在浮士德中使用并发?
我正在尝试在我的代码中实现 Faust 库以使用来自 Kafka 的数据,异步运行一些代码并将数据返回给 Kafka。我遵循了文档,但似乎在我的代码(或我对浮士德如何工作的理解)中犯了一个错误,因为我的应用程序中的浮士德当前一次使用来自 Kafka 的消息,并且只有在将数据返回给 Kafka 之后才会它开始使用另一条消息。
我的代码:
知道我在做什么错吗?我正在使用浮士德 1.10.4
python - Faust-Streaming 崩溃并出现错误“未分配分区”
我们最近从 faust 1.10.4 切换到 faust-streaming(0.6.9)。发布此消息后,我们看到应用程序崩溃并出现以下异常。该应用程序具有多个层,在每个阶段都对数据进行聚合和过滤。在每个阶段,处理器将消息发送到 Kafka 主题,相应的 faust 应用程序代理会使用该消息。但是对于 Kafka 主题,我们在每一层都保持分区计数相同。
- 集群大小 = 12
- 主题和表分区计数 = 36
- 浮士德流媒体版本 = 0.6.9
- kafka-python 版本 = 2.0.2
请在这里帮助我们。
faust - 结合浮士德工人的输出
我有一个有 7 个分区的主题。然后,我使用 faust 对主题执行流处理,在该主题上我想计算最后 10、30、60 和 300 秒的每个键的实例。我们正在使用一个窗口表来执行此操作,该表有一个跳跃的 1s 窗口,该窗口停留 301 秒,当新结果出现时,我们只需遍历表以计算每个所需时间框架的数量。可能不是最有效的方法,但它有效,并且由于没有滑动窗口选项,它是我们拥有的最好的方法。
那么问题是,当我们想要将其扩展到可能有数千条消息时,我们计划使用多个工作人员来分割工作负载,但这会导致两个单独的流,每个工作人员只能看到 1/n 的数据,所以只报告给定分区的计数。
有没有办法组合每个工人的窗口值?此外,考虑到每个工作人员可能会根据需求在需要时停止或启动,因此它需要是动态的。
我们不能强制键在同一个分区中,因为某些键会比其他键更频繁,并且您最终可能会在 3 个月内在一个分区中收到 100 条消息,而在另一个分区中则有几百万条消息。这也不是静态的,在不同的时间,不同的键会比其他键更频繁,所以没有办法提前计划。
apache-kafka - Faust 应用程序处理消息但在连接到 kafka 代理后不提交偏移量 #736
重置与 kafka 代理的连接时,我看到了这个问题。在这种情况下,faust 应用程序继续处理消息,但无法提交偏移量,并出现以下错误:
这种情况一直持续到 faust 应用程序重新启动。
一旦应用程序重新启动,它会重新处理消息并开始提交偏移量
结果,处理了许多重复的消息。
apache-kafka-streams - 使用 Faust 的滑动窗口
有谁知道如何使用 Faust 实现滑动窗口?
这个想法是计算一个键在 10、30、60 和 300 秒窗口中的出现次数,但我们需要在 1 秒或每次更新的基础上进行计数。
我有一个狡猾的解决方法,这似乎非常低效,我有一个翻滚的 1 秒窗口,到期时间为 300 秒,然后我使用该delta()
方法将表中的所有旧值与当前值相加。它似乎可以处理来自 6 个源的消息,每个源以 10 条消息/秒的速度运行,但这大约是我们看到滞后之前的限制。这显然是一种无法扩展的缓慢方法,所以问题是如何在不需要 KSQL 或设置 Spark 集群以及 Kafka 集群的情况下实现这一点。如果可以,我们会尽量保持简单。
更复杂的是,我们非常希望在过去 24 小时、1 周、1 个月和过去 3 个月内拥有相同的统计数据……所有这些都在运行中。但也许我们只是要求太多,而没有为每个输入提供专门的流程。
这是我的狡猾代码:
python - 确保消息在主题之间保持一致的顺序
我有一个浮夸的应用程序,它有两个主题。第一个接收格式的原始数据:
一个异步函数使用interpolate
这个主题并在每整整 15 分钟计算一次值。为此,我使用一个表来存储device_id
ABC 的最后一次出现。所以可以说看到的最后一条消息是
那么我将执行以下操作:
在另一个主题中target_topic
,我想按时间顺序计算值的增量。为此,我创建了另一个帮助表,它再次存储最后一次出现,分组在device_id
. 我现在的问题是:如何确保事件/消息的顺序没有改变,以便我的增量计算正确?
我目前的方法如下所示:
当然,我希望主题插入的顺序应该与出现在下一个主题中的消息相同,但并非在每种情况下都是如此。
以下是我看到日志中出现的消息的方式:
python - 在 Faust 应用程序中检查 kafka 主题的存在
我是浮士德的新手。
手头的任务是确保存在一些 Kafka 主题。想法是检查应用程序的活跃度检查中是否存在所有必需的主题。
我已经浏览了浮士德文档,但没有找到办法。甚至可以在浮士德应用程序中做到这一点吗?
我们可以在浮士德中使用原生 Kafka 列表主题( http://kafka.apache.org/21/javadoc/org/apache/kafka/clients/admin/AdminClient.html#listTopics-- )吗?
python - 在 faust-streaming 应用程序中初始化 aiohttp.ClientSession 的位置
我正在尝试aiohttp.ClientSession
在我的浮士德应用程序内部创建一个,这样我就可以重用它,而无需一遍又一遍地提供身份验证标头。如果我继承faust.app
然后还定义一个测试视图(应该从该会话中定义的服务中获取一些数据):
当我尝试使用它时出现以下错误:
如何让我的 HTTP 会话在 faust 创建的视图中可用?请注意,如果它被代理或计时器调用,但不是从视图调用,它会起作用。可能视图有自己的循环,但我不确定这里的最佳做法是什么。
附加问题:构造函数是初始化该会话的最佳位置吗?我试图有一个self.on_startup_finished
方法,但由于某种原因它永远不会被调用。
谢谢。
python - 如何检查 faust 应用程序是否以编程方式崩溃?
手头的任务是开发一个端点,如果 faust 应用程序正在运行,它将返回状态码 200,而当应用程序由于某种异常而崩溃时,它将返回其他状态码。目前我看到应用程序只是在某个代理中引发一些异常时挂起。但是我们如何检查代理是否在应用程序中正常工作。