在这种特殊情况下,我建议使用 Monix 作为 F 的 FS2:
import cats.implicits._
import monix.eval._, monix.execution._
import fs2._
// use your own types here
type BucketName = String
type BucketRegion = String
type S3Object = String
// use your own implementations as well
val fetchS3Buckets: Task[List[BucketName]] = Task(???)
val bucketRegion: BucketName => Task[BucketRegion] = _ => Task(???)
val listObject: BucketName => Task[List[S3Object]] = _ => Task(???)
Stream.evalSeq(fetchS3Buckets)
.parEvalMap(10) { name =>
// checking region, filtering and listing on batches of 10
bucketRegion(name).flatMap {
case "my-region" => listObject(name)
case _ => Task.pure(List.empty)
}
}
.foldMonoid // combines List[S3Object] together
.compile.lastOrError // turns into Task with result
.map(list => println(s"Result: $list"))
.onErrorHandle { case error: Throwable => println(error) }
.runToFuture // or however you handle it
FS2 下面使用cats.effect.IO 或Monix Task,或者任何你想要的,只要它提供了Cats Effect 类型类。它构建了一个很好的功能性 DSL 来设计数据流,因此您可以在没有 Akka Streams 的情况下使用响应式流。
这里有一个小问题,我们一次打印所有结果,如果它们的数量超过内存可以处理的数量,这可能是个坏主意 - 我们可以批量打印(不确定这是否是你的是否想要)或进行过滤和打印单独的批次。
Stream.evalSeq(fetchS3Buckets)
.parEvalMap(10) { name =>
bucketRegion(name).map(name -> _)
}
.collect { case (name, "my-region") => name }
.parEvalMap(10) { name =>
listObject(name).map(list => println(s"Result: $list"))
}
.compile
.drain
虽然在纯 Monix 中这一切都不是不可能的,但 FS2 使此类操作更易于编写和维护,因此您应该能够更轻松地实现您的流程。