0

我已经加入了 2 个 kafka 流,它们在 spark 中流式传输 3 条记录/秒。

stream_id = 1 - dataframe 1 
stream_id = 2 - dataframe 2
stream_id 时间 v1
1 1643627396.2 3
2 1643627396.2 5
1 1643627396.22 4
2 1643627396.22 6
1 1643627396.24 7
2 1643627396.23 3
1 1643627396.26 2
2 1643627396.27 3

对于从中选择的记录,stream_id = 1我必须按时间找到最接近的匹配 项stream_id = 2

例如。如果在这种情况下:

stream_id 时间 v1
2 1643627396.23 3
1 1643627396.26 2
2 1643627396.27 3

如果我们选择与 的记录stream_id = 1,则与最接近的匹配 stream_id = 2将具有1643627396.27作为timestamp值。

基于v1这两条记录,我必须计算差异:3-2=1并将该计算与时间戳一起存储在新数据框中,最终看起来像这样:

最终数据框如下所示:

time_stream_1 time_stream_2 v1_diff
1643627396.2 1643627396.2 2
1643627396.22 1643627396.22 2
1643627396.24 1643627396.23 4
1643627396.26 1643627396.27 1

基于如果值变得大于某个值,v1_diff我必须发出 API 命令。v1_diff

问题是如何在 spark 中迭代数据帧以执行这样的操作?我认为 Spark 并不是真的为此而构建的……也许吧。我需要一些关于下一步最佳的指示,卡夫卡,浮士德也许?

关于火花:也许我应该采取 0.35 秒的批处理窗口从两个数据帧中获取 1-2 条记录,如果 v1_diff 太大,则执行计算和火灾响应。想法是尽快做出反应,最好是在记录出现时立即做出反应。

文档中提到:

在内部,默认情况下,结构化流查询使用微批处理引擎处理,该引擎将数据流作为一系列小批量作业处理,从而实现低至 100 毫秒的端到端延迟和一次性容错保证. 但是,从 Spark 2.3 开始,我们引入了一种新的低延迟处理模式,称为Continuous Processing,它可以实现低至 1 毫秒的端到端延迟,并保证至少一次。在不更改查询中的 Dataset/DataFrame 操作的情况下,您将能够根据您的应用程序要求选择模式。

100ms由于差异很大并且很重要,我如何为我的案例实现连续处理1ms

4

0 回答 0