我正在尝试编写一个接受多种泛型类型并将一个工作单元作为参数来执行的方法。
这个想法是工作单元是一个通用的功能,它本身就是通用的。例如,假设它类似于以下内容:
def loadModelRdd[T: TypeTag](sc: SparkContext): RDD[T] = {
...
}
loadModelRdd() 将在一些内部处理(如加载模型信息等)之后构造给定类型的 RDD。
我一直在破解的原型方法如下所示(无效):
def forkAll[A : Manifest, B : Manifest](work: => RDD[_]): (RDD[A], RDD[B]) = {
def aFuture = Future { work } // How can I notify that this work call returns type A?
def bFuture = Future { work } // How can I notify that this work call returns type B?
val res = for {
a <- aFuture
b <- bFuture
} yield (a.asInstanceOf[A], b.asInstanceOf[B])
Await.result(res, 10.seconds)
}
这是我正在处理的代码的缩短版本,因为我实际上正在考虑接受多达 10 种不同的类型。
如您所见,forkAll 方法的总体目标是将工作单元包装在 Future 中,对每种类型的工作单元执行 fork-join,然后将结果作为 Tuple 的结果返回。一个示例消费者声明将是:
val (a, b) = forkAll[ClassA, ClassB](loadModelRdd)
即此时我想分叉加入并等待结果,但我希望并行执行执行,然后收集回驱动程序(具体来说是 Spark 驱动程序)。
问题是我不确定在构造 Future {} 块时如何强制 forkAll 中的工作单元返回的类型。如果没有 forkAll,实现如下所示:
val resA = loadModelRdd[ClassA](sc)
val resB = loadModelRdd[ClassB](sc)
...
我正在考虑这样做有两个原因:
- 为与此模型匹配的任何工作单元抽象分叉连接的细节。
- 该代码的一个版本明确说明了工作单元是什么,它正在生产中工作,并负责将长时间运行的块的执行减少近一半。我有几个可以应用此模式的执行步骤
这在 Scala 的类型系统中是可能的吗?还是我应该从不同的角度看待这个问题?我已经尝试了几种实现(包括此处描述的一种),但我还没有找到适合我当前对问题的看法的一种
如果需要任何其他信息,请告诉我。
谢谢!