4

我正在使用sttp客户端。我想将响应解释为除以行的字符串,例如Observable[String]

这里sttp流api:

import java.nio.ByteBuffer

import com.softwaremill.sttp._
import com.softwaremill.sttp.okhttp.monix.OkHttpMonixBackend
import monix.eval.Task
import monix.reactive.Observable

implicit val sttpBackend = OkHttpMonixBackend()

val res: Task[Response[Observable[ByteBuffer]]] = sttp
  .post(uri"someUri")
  .response(asStream[Observable[ByteBuffer]])
  .send()

那我怎么能得到Observable[String]

这里有一些想法:

1.有没有一种简单的split按行观察的方法?
2.或者也许我可以InputStream从响应中得到原始信息,所以我可以很容易地拆分它,但我找不到使用类似asStream[InputStream]
3的方法。或者也许只是使用 http 后端 witoutsttp层?

4

1 回答 1

2

您的基本问题是如何将 anObservable[ByteBuffer]转换为Observable[String],其中每个String都是一行,对吗?

你可以使用方法bufferWithSelector(selector: Observable[S]): Observable[Seq[A]]。此方法将缓冲 Observable,直到选择器 Observable 发出一个元素。

我用Ints 做了一个小例子:

import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._

val source = Observable.range(0, 1000, 1)
  .delayOnNext(100.milliseconds)

val selector = source.filter(_ % 10 == 0)

val buffered = source.bufferWithSelector(selector)
  .map(_.foldLeft("")((s, i) => s + i.toString)) // This folds the Seq[Int] into a String for display purposes

buffered.foreach(println)

试试看!


当然,这有一个主要缺点:底层的 Observablesource将被评估两次。您可以通过修改上面的示例来看到这一点:

// Start writing your ScalaFiddle code here

import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._

val source = Observable.range(0, 1000, 1)
  .delayOnNext(100.milliseconds)
  .map {x => println(x); x}  // <------------------

val selector = source.filter(_ % 10 == 0)

val buffered = source.bufferWithSelector(selector)
  .map(_.foldLeft("")((s, i) => s + i.toString))

buffered.foreach(println)

这将打印每个数字两次。


要解决此问题,您必须将sourceObservable 转换为热 Observable:

import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._

val source = Observable.range(0, 1000, 1)
  .delayOnNext(100.milliseconds)
  .map {x => println(x); x}
  .publish // <-----------------------------

// source is now a ConnectableObservable and will start emitting elements
// once you call source.connect()

val selector = source.filter(_ % 10 == 0)

val buffered = source.bufferWithSelector(selector)
  .map(_.foldLeft("")((s, i) => s + i.toString))

buffered.foreach(println)

source.connect() // <---------------------------

试试看!

您唯一需要做的就是修改选择器以仅在遇到换行时才发出项目。

我建议将其拆分Observable[ByteBuffer]Observable[Byte]第一个(使用flatMap)以避免头痛。

于 2019-02-05T12:20:37.147 回答