我对 Spark 对运行它的节点的代码分配机制的理解只是粗略的,mapPartitions
当我希望为每个分区实例化一个带有参数的类时,我无法让我的代码在 Spark 的 API 中成功运行。
下面的代码运行良好,直到我将类演变MyWorkerClass
为需要一个参数:
val result : DataFrame =
inputDF.as[Foo].mapPartitions(sparkIterator => {
// (1) initialize heavy class instance once per partition
val workerClassInstance = MyWorkerClass(bar)
// (2) provide an iterator using a function from that class instance
new CloseableIteratorForSparkMapPartitions[Post, Post](sparkIterator, workerClassInstance.recordProcessFunc)
}
直到我不得不(或选择)向我的 class 添加一个构造函数参数时,上面的代码运行得非常好MyWorkerClass
。传递的参数值null
在 worker 中的结果是一样的,而不是 的实际值bar
。不知何故,参数的序列化无法按预期工作。
你会怎么做?
其他想法/评论
我将避免添加庞大的代码CloseableIteratorForSparkMapPartitions
——它只是提供了一个对 Spark 友好的迭代器,甚至可能不是其中最优雅的实现。
据我了解,构造函数参数未正确传递给 Spark 工作人员,因为 Spark 在序列化内容以发送以在 Spark 工作人员上执行时如何捕获状态。但是,实例化该类确实可以无缝地使该类中包含重负载资产——通常可用于我上面代码的最后一行提供的函数;并且该类似乎确实按分区实例化。这实际上是使用mapPartitions
而不是map
.
这是将参数传递给它的实例化,我无法弄清楚如何启用或解决问题。在我的例子中,这个参数是一个只有在程序开始运行后才知道的值(即使在我的工作的单次执行过程中始终保持不变;它实际上是一个程序参数)。我确实需要它来进行类的初始化。
我尝试通过提供一个使用其输入参数实例化的函数来解决问题MyWorkerClass
,而不是像上面那样直接实例化,但这并没有解决问题。
问题的根本症状不是任何异常,而只是实例化bar
时的值MyWorkerClass
只是,而不是在包含我上面包含的代码片段的代码范围内已知null
的实际值!bar
* 一个相关的旧 Spark 问题讨论在这里