0

在以下代码中,我将 TCP 套接字转换为Observable[Array[Byte]]

import rx.lang.scala.Observable
import rx.lang.scala.schedulers.IOScheduler

val sock = new Socket
type Bytes = Array[Byte]

lazy val s: Observable[Bytes] = Obs.using[Bytes, Socket] {
  sock.connect(new InetSocketAddress("10.0.2.2", 9002), 1000)
  sock
}(
  socket => Observable.from[Bytes] {

    val incoming = socket.getInputStream
    val buffer = new Bytes(1024)

    Stream.continually {
      val read = incoming.read(buffer, 0, 1024)
      buffer.take(read)
    }.takeWhile(_.nonEmpty)

  },

  socket => {
    println("Socket disposed")
    socket.close
    s.retry // Does not work
  })
  .subscribeOn(IOScheduler.apply)

s.subscribe(bytes => println(new String(bytes, "UTF-8")), println)

与远程服务器的连接可能随时中断,在这种情况下,我想Observable尝试自动重新连接但s.retry不执行任何操作。我怎样才能做到这一点?也可以在当前“内部”完成Observable而不创建新的并重新订阅吗?

4

1 回答 1

1

您想在每个新订阅上建立一个新的套接字连接。这是最简单的(A)SyncOnSubscribe,从版本. 移植到 RxScala 0.26.5。如果你有这个 observable,你可以使用正常的错误控制方法,比如.retry.

像这样的东西:

val socketObservable: Observable[Byte] = Observable.create(SyncOnSubscribe.singleState(
  generator = () =>
    sock
      .connect(new InetSocketAddress("10.0.2.2", 9002), 1000)
      .getInputStream
)(next = is => Try(is.read()) match {
    case Success(-1) => Notification.OnCompleted()
    case Success(byte) => Notification.OnNext(byte)
    case Failure(e) => Notification.OnError(e)
  },
  onUnsubscribe = is => Try(is.close)
)

注意:这一次读取一个字节,效率不是很高。您可以使用 ASyncOnSubscribe 或让您的 observable 的每个事件都是一个字节数组来改进这一点。

注意:这是一个冷可观察对象,将为每个订阅者创建一个新套接字。例如,这将打开 2 个套接字:

socketObservable.foreach(b => System.out.print(b))
socketObservable.buffer(1024).foreach(kiloByte => System.out.println(kiloByte))

如果这不是你想要的,你可以把它变成一个热门的.share

于 2017-01-17T12:24:23.450 回答