
I'd like to stream the data returned from a Slick 3.0.0 query via scalaz-stream, using db.stream(yourquery).

It looks like reactive-streams.org defines an API and dataflow model that different libraries then implement.

How would you do this so that back-pressure flows from the scalaz-stream Process back to the Slick publisher?
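For background, the Reactive Streams contract that Slick's publisher implements boils down to a demand-driven handshake: the subscriber asks for n elements via request(n), and the publisher may emit at most that many before waiting for more demand. A minimal self-contained sketch of that handshake, with the Subscriber/Subscription interfaces hand-rolled here to mirror org.reactivestreams purely for illustration:

```scala
// Local stand-ins for org.reactivestreams.Subscription/Subscriber, so this
// sketch compiles without any dependencies.
trait Subscription { def request(n: Long): Unit; def cancel(): Unit }
trait Subscriber[T] {
  def onSubscribe(s: Subscription): Unit
  def onNext(e: T): Unit
  def onComplete(): Unit
  def onError(t: Throwable): Unit
}

// A toy synchronous publisher: it only emits while the subscriber has
// outstanding demand -- which is exactly the back-pressure mechanism.
def publish[T](elems: List[T], sub: Subscriber[T]): Unit = {
  var pending = elems
  var demand = 0L
  var done = false
  sub.onSubscribe(new Subscription {
    def request(n: Long): Unit = {
      demand += n
      while (demand > 0 && pending.nonEmpty) {
        demand -= 1
        val e = pending.head; pending = pending.tail
        sub.onNext(e) // may re-enter request()
      }
      if (pending.isEmpty && !done) { done = true; sub.onComplete() }
    }
    def cancel(): Unit = pending = Nil
  })
}

// A subscriber that requests one element at a time and collects them.
val received = scala.collection.mutable.Buffer[Int]()
var completed = false
publish(List(1, 2, 3), new Subscriber[Int] {
  private var sub: Subscription = _
  def onSubscribe(s: Subscription): Unit = { sub = s; sub.request(1) }
  def onNext(e: Int): Unit = { received += e; sub.request(1) }
  def onComplete(): Unit = completed = true
  def onError(t: Throwable): Unit = throw t
})
```

Because the subscriber only ever requests one element at a time, the publisher can never get ahead of it; replacing request(1) with request(n) gives batched demand.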


3 Answers


Have a look at https://github.com/krasserm/streamz

Streamz is a resource combinator library for scalaz-stream. It allows Process instances to consume from and produce to:

  • Apache Camel endpoints
  • Akka Persistence journals and snapshot stores, and
  • Akka Stream flows (Reactive Streams) with full back-pressure support

Answered 2016-01-14T19:59:34.907

I finally answered my own question. The approach below works if you are willing to use a scalaz-stream queue to buffer the streaming results.

import org.reactivestreams.{Subscriber, Subscription}
import scalaz.concurrent.Task

def getData[T](publisher: slick.backend.DatabasePublisher[T],
               queue: scalaz.stream.async.mutable.Queue[T],
               batchRequest: Int = 1): Task[scala.concurrent.Future[Long]] =
  Task {
    // Completed with the number of streamed elements once the publisher is done.
    val p = scala.concurrent.Promise[Long]()
    var counter: Long = 0
    val s = new Subscriber[T] {
      var sub: Subscription = _

      def onSubscribe(s: Subscription): Unit = {
        sub = s
        sub.request(batchRequest)
      }

      def onComplete(): Unit = {
        sub.cancel()
        p.success(counter)
      }

      def onError(t: Throwable): Unit = p.failure(t)

      def onNext(e: T): Unit = {
        counter += 1
        queue.enqueueOne(e).run
        sub.request(batchRequest) // ask for more only after enqueueing
      }
    }
    publisher.subscribe(s)
    p.future
  }

When you run this Task (via run), you obtain a Future that completes when the query has finished streaming. You can compose on this Future if you want your computation to wait for all the data to arrive. Alternatively, you could Await inside the Task in getData and then compose on the returned Task if you need all the data before continuing. In my case, I compose on the Future's completion and shut down the queue so that my scalaz-stream knows to terminate cleanly.
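The counting-and-promise control flow above can be exercised without Slick or scalaz-stream on the classpath. Below is a self-contained sketch: a plain mutable.Queue stands in for the scalaz-stream queue, and listPublisher/drain are illustrative names of my own, not Slick's API.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Local stand-ins for the org.reactivestreams types.
trait Subscription { def request(n: Long): Unit; def cancel(): Unit }
trait Subscriber[T] {
  def onSubscribe(s: Subscription): Unit
  def onNext(e: T): Unit
  def onComplete(): Unit
  def onError(t: Throwable): Unit
}
trait Publisher[T] { def subscribe(s: Subscriber[T]): Unit }

// Synchronous publisher over a fixed list, honouring request(n) demand.
def listPublisher[T](elems: List[T]): Publisher[T] = new Publisher[T] {
  def subscribe(s: Subscriber[T]): Unit = {
    var rest = elems; var demand = 0L; var done = false
    s.onSubscribe(new Subscription {
      def request(n: Long): Unit = {
        demand += n
        while (demand > 0 && rest.nonEmpty) {
          demand -= 1; val e = rest.head; rest = rest.tail; s.onNext(e)
        }
        if (rest.isEmpty && !done) { done = true; s.onComplete() }
      }
      def cancel(): Unit = rest = Nil
    })
  }
}

// Same shape as getData: enqueue each element, count it, and complete a
// Promise[Long] with the total once the publisher signals onComplete.
def drain[T](pub: Publisher[T], queue: mutable.Queue[T],
             batch: Int = 1): scala.concurrent.Future[Long] = {
  val p = Promise[Long]()
  var counter = 0L
  pub.subscribe(new Subscriber[T] {
    var sub: Subscription = _
    def onSubscribe(s: Subscription): Unit = { sub = s; sub.request(batch) }
    def onNext(e: T): Unit = { counter += 1; queue.enqueue(e); sub.request(batch) }
    def onComplete(): Unit = { sub.cancel(); p.success(counter) }
    def onError(t: Throwable): Unit = p.failure(t)
  })
  p.future
}

val q = mutable.Queue[Int]()
val fut = drain(listPublisher(List(10, 20, 30)), q)
val total = Await.result(fut, 1.second)
```

The Future resolves with the element count (3 here), mirroring how the real getData signals that the query has finished streaming.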

Answered 2015-03-21T13:36:23.100

Here is a slightly different implementation (from the one user1763729 posted) that returns a Process:

import org.reactivestreams.{Subscriber, Subscription}
import scalaz.concurrent.Task
import scalaz.stream.{async, Process}
import slick.backend.DatabasePublisher

def getData[T](publisher: DatabasePublisher[T], batchSize: Long = 1L): Process[Task, T] = {
  // Bounded queue: enqueueOne blocks when full, pushing back-pressure upstream.
  val q = async.boundedQueue[T](10)

  val subscribe = Task.delay {
    publisher.subscribe(new Subscriber[T] {

      @volatile var subscription: Subscription = _

      override def onSubscribe(s: Subscription): Unit = {
        subscription = s
        subscription.request(batchSize)
      }

      override def onNext(next: T): Unit = {
        q.enqueueOne(next).attemptRun
        subscription.request(batchSize)
      }

      override def onError(t: Throwable): Unit = q.fail(t).attemptRun

      override def onComplete(): Unit = q.close.attemptRun
    })
  }

  // Subscribe lazily, then emit whatever arrives on the queue.
  Process.eval(subscribe).flatMap(_ => q.dequeue)
}
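For completeness, a hypothetical way to consume the returned Process (db, query and the uppercase transform are placeholders of my own, assuming Slick and scalaz-stream are on the classpath):

    // Hypothetical usage; `db` and `query` are not defined in this answer.
    val rows: Process[Task, String] = getData(db.stream(query.result))
    rows.map(_.toUpperCase)
        .to(scalaz.stream.io.stdOutLines)
        .run
        .run // blocks until the publisher completes and the queue closes

Because the queue is closed in onComplete (and failed in onError), the dequeue side terminates cleanly without any extra bookkeeping.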
Answered 2016-04-27T14:26:22.457