我正在使用KafkaUtils Pyspark 实现 从Kafka流式传输大量数据(200k + 事件/批次 3 秒)。
我收到实时数据:
- 一个
sessionID - 一个
ip - 一个
state
我现在使用基本的Spark / Redis实现正在做的事情如下:
火花工作:
- 通过以下方式汇总数据
sessionID:rdd_combined = rdd.map(map_data).combineByKey(lambda x: frozenset([x]), lambda s, v: s | frozenset([v]), lambda s1, s2: s1 | s2) - 创建一个
set不同的state(可能是 1、2、3...) - 保留
ip信息,然后将其转换为lon/lat。 - 检查是否
sessionID在 Redis 中,如果是则更新它,否则将其写入 Redis。
然后我只在 Python 中为 Redis运行一个小脚本,检查 state 中是否有 1:
- 如果是,则该事件将在频道中发布(例如
channel_1)并从 Redis 中删除。 - 如果不是,我们检查/更新时间戳。如果
NOW() - timestamp > 10 min数据已发布,channel_2否则我们什么也不做。
问题 :
我一直想知道用Spark计算大部分工作的最佳实现是什么。
- 使用
window+ 聚合或reduceByKeyAndWindow:我担心的是,如果我使用 10 分钟的窗口并每 3 秒对几乎相同的数据进行一次计算,则效率不是很高。 - using
updateStateByKey看起来很有趣,但数据永远不会被删除,这可能会成为问题。另外我怎么能检查我们已经过了 10 分钟?
关于这个实现或其他可能的想法?