0

我的旧代码支持使用 SqsStream 使用一个 SQS 队列。我必须更新它以支持给定队列 URL 列表的多个队列。

方法内容:

for {
  sqs <- Sqs.>.async // async client
  urls <- Sqs.>.queueUrls // List[String] of multiple queues
  _ <- {
    urls
      .map(url => {
        SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))
          .mapMParUnordered(10)(handleMessage) // run "handleMessage" up to 10 times concurrently, ZStream[Env, Throwable, Unit]
          .runDrain // ZIO[Env, Throwable, Unit]
          .forever // ZIO[Env, Throwable, Nothing]
      })
} yield ()

但是编译器抱怨,因为它需要一个(ZIO,ZIO,ZIO),而我给了它一个(ZIO,ZIO,List)。我假设我必须将该列表中的所有效果减少为将handleMessage在所有队列中并行执行的单个效果,但我不确定语法,因为我没有使用 ZIO 的经验。

基本上到了这一点,

urls
      .map(url => {
        SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))

我的网址变成了 ZStream。我想我需要ZStream.flatMapPar使用这个元素和下一个元素来调用,依此类推,直到它们都被压平在一起。我该怎么做?

4

1 回答 1

4

runDrain将返回一个ZIO你可以用它开火和忘记的foreachPar_

for {
  sqs <- Sqs.>.async
  urls <- Sqs.>.queueUrls
  // Returns ZIO[R, E, Unit] and executes each effect in parallel while discarding the results
  _ <- ZIO.foreachPar_(urls) { url =>
        SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))
          // Handles up to 10 messages at a time in parallel.
          .mapMParUnordered(10)(handleMessage)
          // The stream is already unbounded so no need to have `.forever`
          .runDrain
      }
} yield ()

我还要澄清SqsStream应该已经是无界的,所以你不应该使用forever,并且mapMParUnordered参数是指最大并发而不是处理的事件总数。

于 2020-02-11T04:02:45.500 回答