我正在使用KafkaUtils Pyspark 实现 从Kafka流式传输大量数据(200k + 事件/批次 3 秒)。
我收到实时数据:
- 一个
sessionID
- 一个
ip
- 一个
state
我现在使用基本的Spark / Redis实现正在做的事情如下:
火花工作:
- 通过以下方式汇总数据
sessionID
:rdd_combined = rdd.map(map_data).combineByKey(lambda x: frozenset([x]), lambda s, v: s | frozenset([v]), lambda s1, s2: s1 | s2)
- 创建一个
set
不同的state
(可能是 1、2、3...) - 保留
ip
信息,然后将其转换为lon
/lat
。 - 检查是否
sessionID
在 Redis 中,如果是则更新它,否则将其写入 Redis。
然后我只在 Python 中为 Redis运行一个小脚本,检查 state 中是否有 1:
- 如果是,则该事件将在频道中发布(例如
channel_1
)并从 Redis 中删除。 - 如果不是,我们检查/更新时间戳。如果
NOW() - timestamp > 10 min
数据已发布,channel_2
否则我们什么也不做。
问题 :
我一直想知道用Spark计算大部分工作的最佳实现是什么。
- 使用
window
+ 聚合或reduceByKeyAndWindow
:我担心的是,如果我使用 10 分钟的窗口并每 3 秒对几乎相同的数据进行一次计算,则效率不是很高。 - using
updateStateByKey
看起来很有趣,但数据永远不会被删除,这可能会成为问题。另外我怎么能检查我们已经过了 10 分钟?
关于这个实现或其他可能的想法?