我正在重新设计一个基于流式 IoT 传感器数据的实时预测管道。该管道正在摄取传感器数据样本,其结构与(sensor_id, timestamp, sample_index, value)
在源系统中创建时一样,将它们保存在本地并运行 pyspark 批处理作业以训练算法和进行预测。
目前,传感器数据保存在磁盘上的本地文件中,每个传感器只有一个文件,并保存到 HDFS 以进行火花流式传输。流式作业获取每个微批次,计算每个传感器到达的样本数量,并决定哪些传感器积累了足够的新数据来做出新的预测。然后,它将 RDD 中的每个传感器行映射到使用 pythonopen
方法打开数据文件的方法,扫描到最后处理的样本,从该样本中提取数据以及预测所需的一些历史数据,然后运行预测作业在火花簇上。此外,每个算法的每个固定数量的样本都需要重新调整,它从同一数据存储中查询很长的历史并在 Spark 集群上运行。
最后,预测作业处理的 RDD 如下所示:
|-----------------------------|
| sensor_id | sensor_data |
|-----------------------------|
| SENSOR_0 | [13,52,43,54,5] |
| SENSOR_1 | [22,42,23,3,35] |
| SENSOR_2 | [43,2,53,64,42] |
|-----------------------------|
我们现在在监控几十万个传感器时遇到了规模问题。在这个过程中,成本最高的操作似乎是从文件中读取数据——读取每个文件的几十毫秒延迟累积到整个预测作业的无法控制的延迟。此外,将数据作为平面文件存储在磁盘上根本无法扩展。
我们正在研究改变存储方法以提高性能并提供可扩展性。使用时间序列数据库(我们尝试过 timescaledb 和 influxdb)会导致在一个查询中查询所有传感器的数据的问题,当每个传感器需要从不同的时间点查询时,然后将单独的样本分组到sensor_data
列中,如图所示以上,这是非常昂贵的,会导致大量的洗牌,甚至不如平面文件解决方案。我们也在尝试 parquet 文件,但是它们的单一写入行为使得计划在这种情况下表现良好的数据结构变得困难。
tl;dr - 我正在为以下场景寻找高性能架构:
- 实时摄取流式传感器数据
- 当传感器累积足够的样本时,查询当前+历史数据并将其发送到预测作业
- 每个预测作业处理在最后一个微批次中达到阈值的所有传感器
- RDD 包含传感器 ID 行和所有查询样本的有序数组