在以下代码中,我将 TCP 套接字转换为Observable[Array[Byte]]
:
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.IOScheduler
val sock = new Socket
type Bytes = Array[Byte]
lazy val s: Observable[Bytes] = Obs.using[Bytes, Socket] {
sock.connect(new InetSocketAddress("10.0.2.2", 9002), 1000)
sock
}(
socket => Observable.from[Bytes] {
val incoming = socket.getInputStream
val buffer = new Bytes(1024)
Stream.continually {
val read = incoming.read(buffer, 0, 1024)
buffer.take(read)
}.takeWhile(_.nonEmpty)
},
socket => {
println("Socket disposed")
socket.close
s.retry // Does not work
})
.subscribeOn(IOScheduler.apply)
s.subscribe(bytes => println(new String(bytes, "UTF-8")), println)
与远程服务器的连接可能随时中断,在这种情况下,我想Observable
尝试自动重新连接但s.retry
不执行任何操作。我怎样才能做到这一点?也可以在当前“内部”完成Observable
而不创建新的并重新订阅吗?