我一直在努力执行 DStream 和 RDD 之间的连接。设置场景:
- 火花 - 2.3.1
- 蟒蛇 - 3.6.3
RDD
我正在从 CSV 文件中读取 RDD,拆分记录并生成一对 RDD。
sku_prices = sc.textFile("sku-catalog.csv")\
.map(lambda line: line.split(","))\
.map(lambda fields: (fields[0], float(fields[1])))
这是来自的输出sku_prices.collect()
:
[('0003003001', 19.25),
('0001017002', 2.25),
('0001017003', 3.5),
('0003013001', 18.75),
('0004017002', 16.5),
('0002008001', 2.25),
('0004002001', 10.75),
('0005020002', 10.5),
('0001004002', 3.5),
('0002016003', 14.25)]
数据流
我正在阅读来自 Kafka 的 DStream。
orders = kstream.map(lambda n: n[1]).map(lambda n: json.loads(n))
items = orders.map(lambda order: order['items'])\
.flatMap(lambda items: [(i['sku'], i['count']) for i in items])\
.reduceByKey(lambda x, y: x + y)
当我运行pprint()
时,orders
我得到如下所示的输出:
-------------------------------------------
Time: 2018-09-03 06:57:20
-------------------------------------------
('0004002001', 3)
('0002016003', 1)
('0003013001', 1)
加入
现在我想将items
DStream加入sku_prices
RDD。我知道我不能直接加入,但我的阅读表明我可以使用transform()
DStream 上的方法来完成这项工作。所以这就是我所拥有的:
items.transform(lambda rdd: rdd.join(sku_prices)).pprint()
我期待得到一个看起来像这样的 DStream:
-------------------------------------------
Time: 2018-09-03 06:57:20
-------------------------------------------
('0004002001', (3, 10.75))
('0002016003', (1, 14.25))
('0003013001', (1, 18.75))
Spark 文档建议这应该有效并且确实有效:结果正是我得到的!:)
检查点
但是我也想做一个有状态的操作,所以我需要引入检查点。
ssc.checkpoint("checkpoint")
只需在以下位置添加检查点就会导致此错误transform()
:
您似乎正在尝试广播 RDD 或从操作或转换中引用 RDD。RDD 转换和操作只能由驱动程序调用,不能在其他转换内部调用;例如,rdd1.map(lambda x: rdd2.values.count() * x) 无效,因为值转换和计数操作无法在 rdd1.map 转换内部执行。
该线程上的答案表明检查点和外部 RDD 不能混合使用。有没有解决的办法?当 StreamingContext 启用了检查点时,是否可以加入 DStream 和 RDD?
谢谢,安德鲁。