我有一个包含键值对的 JavaPairDStream。我需要将它转换为 HashMap。我已经尝试通过在其上调用“collectAsMap()”函数及其工作来对普通 JavaPairRDD 执行相同操作,但是当我尝试在 DStream 上执行相同操作时,它失败了。
我试图通过使用“foreachRDD”函数将“JavaPairDStream”转换为“JavaPairRDD”,然后在 JavaPairRDD 上使用“collectAsMap()”函数来实现相同的目的。
Map<String,String> value= new HashMap<String,String>();
value=line.collectAsMap();
//Here "line" is a "JavaPairRDD<String,String>".
它没有给出任何编译错误,但是当我运行程序时它会失败并抛出如下错误。
java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lscala.Tuple2;
at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:447)
at org.apache.spark.api.java.JavaPairRDD.collectAsMap(JavaPairRDD.scala:464)
at attempt1.CSV_Spark$3.call(CSV_Spark.java:109)
at attempt1.CSV_Spark$3.call(CSV_Spark.java:1)
我不确定我的方法是否正确。普通的“JavaPairRDD”和由“foreachRDD”函数创建的有什么区别吗?为什么相同的方法适用于普通的“JavaPairRDD”,但当我将其应用于通过在 JavaPairDStream 上应用“foreachRDD”函数创建的“JavaPairRDD”时会失败。如果我在任何地方出错,请告诉我。另外,如果还有其他方法,请在此处发布。谢谢。