我已经加入了 2 个 kafka 流,它们在 spark 中流式传输 3 条记录/秒。
stream_id = 1 - dataframe 1
stream_id = 2 - dataframe 2
| stream_id | 时间 | v1 |
|---|---|---|
| 1 | 1643627396.2 | 3 |
| 2 | 1643627396.2 | 5 |
| 1 | 1643627396.22 | 4 |
| 2 | 1643627396.22 | 6 |
| 1 | 1643627396.24 | 7 |
| 2 | 1643627396.23 | 3 |
| 1 | 1643627396.26 | 2 |
| 2 | 1643627396.27 | 3 |
对于从中选择的记录,stream_id = 1我必须按时间找到最接近的匹配 项stream_id = 2。
例如。如果在这种情况下:
| stream_id | 时间 | v1 |
|---|---|---|
| 2 | 1643627396.23 | 3 |
| 1 | 1643627396.26 | 2 |
| 2 | 1643627396.27 | 3 |
如果我们选择与 的记录stream_id = 1,则与最接近的匹配 stream_id = 2将具有1643627396.27作为timestamp值。
基于v1这两条记录,我必须计算差异:3-2=1并将该计算与时间戳一起存储在新数据框中,最终看起来像这样:
最终数据框如下所示:
| time_stream_1 | time_stream_2 | v1_diff |
|---|---|---|
| 1643627396.2 | 1643627396.2 | 2 |
| 1643627396.22 | 1643627396.22 | 2 |
| 1643627396.24 | 1643627396.23 | 4 |
| 1643627396.26 | 1643627396.27 | 1 |
基于如果值变得大于某个值,v1_diff我必须发出 API 命令。v1_diff
问题是如何在 spark 中迭代数据帧以执行这样的操作?我认为 Spark 并不是真的为此而构建的……也许吧。我需要一些关于下一步最佳的指示,卡夫卡,浮士德也许?
关于火花:也许我应该采取 0.35 秒的批处理窗口从两个数据帧中获取 1-2 条记录,如果 v1_diff 太大,则执行计算和火灾响应。想法是尽快做出反应,最好是在记录出现时立即做出反应。
文档中提到:
在内部,默认情况下,结构化流查询使用微批处理引擎处理,该引擎将数据流作为一系列小批量作业处理,从而实现低至 100 毫秒的端到端延迟和一次性容错保证. 但是,从 Spark 2.3 开始,我们引入了一种新的低延迟处理模式,称为Continuous Processing,它可以实现低至 1 毫秒的端到端延迟,并保证至少一次。在不更改查询中的 Dataset/DataFrame 操作的情况下,您将能够根据您的应用程序要求选择模式。
100ms由于差异很大并且很重要,我如何为我的案例实现连续处理1ms?