假设我需要并行处理给定文件夹中的文件。在 Java 中,我会创建一个FolderReader
线程来从文件夹和FileProcessor
线程池中读取文件名。FolderReader
读取文件名并将文件处理函数 ( Runnable
) 提交给池执行器。
在 Scala 中,我看到两个选项:
- 创建一个
FileProcessor
参与者池并使用Actors.Scheduler
. - 在读取文件名时为每个文件名创建一个演员。
是否有意义?什么是最好的选择?
根据你在做什么,它可能很简单
for(file<-files.par){
//process the file
}
我建议尽我所能远离线程。幸运的是,我们有更好的抽象来处理下面发生的事情,在你的情况下,在我看来,你不需要使用演员(虽然你可以),但你可以使用更简单的抽象,称为 Futures。它们是 Akka 开源库的一部分,我认为未来也将成为 Scala 标准库的一部分。
Future[T] 只是将来会返回 T 的东西。
运行未来所需的只是拥有一个隐式 ExecutionContext,您可以从 java 执行器服务派生它。然后您将能够享受优雅的 API 以及 future 是一个将集合转换为期货集合、收集结果等的单子这一事实。我建议你看看http://doc.akka.io/docs/akka/2.0.1/scala/futures.html
object TestingFutures {
implicit val executorService = Executors.newFixedThreadPool(20)
implicit val executorContext = ExecutionContext.fromExecutorService(executorService)
def testFutures(myList:List[String]):List[String]= {
val listOfFutures : Future[List[String]] = Future.traverse(myList){
aString => Future{
aString.reverse
}
}
val result:List[String] = Await.result(listOfFutures,1 minute)
result
}
}
这里发生了很多事情:
Future.traverse
which 接收作为第一个参数 which is M[T]<:Traversable[T]
和作为第二个参数 aT => Future[T]
或者如果您更喜欢 aFunction1[T,Future[T]]
并返回 Future[M[T]]Future.apply
方法创建一个匿名类型的类Future[T]
看 Akka 期货还有很多其他原因。
Futures 可以被映射,因为它们是 monad,即你可以链接 Futures 执行:
Future { 3 }.map { _ * 2 }.map { _.toString }
Futures 有回调:future.onComplete、onSuccess、onFailure 和Then 等。
Futures 不仅支持遍历,还支持理解
理想情况下,您应该使用两个演员。一种用于读取文件列表,另一种用于实际读取文件。
您只需向第一个参与者发送一条“开始”消息即可开始该过程。然后actor可以读取文件列表,并向第二个actor发送消息。然后第二个参与者读取文件并处理内容。
拥有多个参与者,这可能看起来很复杂,实际上是一件好事,因为您有一堆对象相互通信,就像在理论上的 OO 系统中一样。
编辑:您真的不应该同时读取单个文件。
我打算准确地写出@Edmondo1984 做了什么,但他打败了我。:) 我非常赞同他的建议。我还建议您阅读Akka 2.0.2的文档。同样,我会给你一个更具体的例子:
import akka.dispatch.{ExecutionContext, Future, Await}
import akka.util.duration._
import java.util.concurrent.Executors
import java.io.File
val execService = Executors.newCachedThreadPool()
implicit val execContext = ExecutionContext.fromExecutorService(execService)
val tmp = new File("/tmp/")
val files = tmp.listFiles()
val workers = files.map { f =>
Future {
f.getAbsolutePath()
}
}.toSeq
val result = Future.sequence(workers)
result.onSuccess {
case filenames =>
filenames.foreach { fn =>
println(fn)
}
}
// Artificial just to make things work for the example
Thread.sleep(100)
execContext.shutdown()
在这里,我使用sequence
而不是traverse
,但区别将取决于您的需求。
与未来同行,我的朋友;在这种情况下,Actor 只是一种更痛苦的方法。
但是如果使用演员,那有什么问题呢?
如果我们必须读/写一些属性文件。有我的 Java 示例。但仍然与 Akka Actors 合作。
以免我们有一个演员ActorFile
代表一个文件。嗯..可能它不能代表一个文件。对?(会很好)。那么它代表了几个文件,如下所示PropertyFilesActor
:
为什么不使用这样的东西:
public class PropertyFilesActor extends UntypedActor {
Map<String, String> filesContent = new LinkedHashMap<String, String>();
{ // here we should use real files of cource
filesContent.put("file1.xml", "");
filesContent.put("file2.xml", "");
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof WriteMessage) {
WriteMessage writeMessage = (WriteMessage) message;
String content = filesContent.get(writeMessage.fileName);
String newContent = content + writeMessage.stringToWrite;
filesContent.put(writeMessage.fileName, newContent);
}
else if (message instanceof ReadMessage) {
ReadMessage readMessage = (ReadMessage) message;
String currentContent = filesContent.get(readMessage.fileName);
// Send the current content back to the sender
getSender().tell(new ReadMessage(readMessage.fileName, currentContent), getSelf());
}
else unhandled(message);
}
}
...一条消息将带有参数(文件名)
它有自己的in-box
,接受如下消息:
那些消息将被存储到in-box
一个接一个的顺序中。演员将通过从盒子接收消息来完成其工作 - 存储/阅读,同时发送反馈sender ! message
。
因此,假设我们写入属性文件,并发送显示网页上的内容。我们可以开始显示页面(在我们发送消息以将数据存储到文件之后),一旦我们收到反馈,就使用刚刚更新的文件中的数据(通过 ajax)更新页面的一部分。
好吧,抓住你的文件并将它们放在一个平行的结构中
scala> new java.io.File("/tmp").listFiles.par
res0: scala.collection.parallel.mutable.ParArray[java.io.File] = ParArray( ... )
然后...
scala> res0 map (_.length)
res1: scala.collection.parallel.mutable.ParArray[Long] = ParArray(4943, 1960, 4208, 103266, 363 ... )