0

我正在尝试学习 Scala 并从中获得一些乐趣,但我遇到了这个经典问题。它让我想起了 NodeJS 早期的很多嵌套回调地狱。

这是我的伪代码程序:

  1. 获取 S3 存储桶列表的任务。
  2. 任务一完成后,我想以十个为一组批量处理桶。
  3. 对于每批:
  4. 获取每个存储桶的区域。
  5. 过滤掉不在区域内的桶。
  6. 列出每个存储桶中的所有对象。
  7. 打印一切

在某一时刻,我最终选择了以下类型:Task[Iterator[Task[List[Bucket]]]]

本质上:

外部任务是列出所有 S3 存储桶的初始步骤,然后内部迭代器/任务/列表尝试批处理返回列表的任务。

我希望有某种方法可以删除/展平外部任务以到达Iterator[Task[List[Bucket]]].

当我尝试将我的处理分解为多个步骤时,深度嵌套会导致我做许多嵌套映射。这是正确的做法还是有更好的方法来处理这种嵌套?

4

1 回答 1

2

在这种特殊情况下,我建议使用 Monix 作为 F 的 FS2:

import cats.implicits._
import monix.eval._, monix.execution._
import fs2._

// use your own types here
type BucketName = String
type BucketRegion = String
type S3Object = String

// use your own implementations as well
val fetchS3Buckets: Task[List[BucketName]] = Task(???)
val bucketRegion: BucketName => Task[BucketRegion] = _ => Task(???)
val listObject: BucketName => Task[List[S3Object]] = _ => Task(???)

Stream.evalSeq(fetchS3Buckets)
  .parEvalMap(10) { name =>
    // checking region, filtering and listing on batches of 10
    bucketRegion(name).flatMap {
      case "my-region" => listObject(name)
      case _           => Task.pure(List.empty)
    }
  }
  .foldMonoid // combines List[S3Object] together
  .compile.lastOrError // turns into Task with result
  .map(list => println(s"Result: $list"))
  .onErrorHandle { case error: Throwable => println(error) }
  .runToFuture // or however you handle it

FS2 下面使用cats.effect.IO 或Monix Task,或者任何你想要的,只要它提供了Cats Effect 类型类。它构建了一个很好的功能性 DSL 来设计数据流,因此您可以在没有 Akka Streams 的情况下使用响应式流。

这里有一个小问题,我们一次打印所有结果,如果它们的数量超过内存可以处理的数量,这可能是个坏主意 - 我们可以批量打印(不确定这是否是你的是否想要)或进行过滤和打印单独的批次。

Stream.evalSeq(fetchS3Buckets)
  .parEvalMap(10) { name =>
    bucketRegion(name).map(name -> _)
  }
  .collect { case (name, "my-region") => name }
  .parEvalMap(10) { name =>
    listObject(name).map(list => println(s"Result: $list"))
  }
  .compile
  .drain

虽然在纯 Monix 中这一切都不是不可能的,但 FS2 使此类操作更易于编写和维护,因此您应该能够更轻松地实现您的流程。

于 2020-05-16T18:51:06.310 回答