2

我有一个包含 1000 列的数据框,我试图通过对每一列进行一些操作来获取一些统计信息。我需要对每一列进行排序,所以我基本上不能对它进行多列操作。我在一个名为processColumn的函数中执行所有这些列操作

def processColumn(df: DataFrame): Double = {

  // sort the column
  // get some statistics
}

为了完成这项工作,我将数据帧保存在内存中,并对其进行 scala 多线程处理。所以,代码是这样的

假设初始数据帧是 df

df.columns.grouped(100).foreach { columnGroups =>

  val newDf = df.select(columnGroups.head, columnGroups.tail:_*)
  newDf.persist()

  val parallelCol = columnGroups.par 
  parallelCol.tasksupport = new ForkJoinTaskSupport(
    new scala.concurrent.forkjoin.ForkJoinPool(4)
  )

  parallelCol.foreach { columnName =>

     val result = processColumn(df.select(columnName))
     // I am storing result here to a synchronized list
  }
  newDf.unpersist()
}

因此,如果您看到,我指定一次运行 4 个线程。但有时会发生一个线程卡住的情况,并且我有超过 4 个活动作业正在运行。而那些被卡住的永远不会结束。

我觉得从 scala 并行集合开始的线程有一个超时,有时它不会等待所有作业完成。然后 unpersist 被调用。因此,活动作业现在永远卡住了。我试图通过查看源代码以查看 scala 集合操作是否有超时来弄清楚,但无法确定。

任何帮助将不胜感激。另外,如果您有任何问题,请告诉我。谢谢你。

4

0 回答 0