0

我的应用程序要求我有多个线程运行从各种 HDFS 节点获取数据。为此,我正在使用线程执行器池和分叉线程。 分叉在:

val pathSuffixList = fileStatuses.getOrElse("FileStatus", List[Any]()).asInstanceOf[List[Map[String, Any]]]
  pathSuffixList.foreach(block => {
    ConsumptionExecutor.execute(new Consumption(webHdfsUri,block))
  })

我的班级消费:

class Consumption(webHdfsUri: String, block:Map[String,Any]) extends Runnable {

      override def run(): Unit = {
        val uriSplit = webHdfsUri.split("\\?")
        val fileOpenUri = uriSplit(0) + "/" + block.getOrElse("pathSuffix", "").toString + "?op=OPEN"
        val inputStream = new URL(fileOpenUri).openStream()
        val datumReader = new GenericDatumReader[Void]()
        val dataStreamReader = new DataFileStream(inputStream, datumReader)
        //        val schema = dataStreamReader.getSchema()
        val dataIterator = dataStreamReader.iterator()
        while (dataIterator.hasNext) {
          println(" data : " + dataStreamReader.next())
        }
      }

    }

消费执行者:

object ConsumptionExecutor{

  val counter: AtomicLong = new AtomicLong()

  val executionContext: ExecutorService = Executors.newCachedThreadPool(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val thread: Thread = new Thread(r)
      thread.setName("ConsumptionExecutor-" + counter.incrementAndGet())
      thread
    }
  })
  executionContext.asInstanceOf[ThreadPoolExecutor].setMaximumPoolSize(200)

  def execute(trigger: Runnable) {
    executionContext.execute(trigger)
  }

}

但是,我想在不需要提供固定线程池大小的地方使用 Akka 流/Akka 演员,而 Akka 会处理所有事情。我对 Akka 以及 Streaming 和 actor 的概念还很陌生。有人可以以示例代码的形式给我任何线索以适合我的用例吗?提前致谢!

4

1 回答 1

1

一个想法是为您正在读取的每个 HDFS 节点创建一个ActorPublisher的(子类)实例,然后将它们作为FlowGraphMerge中的多个Sources 。

像这样的伪代码,其中ActorPublisher省略了源的详细信息:

val g = PartialFlowGraph { implicit b =>
  import FlowGraphImplicits._
  val in1 = actorSource1
  val in2 = actorSource2
  // etc.

  val out = UndefinedSink[T]
  val merge = Merge[T]

  in1 ~> merge ~> out
  in2 ~> merge
  // etc.
}

这可以通过迭代它们并merge为每个actor源添加一个边缘来改进一组actor源,但这给出了这个想法。

于 2015-02-08T15:06:51.907 回答