我正在修补 Akka,需要一些建议来实现我想到的特定内容。我想要一个演员,我可以发送DownloadFile(URI, File)
消息并下载它。由于这可以并行,我不想一个接一个地下载文件,但有并发下载的限制。
用 Akka 建模这样的东西的预期方法是什么?想到的其他事情是:如果“工人”演员之一因某种原因死亡会发生什么?如何重试下载?等等等等。
我知道这是一个非常大的问题,但我希望有人花时间回答它!谢谢!
我正在修补 Akka,需要一些建议来实现我想到的特定内容。我想要一个演员,我可以发送DownloadFile(URI, File)
消息并下载它。由于这可以并行,我不想一个接一个地下载文件,但有并发下载的限制。
用 Akka 建模这样的东西的预期方法是什么?想到的其他事情是:如果“工人”演员之一因某种原因死亡会发生什么?如何重试下载?等等等等。
我知道这是一个非常大的问题,但我希望有人花时间回答它!谢谢!
试一试;它创建三个 - 但您可以将其配置为创建任意数量的下载器,以便可以同时处理三个下载请求。
sealed trait DownloaderMessage
case class DownloadFile(uri: URI, file: File) extends DownloaderMessage
object Downloader {
val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool").build
}
class Downloader extends Actor {
self.lifeCycle = Permanent
self.dispatcher = Downloader.dispatcher
def receive = {
case DownloadFile(uri, file) =>
// do the download
}
}
trait CyclicLoadBalancing extends LoadBalancer { this: Actor =>
val downloaders: List[ActorRef]
val seq = new CyclicIterator[ActorRef](downloaders)
}
trait DownloadManager extends Actor {
self.lifeCycle = Permanent
self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, 5000)
val downloaders: List[ActorRef]
override def preStart = downloaders foreach { self.startLink(_) }
override def postStop = self.shutdownLinkedActors()
}
class DownloadService extends DownloadManager with CyclicLoadBalancing {
val downloaders = List.fill(3)(Actor.actorOf[Downloader])
}
创建一个管理下载的 DownloadActor 类,让所有 DownloadActor 共享同一个 Dispatcher,根据您的需要配置 Dispatcher(最大线程数、队列大小等),让所有 DownloadActor 链接到同一个 Supervisor,根据您的需要配置 Supervisor (可能是 OneForOneStrategy),为每个新的 Download 创建一个新的 DownloadActor,或者使用带有适当 InfiniteIterator 的 LoadBalancer 将下载分发到 DownloadActor。
如果您使用 AsycHttpClient 下载文件,它支持下载-恢复。