6

假设我需要并行处理给定文件夹中的文件。在 Java 中,我会创建一个FolderReader线程来从文件夹和FileProcessor线程池中读取文件名。FolderReader读取文件名并将文件处理函数 ( Runnable) 提交给池执行器。

在 Scala 中,我看到两个选项:

  • 创建一个FileProcessor参与者池并使用Actors.Scheduler.
  • 在读取文件名时为每个文件名创建一个演员。

是否有意义?什么是最好的选择?

4

6 回答 6

10

根据你在做什么,它可能很简单

for(file<-files.par){
   //process the file
}
于 2012-07-20T12:36:55.983 回答
3

我建议尽我所能远离线程。幸运的是,我们有更好的抽象来处理下面发生的事情,在你的情况下,在我看来,你不需要使用演员(虽然你可以),但你可以使用更简单的抽象,称为 Futures。它们是 Akka 开源库的一部分,我认为未来也将成为 Scala 标准库的一部分。

Future[T] 只是将来会返回 T 的东西。

运行未来所需的只是拥有一个隐式 ExecutionContext,您可以从 java 执行器服务派生它。然后您将能够享受优雅的 API 以及 future 是一个将集合转换为期货集合、收集结果等的单子这一事实。我建议你看看http://doc.akka.io/docs/akka/2.0.1/scala/futures.html

object TestingFutures {
  implicit val executorService = Executors.newFixedThreadPool(20)
  implicit val executorContext = ExecutionContext.fromExecutorService(executorService)

  def testFutures(myList:List[String]):List[String]= {

    val listOfFutures : Future[List[String]] = Future.traverse(myList){
      aString => Future{
                        aString.reverse
                       }
     }
    val result:List[String] = Await.result(listOfFutures,1 minute)
    result

  }
}

这里发生了很多事情:

  • 我使用Future.traversewhich 接收作为第一个参数 which is M[T]<:Traversable[T]和作为第二个参数 aT => Future[T]或者如果您更喜欢 aFunction1[T,Future[T]]并返回 Future[M[T]]
  • 我正在使用该Future.apply方法创建一个匿名类型的类Future[T]

看 Akka 期货还有很多其他原因。

  • Futures 可以被映射,因为它们是 monad,即你可以链接 Futures 执行:

    Future { 3 }.map { _ * 2 }.map { _.toString }

  • Futures 有回调:future.onComplete、onSuccess、onFailure 和Then 等。

  • Futures 不仅支持遍历,还支持理解

于 2012-07-20T09:49:34.530 回答
2

理想情况下,您应该使用两个演员。一种用于读取文件列表,另一种用于实际读取文件。

您只需向第一个参与者发送一条“开始”消息即可开始该过程。然后actor可以读取文件列表,并向第二个actor发送消息。然后第二个参与者读取文件并处理内容。

拥有多个参与者,这可能看起来很复杂,实际上是一件好事,因为您有一堆对象相互通信,就像在理论上的 OO 系统中一样。

编辑:您真的不应该同时读取单个文件。

于 2012-07-20T09:37:55.863 回答
2

我打算准确地写出@Edmondo1984 做了什么,但他打败了我。:) 我非常赞同他的建议。我还建议您阅读Akka 2.0.2的文档。同样,我会给你一个更具体的例子:

import akka.dispatch.{ExecutionContext, Future, Await}
import akka.util.duration._
import java.util.concurrent.Executors
import java.io.File

val execService = Executors.newCachedThreadPool()
implicit val execContext = ExecutionContext.fromExecutorService(execService)

val tmp = new File("/tmp/")
val files = tmp.listFiles()
val workers = files.map { f =>
  Future {
    f.getAbsolutePath()
  }
}.toSeq
val result = Future.sequence(workers)
result.onSuccess {
  case filenames =>
    filenames.foreach { fn =>
      println(fn)
    }
}

// Artificial just to make things work for the example
Thread.sleep(100)
execContext.shutdown()

在这里,我使用sequence而不是traverse,但区别将取决于您的需求。

与未来同行,我的朋友;在这种情况下,Actor 只是一种更痛苦的方法。


于 2012-07-20T10:00:50.943 回答
2

但是如果使用演员,那有什么问题呢?

如果我们必须读/写一些属性文件。有我的 Java 示例。但仍然与 Akka Actors 合作。

以免我们有一个演员ActorFile代表一个文件。嗯..可能它不能代表一个文件。对?(会很好)。那么它代表了几个文件,如下所示PropertyFilesActor

为什么不使用这样的东西:

public class PropertyFilesActor extends UntypedActor {

    Map<String, String> filesContent = new LinkedHashMap<String, String>();

    { // here we should use real files of cource
        filesContent.put("file1.xml", "");
        filesContent.put("file2.xml", "");
    }

    @Override
    public void onReceive(Object message) throws Exception {

        if (message instanceof WriteMessage)  {
            WriteMessage writeMessage = (WriteMessage) message;
            String content = filesContent.get(writeMessage.fileName);
            String newContent = content + writeMessage.stringToWrite;
            filesContent.put(writeMessage.fileName, newContent);
        }

        else if (message instanceof ReadMessage) {
            ReadMessage readMessage = (ReadMessage) message;
            String currentContent = filesContent.get(readMessage.fileName);
            // Send the current content back to the sender
            getSender().tell(new ReadMessage(readMessage.fileName, currentContent), getSelf());
        }

        else unhandled(message);

    }

}

...一条消息将带有参数(文件名)

它有自己的in-box,接受如下消息:

  1. 写行(文件名,字符串)
  2. 读取行(文件名,字符串)

那些消息将被存储到in-box一个接一个的顺序中。演员将通过从盒子接收消息来完成其工作 - 存储/阅读,同时发送反馈sender ! message

因此,假设我们写入属性文件,并发送显示网页上的内容。我们可以开始显示页面(在我们发送消息以将数据存储到文件之后),一旦我们收到反馈,就使用刚刚更新的文件中的数据(通过 ajax)更新页面的一部分。

于 2013-05-15T20:12:04.033 回答
1

好吧,抓住你的文件并将它们放在一个平行的结构中

scala> new java.io.File("/tmp").listFiles.par
res0: scala.collection.parallel.mutable.ParArray[java.io.File] = ParArray( ... )

然后...

scala> res0 map (_.length)
res1: scala.collection.parallel.mutable.ParArray[Long] = ParArray(4943, 1960, 4208, 103266, 363 ... )
于 2012-07-20T16:12:28.357 回答