1

我对 Spark 对运行它的节点的代码分配机制的理解只是粗略的,mapPartitions当我希望为每个分区实例化一个带有参数的类时,我无法让我的代码在 Spark 的 API 中成功运行。

下面的代码运行良好,直到我将类演变MyWorkerClass为需要一个参数:

  val result : DataFrame =
    inputDF.as[Foo].mapPartitions(sparkIterator => {

      // (1) initialize heavy class instance once per partition
      val workerClassInstance = MyWorkerClass(bar)

      // (2) provide an iterator using a function from that class instance
      new CloseableIteratorForSparkMapPartitions[Post, Post](sparkIterator, workerClassInstance.recordProcessFunc)
    }

直到我不得不(或选择)向我的 class 添加一个构造函数参数时,上面的代码运行得非常好MyWorkerClass。传递的参数值null在 worker 中的结果是一样的,而不是 的实际值bar。不知何故,参数的序列化无法按预期工作。

你会怎么做?


其他想法/评论

我将避免添加庞大的代码CloseableIteratorForSparkMapPartitions——它只是提供了一个对 Spark 友好的迭代器,甚至可能不是其中最优雅的实现。

据我了解,构造函数参数正确传递给 Spark 工作人员,因为 Spark 在序列化内容以发送以在 Spark 工作人员上执行时如何捕获状态。但是,实例化该类确实可以无缝地使该类中包含重负载资产——通常可用于我上面代码的最后一行提供的函数;并且该类似乎确实按分区实例化。这实际上是使用mapPartitions而不是map.

这是将参数传递给它的实例化,我无法弄清楚如何启用或解决问题。在我的例子中,这个参数是一个只有在程序开始运行后才知道的值(即使在我的工作的单次执行过程中始终保持不变;它实际上是一个程序参数)。我确实需要它来进行类的初始化。

我尝试通过提供一个使用其输入参数实例化的函数来解决问题MyWorkerClass,而不是像上面那样直接实例化,但这并没有解决问题。

问题的根本症状不是任何异常,而只是实例化bar时的值MyWorkerClass只是,而不是在包含我上面包含的代码片段的代码范围内已知null的实际值!bar

* 一个相关的旧 Spark 问题讨论在这里

4

0 回答 0