3

我一直在努力执行 DStream 和 RDD 之间的连接。设置场景:

  • 火花 - 2.3.1
  • 蟒蛇 - 3.6.3

RDD

我正在从 CSV 文件中读取 RDD,拆分记录并生成一对 RDD。

sku_prices = sc.textFile("sku-catalog.csv")\
    .map(lambda line: line.split(","))\
    .map(lambda fields: (fields[0], float(fields[1])))

这是来自的输出sku_prices.collect()

[('0003003001', 19.25),
 ('0001017002', 2.25),
 ('0001017003', 3.5),
 ('0003013001', 18.75),
 ('0004017002', 16.5),
 ('0002008001', 2.25),
 ('0004002001', 10.75),
 ('0005020002', 10.5),
 ('0001004002', 3.5),
 ('0002016003', 14.25)]

数据流

我正在阅读来自 Kafka 的 DStream。

orders = kstream.map(lambda n: n[1]).map(lambda n: json.loads(n))

items = orders.map(lambda order: order['items'])\
              .flatMap(lambda items: [(i['sku'], i['count']) for i in items])\
              .reduceByKey(lambda x, y: x + y)

当我运行pprint()时,orders我得到如下所示的输出:

-------------------------------------------
Time: 2018-09-03 06:57:20
-------------------------------------------
('0004002001', 3)
('0002016003', 1)
('0003013001', 1)

加入

现在我想将itemsDStream加入sku_pricesRDD。我知道我不能直接加入,但我的阅读表明我可以使用transform()DStream 上的方法来完成这项工作。所以这就是我所拥有的:

items.transform(lambda rdd: rdd.join(sku_prices)).pprint()

我期待得到一个看起来像这样的 DStream:

-------------------------------------------
Time: 2018-09-03 06:57:20
-------------------------------------------
('0004002001', (3, 10.75))
('0002016003', (1, 14.25))
('0003013001', (1, 18.75))

Spark 文档建议这应该有效并且确实有效:结果正是我得到的!:)

检查点

但是我也想做一个有状态的操作,所以我需要引入检查点。

ssc.checkpoint("checkpoint")

只需在以下位置添加检查点就会导致此错误transform()

您似乎正在尝试广播 RDD 或从操作或转换中引用 RDD。RDD 转换和操作只能由驱动程序调用,不能在其他转换内部调用;例如,rdd1.map(lambda x: rdd2.values.count() * x) 无效,因为值转换和计数操作无法在 rdd1.map 转换内部执行。

该线程上的答案表明检查点和外部 RDD 不能混合使用。有没有解决的办法?当 StreamingContext 启用了检查点时,是否可以加入 DStream 和 RDD?

谢谢,安德鲁。

4

0 回答 0