3

我使用其原生 Enumerator 构造在 Play 中成功设置了一个 websocket,调用了一些返回字符串的代码:

def operationStatusFeed = WebSocket.using[String] { implicit request =>
  val in = Iteratee.ignore[String]
  val out = Enumerator.repeatM {
   Promise.timeout(operation, 3 seconds)
  }
  (in, out)
}

现在我希望我的operation函数返回一个rx.lang.scala.Observable[String]而不是一个字符串,并且我想在它进入后立即输出任何字符串。如何将此 Observable 映射到 a play.api.libs.iteratee.Enumerator

4

2 回答 2

2

您可以使用 Bryan Gilbert 的隐式转换。这将工作得很好,但要小心使用Bryan Gilbert 转换的更新版本!Jeroen Kransen 的回答中从来没有取消订阅(这很糟糕!)。

  /*
   * Observable to Enumerator
   */
  implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
    // unicast create a channel where you can push data and returns an Enumerator
    Concurrent.unicast { channel =>
      val subscription = obs.subscribe(new ChannelObserver(channel))
      val onComplete = { () => subscription.unsubscribe }
      val onError = { (_: String, _: Input[T]) => subscription.unsubscribe }
      (onComplete, onError)
    }
  }

  class ChannelObserver[T](channel: Channel[T]) extends rx.lang.scala.Observer[T] {
    override def onNext(elem: T): Unit = channel.push(elem)
    override def onCompleted(): Unit = channel.end()
    override def onError(e: Throwable): Unit = channel.end(e)
  }

为了完整起见,这里是从 Enumerator 到 Observable 的转换:

  /*
   * Enumerator to Observable
   */
  implicit def enumerator2Observable[T](enum: Enumerator[T]): Observable[T] = {
    // creating the Observable that we return
    Observable({ observer: Observer[T] =>
      // keeping a way to unsubscribe from the observable
      var cancelled = false

      // enumerator input is tested with this predicate
      // once cancelled is set to true, the enumerator will stop producing data
      val cancellableEnum = enum through Enumeratee.breakE[T](_ => cancelled)

      // applying iteratee on producer, passing data to the observable
      cancellableEnum (
        Iteratee.foreach(observer.onNext(_))
      ).onComplete { // passing completion or error to the observable
        case Success(_) => observer.onCompleted()
        case Failure(e) => observer.onError(e)
      }

      // unsubscription will change the var to stop the enumerator above via the breakE function
      new Subscription { override def unsubscribe() = { cancelled = true } }
    })
  }

用于 WebSockets 的 Rx

另一方面,您可能会注意到,您在 Play 中处理迭代器和枚举器的大部分时间是在使用 WebSockets 时(就像您在此处所做的那样)。我们都同意 Iteratees 确实不如 Observables 直观,这可能就是您在 Play 项目中使用 Rx 的原因。

根据这个观察,我构建了一个名为WidgetManager的库,它正是这样做的:在 Play 中集成 Rx 以摆脱 Iteratees 操作。

使用该库,您的代码可能只是:

def operationStatusFeed = WebSocket.using[String] { implicit request =>

  // you can optionally give a function to process data from the client (processClientData)
  // and a function to execute when connection is closed (onClientClose)
  val w = new WidgetManager()

  w.addObservable("op", operation)

  // subscribe to it and push data in the socket to the client (automatic JS callback called)
  w.subscribePush("op")

  // deals with Iteratees and Enumerators for you and returns what's needed
  w.webSocket
}

该库在 GitHub 上:RxPlay(欢迎贡献)

于 2014-06-28T11:28:16.333 回答
0

我在Brian Gilbert的启发下制作了这个解决方案:

class ChannelObserver[T](chan: Channel[T]) extends Observer[T] {
  override def onNext(arg: T): Unit = chan.push(arg)
  override def onCompleted(): Unit = chan.end()
  override def onError(e: Throwable): Unit = chan.end(e)
  override val asJavaObserver: rx.Observer[T] = new rx.Observer[T] {
    def onCompleted() {
      chan.end()
    }

    def onError(e: Throwable) {
      chan.end(e)
    }

    def onNext(arg: T) {
      chan.push(arg)
    }
  }
}

implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
  Concurrent.unicast[T](onStart = { chan =>
      obs.subscribe(new ChannelObserver(chan))
  })
}

隐式函数无需任何额外代码即可将 Observables 转换为 Enumerators。

于 2014-06-26T14:38:50.937 回答