0

语境

假设我有一个检查点的 DataFrame。

# spark = SparkSession
# sc = SparkContext

sc.setCheckpointDir("hdfs:/some_path/")

df = spark.createDataFrame([
        (1, "A"), (2, "B")
    ]
    , ["A", "B"]
)
df = df.checkpoint(eager=True)

# Do whatever you want to do

现在假设进程崩溃了,上下文消失了,你想恢复你检查点的数据。这应该可以像这样检索:

import pyspark.serializer as s

recovered_df = sc._checkpointFile(
    "hdfs:/some_path/checkpoint/"
    , s.PickleSerializer
)

result = recovered_df.collect()

可以在此处找到如何在 Scala 中执行此操作的模拟示例。

问题

recovered_df作为ReliableCheckpointRDD返回。无论如何,用 Scala 类表示,这应该是ReliableCheckpointRDD[InternalRow],但是是ReliableCheckpointRDD[UnsafeRow]

的错误recovered_df.collect()表明问题:

线程“服务 RDD 8”org.apache.spark.SparkException 中的异常:意外的元素类型类 org.apache.spark.sql.catalyst.expressions.UnsafeRow

此时抛出异常:

TypeError:load_stream()缺少1个必需的位置参数:引擎中的'stream'TypeError Traceback(最近一次调用)----> 1 rdd.collect()

/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/rdd.py in collect(self) 815 with SCCallSiteSync(self.context) as css: 816 sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self ._jrdd.rdd()) --> 817 返回列表(_load_from_socket(sock_info, self._jrdd_deserializer)) 818 819 def reduce(self, f):

/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/rdd.py in _load_from_socket(sock_info, serializer) 147 sock.settimeout(None) 148 # 垃圾回收时socket会自动关闭。--> 149 返回 serializer.load_stream(sockfile) 150 151

类型错误:load_stream() 缺少 1 个必需的位置参数:'stream'

问题

  • 如何在 pySpark 中成功恢复我的 DataFrame(或可读 RDD)?
  • 可选,如果解决方案需要:如何将 UnsafeRow 转换为 InternalRow?
4

0 回答 0