我想通过 db.stream(yourquery) 通过 scalaz-stream 流式传输从 3.0.0 查询返回的数据。
看起来 reactive-streams.org 使用了不同库实现的 API 和数据流模型。
在从 scalaz-stream 流程回流到 slick 发布者的背压的情况下,您如何做到这一点?
我想通过 db.stream(yourquery) 通过 scalaz-stream 流式传输从 3.0.0 查询返回的数据。
看起来 reactive-streams.org 使用了不同库实现的 API 和数据流模型。
在从 scalaz-stream 流程回流到 slick 发布者的背压的情况下,您如何做到这一点?
看看https://github.com/krasserm/streamz
Streamz 是 scalaz-stream 的资源组合库。它允许 Process 实例消费和生产:
I finally did answer my own question. If you are willing to use scalaz-streams queues to queue up streaming results.
def getData[T](publisher: slick.backend.DatabasePublisher[T],
queue: scalaz.stream.async.mutable.Queue[T], batchRequest: Int = 1): Task[scala.concurrent.Future[Long]] =
Task {
val p = scala.concurrent.Promise[Unit]()
var counter: Long = 0
val s = new org.reactivestreams.Subscriber[T] {
var sub: Subscription = _
def onSubscribe(s: Subscription): Unit = {
sub = s
sub.request(batchRequest)
}
def onComplete(): Unit = {
sub.cancel()
p.success(counter)
}
def onError(t: Throwable): Unit = p.failure(t)
def onNext(e: T): Unit = {
counter += 1
queue.enqueueOne(e).run
sub.request(batchRequest)
}
}
publisher.subscribe(s)
p.future
}
When you run this using run
you obtain a future that when finished, means the query finished streaming. You can compose on this future if you wanted your computation to wait for all the data to arrive. You could also add use an Await in the Task in getData
then compose your computation on the returned Task object if you need all the data to run before continuing. For what I do, I compose on the future's completion and shutdown the queue so that my scalaz-stream knows to terminate cleanly.
这是一个稍微不同的实现(与 user1763729 发布的那个),它返回一个进程:
def getData[T](publisher: DatabasePublisher[T], batchSize: Long = 1L): Process[Task, T] = {
val q = async.boundedQueue[T](10)
val subscribe = Task.delay {
publisher.subscribe(new Subscriber[T] {
@volatile var subscription: Subscription = _
override def onSubscribe(s: Subscription) {
subscription = s
subscription.request(batchSize)
}
override def onNext(next: T) = {
q.enqueueOne(next).attemptRun
subscription.request(batchSize)
}
override def onError(t: Throwable) = q.fail(t).attemptRun
override def onComplete() = q.close.attemptRun
})
}
Process.eval(subscribe).flatMap(_ => q.dequeue)
}