2

我想要实现的是执行 Scala 代码。将结果 Scala RDD[Row] 转换为 PySparkRDD of Rows。执行一些 python 操作并将 pySpark Rows 的 RDD 转换回 Scala 的 RDD[Row]。为了让 RDD 到 pySpark RDD,我这样做:在 Scala 中,我有这个方法

import org.apache.spark.sql.execution.python.EvaluatePython.{javaToPython, toJava}
def toPythonRDD(rdd: RDD[Row]): JavaRDD[Array[Byte]] = { 
    javaToPython(rdd.map(r => toJava(r, r.schema)))
}

稍后在 pySpark 中,我创建了新的 RDD 调用

RDD(jrdd, sc, BatchedSerializer(PickleSerializer()))

我最终得到了 pySpark Rows 的 RDD。我想恢复那个过程。我可以通过访问 rdd._jrdd 轻松获得 Scala 的 JavaRDD[Array[Byte]]。我的主要问题是我不知道如何将其转换/取消plickle 回 RDD [Row]。我试过了

sc._jvm.SerDe.pythonToJava(rdd._to_java_object_rdd(), True)

sc._jvm.SerDe.pythonToJava(rdd._jrdd, True)

两者都以类似的异常崩溃

net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)

net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)

我知道我可以轻松地在 Scala 和 Python 之间来回传递 DF,但我的记录没有统一的模式。我正在使用 Row 的 RDD,因为我虽然已经有了一个可以重用的腌制器并且它可以工作,但到目前为止只有一个方向。

4

0 回答 0