1

给定一个这样的队列:

val queue: Queue[Int] = async.boundedQueue[Int](1000)

我想拉出这个队列并将其流式传输到下游接收器,以 UP 到 100 的块。

queue.dequeue.chunk(100).to(downstreamConsumer) 

有点工作,但如果我说 101 条消息,它不会清空队列。将留下 1 条消息,除非再推入 99 条消息。我想尽可能多地从队列中取出 100 条消息,以我的下游进程可以处理的速度尽可能快。

是否有现有的组合器可用?

4

2 回答 2

0

为此,您可能需要在从队列中出列时监控队列的大小。然后,如果大小达到 0,您将不再等待任何元素。事实上,您可以elastic根据队列的大小来实现批处理的大小。IE :

val q = async.unboundedQueue[String]

val deq:Process[Task,(String,Int)] = q.dequeue zip q.size
val elasticChunk: Process1[(String,Int), Vector[String]] = ???
val downstreamConsumer : Sink[Task,Vector[String]] = ???

deq.pipe(elasticChunk) to downstreamConsumer
于 2015-08-29T04:24:52.810 回答
0

实际上,我以与预期不同的方式解决了这个问题。

scalaz-stream 队列现在包含一个dequeueBatch方法,该方法允许将队列中的所有值(最多 N 或块)出列。

https://github.com/scalaz/scalaz-stream/issues/338

于 2015-08-31T01:51:55.540 回答