2

我有以下 Java Spark 代码:

stream.foreachRDD(rdd -> {
    //do some operations
    List<String> jsonList = new ArrayList<String>();
    rdd.foreach(msg -> {//Kafka messages
         jsonList.Add(msg.value());
    });

    writeJsons(jsonList);//jsonList size is 0
}

我想对每条消息进行迭代,将消息添加到我的列表中,并对我的 Json 列表执行一些逻辑。

我是 Spark 的新手,我试图理解为什么在rdd.foreach循环之后 jsonList 大小为 0。Spark 如何在节点之间共享 List?

如果我想将所有 Json 消息添加到列表中,然后使用 json 列表执行我的逻辑,我应该在我的代码中进行哪些更改?

4

1 回答 1

0

这篇在 Spark RDD foreach 中修改集合的帖子中提供了它不起作用的详细原因。jsonList基本上,当您在 RDD 的 foreach 中引用本地对象 ( ) 时,该对象将被序列化到每个工作人员。但是,它永远不会序列化回驱动程序。因此,在您的情况下,当您add从工作人员那里致电时,您修改了对象的副本而不是原始对象,这就是原始对象保持为空的原因。

另一种方法是使用 Spark 计算列表,然后将其收集到驱动程序。

List<String> jsonList = rdd.map(msg -> msg.value()).collect();
于 2018-07-05T08:52:27.590 回答