您的基本问题是如何将 anObservable[ByteBuffer]
转换为Observable[String]
,其中每个String
都是一行,对吗?
你可以使用方法bufferWithSelector(selector: Observable[S]): Observable[Seq[A]]
。此方法将缓冲 Observable,直到选择器 Observable 发出一个元素。
我用Int
s 做了一个小例子:
import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._
val source = Observable.range(0, 1000, 1)
.delayOnNext(100.milliseconds)
val selector = source.filter(_ % 10 == 0)
val buffered = source.bufferWithSelector(selector)
.map(_.foldLeft("")((s, i) => s + i.toString)) // This folds the Seq[Int] into a String for display purposes
buffered.foreach(println)
试试看!
当然,这有一个主要缺点:底层的 Observablesource
将被评估两次。您可以通过修改上面的示例来看到这一点:
// Start writing your ScalaFiddle code here
import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._
val source = Observable.range(0, 1000, 1)
.delayOnNext(100.milliseconds)
.map {x => println(x); x} // <------------------
val selector = source.filter(_ % 10 == 0)
val buffered = source.bufferWithSelector(selector)
.map(_.foldLeft("")((s, i) => s + i.toString))
buffered.foreach(println)
这将打印每个数字两次。
要解决此问题,您必须将source
Observable 转换为热 Observable:
import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._
val source = Observable.range(0, 1000, 1)
.delayOnNext(100.milliseconds)
.map {x => println(x); x}
.publish // <-----------------------------
// source is now a ConnectableObservable and will start emitting elements
// once you call source.connect()
val selector = source.filter(_ % 10 == 0)
val buffered = source.bufferWithSelector(selector)
.map(_.foldLeft("")((s, i) => s + i.toString))
buffered.foreach(println)
source.connect() // <---------------------------
试试看!
您唯一需要做的就是修改选择器以仅在遇到换行时才发出项目。
我建议将其拆分Observable[ByteBuffer]
为Observable[Byte]
第一个(使用flatMap
)以避免头痛。