我的旧代码支持使用 SqsStream 使用一个 SQS 队列。我必须更新它以支持给定队列 URL 列表的多个队列。
方法内容:
for {
sqs <- Sqs.>.async // async client
urls <- Sqs.>.queueUrls // List[String] of multiple queues
_ <- {
urls
.map(url => {
SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))
.mapMParUnordered(10)(handleMessage) // run "handleMessage" up to 10 times concurrently, ZStream[Env, Throwable, Unit]
.runDrain // ZIO[Env, Throwable, Unit]
.forever // ZIO[Env, Throwable, Nothing]
})
} yield ()
但是编译器抱怨,因为它需要一个(ZIO,ZIO,ZIO),而我给了它一个(ZIO,ZIO,List)。我假设我必须将该列表中的所有效果减少为将handleMessage
在所有队列中并行执行的单个效果,但我不确定语法,因为我没有使用 ZIO 的经验。
基本上到了这一点,
urls
.map(url => {
SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))
我的网址变成了 ZStream。我想我需要ZStream.flatMapPar
使用这个元素和下一个元素来调用,依此类推,直到它们都被压平在一起。我该怎么做?