我有以下 Java Spark 代码:
stream.foreachRDD(rdd -> {
//do some operations
List<String> jsonList = new ArrayList<String>();
rdd.foreach(msg -> {//Kafka messages
jsonList.Add(msg.value());
});
writeJsons(jsonList);//jsonList size is 0
}
我想对每条消息进行迭代,将消息添加到我的列表中,并对我的 Json 列表执行一些逻辑。
我是 Spark 的新手,我试图理解为什么在rdd.foreach
循环之后 jsonList 大小为 0。Spark 如何在节点之间共享 List?
如果我想将所有 Json 消息添加到列表中,然后使用 json 列表执行我的逻辑,我应该在我的代码中进行哪些更改?