我建议你看看ScalaQuery,它做同样的事情。它可以这样做,因为这是一个单子问题。实际上,由Scalaz 库实现的一些 Haskell 解决方案(例如 Arrows )似乎非常接近。
那将是最好的解决方案,因为适当的抽象将使将来的更改更容易。
作为一个黑客,我认为是这样的:
abstract class QueryModifiers
case object Consolidate extends QueryModifiers
// create others as appropriate
class Query(title: String) {
self =>
// Create actors
def createActor(qm: QueryModifiers): Actor = {
val actor = qm match {
case Consolidate => // create a consolidator actor
case //... as needed
}
actor.start
actor
}
// The pipeline
val pipe: List[List[QueryModifiers]] = Nil
// Build the pipeline
def ->(qms: List[QueryModifiers]) = new Query(title) {
override val pipe = qms :: self.pipe
}
def ->(qm: QueryModifiers) = new Query(title) {
override val pipe = List(qm) :: self.pipe
}
def ->(c: Consolidate.type) = {
// Define the full pipeline
// Because the way pipe is built, the last layer comes first, and the first comes last
val pipeline = Consolidate :: pipe
// Create an actor for every QueryModifier, using an unspecified createActor function
val actors = pipeline map (_ map (createActor(_))
// We have a list of lists of actors now, where the first element of the list
// was the last QueryModifiers we received; so, group the layers by two, and for each
// pair, make the second element send the result to the first.
// Since each layer can contain many actors, make each member of the second
// layer send the results to each member of the first layer.
// The actors should be expecting to receive message SendResultsTo at any time.
for {
List(nextLayer, previousLayer) <- actors.iterator sliding 2
nextActor <- nextLayer
previousActor <- previousLayer
} previousActor ! SendResultsTo(nextActor)
// Send the query to the first layer
for ( firstActor <- actors.last ) firstActor ! Query(title)
// Get the result from the last layer, which is the consolidator
val results = actors.head.head !? Results
// Return the results
results
}
}
编辑
您也可以通过一些技巧来保证订购。我在这里尝试避免使用 Scala 2.8,尽管使用命名参数和默认参数可以使这变得更容易。
sealed abstract class QueryModifiers
case class QMSearcher(/*...*/) extends QueryModifiers
case class QMFilter(/*...*/) extends QueryModifiers
case class QMFetcher(/*...*/) extends QueryModifiers
case object Consolidate extends QueryModifiers
class Query[NextQM] private (title: String, searchers: List[QMSeacher], filters: List[QMFilter], fetchers: List[QMFetcher]) {
// Build the pipeline
def ->[T <: NextQM](qms: List[NextQM])(implicit m: Manifest[T]) = m.toString match {
case "QMSearch" => new Query[QMFilter](title, qms, Nil, Nil)
case "QMFilter" => new Query[QMFetcher](title, seachers, qms, Nil)
case "QMFetcher" => new Query[Consolidate.type](title, searches, filters, qms)
case _ /* "Consolidate$", actually */ => error("List of consolidate unexpected")
}
// Do similarly for qm: NextQM
// Consolidation
def ->(qm: Consolidate.type) = {
// Create Searchers actors
// Send them the Filters
// Send them Fetchers
// Create the Consolidator actor
// Send it to Searchers actors
// Send Searchers the query
// Ask Consolidator for answer
}
}
object Query {
def apply(title: String) = new Query[QMSearcher](title, Nil, Nil, Nil)
}
现在,Searchers 参与者保留了一个过滤器列表、一个提取器列表以及对合并器的引用。他们收听消息,通知他们这些事情,并进行查询。对于每个结果,他们为列表中的每个过滤器创建一个过滤器参与者,向每个过滤器发送获取器和合并器的列表,然后将结果发送给它们。
过滤器参与者保留一个提取器列表和对合并器的引用。他们收听通知他们这些事情的消息,以及搜索者的结果。他们将他们的输出(如果有的话)发送给新创建的 fetcher actor,他们首先被告知合并者。
提取器保留对合并器的引用。他们收听一条消息,通知他们该引用,以及过滤器的结果。他们将结果依次发送给整合商。
合并者收听两条消息。来自 fetcher actor 的一条消息通知他们结果,这些结果是他们积累的。来自查询的另一条消息要求该结果,它返回。
剩下的唯一事情就是设计一种方法让合并者知道所有结果都已处理。一种方法如下:
- 在 Query 中,通知 Consolidator actor 所创建的每个 Searcher。合并者保留一份清单,并带有一个标志,表明它们是否已完成。
- 每个搜索者都保留一个它创建的过滤器列表,并等待来自它们的“完成”消息。当搜索者没有任何处理要做并且从所有过滤器接收到“完成”时,它会向整合者发送一条消息,通知它已经完成。
- 反过来,每个过滤器都会保留它创建的提取器列表,并且同样等待来自它们的“完成”消息。当它完成处理并从所有提取器收到“完成”时,它会通知搜索器它已经完成。
- 它的提取器在其工作完成时向创建它的过滤器发送“完成”消息并发送到合并器。
- 合并器仅在收到所有搜索者的“完成”后才收听查询结果的消息。