这是Akka Stream - Select Sink based on Element in Flow的后续文章。
假设我有多个要从中流式传输的 SQS 队列。我正在使用Alpakka的AWS SQS 连接器Source
来创建.
implicit val sqsClient: AmazonSQSAsync = ???
val queueUrls: List[String] = ???
val sources: List[Source[Message, NotUsed]] = queueUrls.map(url => SqsSource(url))
现在,我想combine
合并它们的来源。但是,Source.combine方法不支持将列表作为参数传递,而仅支持可变参数。
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(strategy: Int ⇒ Graph[UniformFanInShape[T, U], NotUsed])
当然,我可以手指输入所有源参数。但是,如果我有 10 个源队列,参数会变得很长。
有没有办法从源列表中组合源?
[补充]
正如Ramon J Romero y Vigil所指出的,保持流“薄薄的一层”是一种更好的做法。然而,在这种特殊情况下,我使用 singlesqsClient
进行所有SqsSource
初始化。