4

我正在尝试序列化订阅以通过网络发送。我正在使用 Scala,并执行以下操作:

observable.materialize.subscribe{ n : Notification => sendToNetwork(n)}

但是,我收到错误:

java.io.NotSerializableException: rx.lang.scala.Notification$OnNext

(准确地说,我正在使用 Akka 并尝试将通知发送到远程参与者。但我认为这个问题比这更普遍)。

似乎它拒绝序列化OnNext该类,该类实际上是 的子类,它是伴生对象Notification的内部类:rx.lang.scala.Notification

http://rxscala.github.io/scaladoc/#rx.lang.scala.Notification $$OnNext

...而且我想我在 java 文档中的某处看到无法序列化内部非静态类。

我对此的理解正确吗?如果是这样,这是 rx-java 的类层次结构的限制吗?或者有什么方法可以解决这个问题,并序列化Notifications?

4

2 回答 2

3

我对此的理解正确吗?

可以序列化非静态内部类,前提是外部类是可序列化的。但是在 Java 和 Scala 中,您都需要明确地告诉编译器该类是可序列化的(通过扩展Serializable),并且 rx-javaNotification和 rx-scalaOnNext都不是可序列化的。

或者有什么方法可以解决这个问题,并序列化通知?

在 Akka 中,您可以为任何类编写自己的序列化程序:http: //doc.akka.io/docs/akka/snapshot/scala/serialization.html。Java 序列化仅在默认情况下使用。

于 2014-04-24T05:15:14.653 回答
1

Kontraktor-Reactive-Streams 提供基于反应流的出色远程处理性能。您不会沉迷于使用 Kontraktor,但可以将其用作提供快速远程处理的工具(基于快速序列化)。

public static void remotingRxToRx() {
    Observable<Integer> range = Observable.range(0, 50_000_000);
    Publisher<Integer> pub = RxReactiveStreams.toPublisher(range);

    KxReactiveStreams.get().asRxPublisher(pub)
        .serve(new TCPNIOPublisher().port(3456));

    RateMeasure rm = new RateMeasure("events");

    KxPublisher<Integer> remoteStream =
        KxReactiveStreams.get()
            .connect(Integer.class, new TCPConnectable().host("localhost").port(3456));

    RxReactiveStreams.toObservable(remoteStream)
        .forEach( i -> rm.count() );
}

当前链接: https ://github.com/RuedigerMoeller/kontraktor/tree/trunk/modules/reactive-streams/src/examples/src/rxstreamserver

很快将被转移到一个单独的项目中(只需检查我的项目)

于 2015-07-07T19:33:41.613 回答