我已经加入了 2 个 kafka 流,它们在 spark 中流式传输 3 条记录/秒。
stream_id = 1 - dataframe 1
stream_id = 2 - dataframe 2
stream_id | 时间 | v1 |
---|---|---|
1 | 1643627396.2 | 3 |
2 | 1643627396.2 | 5 |
1 | 1643627396.22 | 4 |
2 | 1643627396.22 | 6 |
1 | 1643627396.24 | 7 |
2 | 1643627396.23 | 3 |
1 | 1643627396.26 | 2 |
2 | 1643627396.27 | 3 |
对于从中选择的记录,stream_id = 1
我必须按时间找到最接近的匹配 项stream_id = 2
。
例如。如果在这种情况下:
stream_id | 时间 | v1 |
---|---|---|
2 | 1643627396.23 | 3 |
1 | 1643627396.26 | 2 |
2 | 1643627396.27 | 3 |
如果我们选择与 的记录stream_id = 1
,则与最接近的匹配 stream_id = 2
将具有1643627396.27
作为timestamp
值。
基于v1
这两条记录,我必须计算差异:3-2=1
并将该计算与时间戳一起存储在新数据框中,最终看起来像这样:
最终数据框如下所示:
time_stream_1 | time_stream_2 | v1_diff |
---|---|---|
1643627396.2 | 1643627396.2 | 2 |
1643627396.22 | 1643627396.22 | 2 |
1643627396.24 | 1643627396.23 | 4 |
1643627396.26 | 1643627396.27 | 1 |
基于如果值变得大于某个值,v1_diff
我必须发出 API 命令。v1_diff
问题是如何在 spark 中迭代数据帧以执行这样的操作?我认为 Spark 并不是真的为此而构建的……也许吧。我需要一些关于下一步最佳的指示,卡夫卡,浮士德也许?
关于火花:也许我应该采取 0.35 秒的批处理窗口从两个数据帧中获取 1-2 条记录,如果 v1_diff 太大,则执行计算和火灾响应。想法是尽快做出反应,最好是在记录出现时立即做出反应。
文档中提到:
在内部,默认情况下,结构化流查询使用微批处理引擎处理,该引擎将数据流作为一系列小批量作业处理,从而实现低至 100 毫秒的端到端延迟和一次性容错保证. 但是,从 Spark 2.3 开始,我们引入了一种新的低延迟处理模式,称为Continuous Processing,它可以实现低至 1 毫秒的端到端延迟,并保证至少一次。在不更改查询中的 Dataset/DataFrame 操作的情况下,您将能够根据您的应用程序要求选择模式。
100ms
由于差异很大并且很重要,我如何为我的案例实现连续处理1ms
?