1

在我的 Android 应用程序中,我需要使用 aSocket来发送和接收字节数组。为方便起见,我想使用Observable连接到Socket.

在互联网上我发现了这个代码:

import rx.lang.scala.Observable

val s = Observable.using[Char,Socket](new Socket("10.0.2.2", 9002))(
  socket => Observable.from[Char](Source.fromInputStream(socket.getInputStream).toIterable),
  socket => Try(socket.close))
  .subscribeOn(rx.lang.scala.schedulers.IOScheduler.apply)

  val a = s.subscribe(println, println)

它可以工作,但一次输出一个字符,例如当发送一个“hello there”字符串时,输出是:

I/System.out: h
I/System.out: e
I/System.out: l
I/System.out: l
I/System.out: o
I/System.out:  
I/System.out: t
I/System.out: h
I/System.out: e
I/System.out: r
I/System.out: e

但我想在订阅中接收缓冲的字节数组。我怎样才能做到这一点?

4

1 回答 1

1

正如@SeanVieira 已经说过的,您首先必须决定如何聚合流元素,即字符。如果您知道流将在每条消息后关闭,则可以等待接收到整个消息,然后在onCompleted().

我认为到目前为止您实施的内容非常好,因为它取决于观察者想要处理角色的内容和方式。

然后,您可以根据需要添加流转换,例如

在您已经创建的 Observable 上使用的解决方案tumblingBuffer可能如下所示(未经测试):

 source.tumblingBuffer(source.filter(_ == '\n'))

source.filter(...)一旦边界可观察发射一个元素,你缓冲从源传入的任何内容并发射整个缓冲区。mkString然后,您可以使用并订阅该 Observable将字符序列转换为字符串:

source.tumblingBuffer(source.filter(_ == '\n')).map(mkString(_))
于 2017-02-06T10:22:33.277 回答