2

我正在尝试编写一个接受多种泛型类型并将一个工作单元作为参数来执行的方法。

这个想法是工作单元是一个通用的功能,它本身就是通用的。例如,假设它类似于以下内容:

def loadModelRdd[T: TypeTag](sc: SparkContext): RDD[T] = {
  ...
}

loadModelRdd() 将在一些内部处理(如加载模型信息等)之后构造给定类型的 RDD。

我一直在破解的原型方法如下所示(无效):

def forkAll[A : Manifest, B : Manifest](work: => RDD[_]): (RDD[A], RDD[B]) = {
  def aFuture = Future { work } // How can I notify that this work call returns type A?
  def bFuture = Future { work } // How can I notify that this work call returns type B?

  val res = for {
    a <- aFuture
    b <- bFuture
  } yield (a.asInstanceOf[A], b.asInstanceOf[B])

  Await.result(res, 10.seconds)
}

这是我正在处理的代码的缩短版本,因为我实际上正在考虑接受多达 10 种不同的类型。

如您所见,forkAll 方法的总体目标是将工作单元包装在 Future 中,对每种类型的工作单元执行 fork-join,然后将结果作为 Tuple 的结果返回。一个示例消费者声明将是:

val (a, b) = forkAll[ClassA, ClassB](loadModelRdd)

即此时我想分叉加入并等待结果,但我希望并行执行执行,然后收集回驱动程序(具体来说是 Spark 驱动程序)。

问题是我不确定在构造 Future {} 块时如何强制 forkAll 中的工作单元返回的类型。如果没有 forkAll,实现如下所示:

val resA = loadModelRdd[ClassA](sc)
val resB = loadModelRdd[ClassB](sc)
...

我正在考虑这样做有两个原因:

  1. 为与此模型匹配的任何工作单元抽象分叉连接的细节。
  2. 该代码的一个版本明确说明了工作单元是什么,它正在生产中工作,并负责将长时间运行的块的执行减少近一半。我有几个可以应用此模式的执行步骤

这在 Scala 的类型系统中是可能的吗?还是我应该从不同的角度看待这个问题?我已经尝试了几种实现(包括此处描述的一种),但我还没有找到适合我当前对问题的看法的一种

如果需要任何其他信息,请告诉我。

谢谢!

4

1 回答 1

0

简短的回答: Scala 不允许带有类型参数的函数,所以你想要的并不完全可能。

您正在尝试传递带有类型参数的方法。虽然方法可以有类型参数,但函数不能。当你试图传递一个方法时,它就像一个匿名函数,所以你必须指定一个类型。

但是,由于方法确实允许类型参数,因此您可以通过创建一个抽象类来执行您的 fork/join 来利用这一点

abstract class ForkJoin {

  protected def work[T]: RDD[T]

  def apply[A, B]: (RDD[A], RDD[B]) = {
    // Write implementation of fork/join here
    (work[A], work[B])
  }
}

然后覆盖类型泛型work方法,以便它执行您想要的操作,例如调用其他一些预定义的方法。

val forkJoin = new ForkJoin {
  override protected def work[T]: RDD[T] =
    loadModelRdd[T](sc)
}

val (intRdd, stringRdd) = forkJoin[Int, String]

看看这个可以编译和运行没有问题的原型实现。

于 2015-11-14T03:52:44.597 回答