39

目前,我正在使用带有 Scala(和 JavaFX)的 EventBus/ PubSub架构/模式来实现一个简单的笔记组织应用程序(有点像带有一些附加思维导图功能的 Evernote 客户端),我不得不说我真的很喜欢 EventBus观察者模式。

以下是一些 EventBus 库:

https://code.google.com/p/guava-libraries/wiki/EventBusExplained

http://eventbus.org(目前似乎已关闭)这是我在实施中使用的。

http://greenrobot.github.io/EventBus/

这是 EventBus 库的比较:http ://codeblock.engio.net/37/

EventBus 与发布-订阅模式有关。

然而 !

最近,我参加了Coursera 的 Reactive 课程,并开始怀疑使用RXJava代替 EventBus 是否会在单线程应用程序中进一步简化事件处理代码?

我想问一下使用这两种技术(某种事件总线库某种​​形式的响应式扩展(RX))编程的人的经验:使用 RX 解决事件处理复杂性是否比使用给定的事件总线架构更容易没有必要使用多个线程

我之所以这么问是因为我在 Coursera 上的反应式讲座中听说RX导致的代码比使用观察者模式更简洁(即没有“回调地狱”),但是我没有发现 EventBus 架构与RXJava之间的任何比较。所以很明显,EventBus 和 RXJava 都比观察者模式更好,但 在代码清晰度和可维护性方面哪个在单线程应用程序中更好?

如果我理解正确的话, RXJava的主要卖点是如果存在阻塞操作(例如等待服务器响应),它可以用来生成响应式应用程序。

但我根本不关心异步性,我关心的只是在单线程应用程序中保持代码干净、清晰且易于推理。

在这种情况下,使用 RXJava 是否比使用 EventBus 更好?

我认为 EventBus 将是一个更简单和更清洁的解决方案,我看不出有任何理由为什么我应该将 RXJava 用于单线程应用程序以支持简单的 EventBus 架构。

但我可能错了!

如果我错了,请纠正我,并解释为什么在没有执行阻塞操作的单线程应用程序的情况下,RXJava 会比简单的 EventBus 更好。

4

4 回答 4

30

以下是我认为在单线程同步应用程序中使用反应式事件流的好处。

1. 更多的声明性,更少的副作用和更少的可变状态。

事件流能够封装逻辑和状态,可能使您的代码没有副作用和可变变量。

考虑一个计算按钮点击次数并将点击次数显示为标签的应用程序。

纯Java解决方案:

private int counter = 0; // mutable field!!!

Button incBtn = new Button("Increment");
Label label = new Label("0");

incBtn.addEventHandler(ACTION, a -> {
    label.setText(Integer.toString(++counter)); // side-effect!!!
});

ReactFX 解决方案:

Button incBtn = new Button("Increment");
Label label = new Label("0");

EventStreams.eventsOf(incBtn, ACTION)
        .accumulate(0, (n, a) -> n + 1)
        .map(Object::toString)
        .feedTo(label.textProperty());

没有使用可变变量,并且副作用的赋值label.textProperty()隐藏在抽象后面。

Eugen Kiss在他的硕士论文中提出了将 ReactFX 与 Scala 集成。使用他的集成,解决方案可能如下所示:

val incBtn = new Button("Increment")
val label = new Label("0")

label.text |= EventStreams.eventsOf(incBtn, ACTION)
    .accumulate(0, (n, a) => n + 1)
    .map(n => n.toString)

它与前面的相同,具有消除控制反转的额外好处。

2. 消除故障和冗余计算的方法。(仅限 ReactFX)

毛刺是可观察状态下的暂时不一致。ReactFX 具有暂停事件传播的方法,直到对对象的所有更新都已处理完毕,从而避免了故障和冗余更新。特别是,请查看可暂停事件流IndicatorInhiBeans我关于 InhiBeans 的博客文章。这些技术依赖于事件传播是同步的这一事实,因此不会转换为 rxJava。

3.清除事件生产者和事件消费者之间的联系。

事件总线是任何人都可以发布和订阅的全局对象。事件生产者和事件消费者之间的耦合是间接的,因此不太清楚。使用响应式事件流,生产者和消费者之间的耦合更加明确。相比:

活动巴士:

class A {
    public void f() {
        eventBus.post(evt);
    }
}

// during initialization
eventBus.register(consumer);
A a = new A();

只看初始化代码并不清楚a和之间的关系。consumer

事件流:

class A {
    public EventStream<MyEvent> events() { /* ... */ }
}

// during initialization
A a = new A();
a.events().subscribe(consumer);

a和之间的关系consumer非常明确。

4. 对象发布的事件在其 API 中体现。

使用上一节中的示例,在事件总线示例中,A的 API 不会告诉您 的实例发布了哪些事件A。另一方面,在事件流示例中,A的 API 声明A发布事件类型为 的实例MyEvent

于 2014-07-29T20:59:30.293 回答
5

我认为您必须使用 rxjava,因为它提供了更多的灵活性。如果你需要总线,你可以使用这样的枚举:

public enum Events {

  public static PublishSubject <Object> myEvent = PublishSubject.create ();
}

//where you want to publish something
Events.myEvent.onNext(myObject);

//where you want to receive an event
Events.myEvent.subscribe (...);

.

于 2014-08-15T15:22:29.817 回答
1

根据我上面的评论,JavaFx 有一个ObservableValue类,它对应于 RX Observable(可能ConnectableObservable更精确,因为它允许多个订阅)。我使用以下隐式类从 RX 转换为 JFX,如下所示:

import scala.collection.mutable.Map
import javafx.beans.InvalidationListener
import javafx.beans.value.ChangeListener
import javafx.beans.value.ObservableValue
import rx.lang.scala.Observable
import rx.lang.scala.Subscription

/**
 * Wrapper to allow interoperability bewteen RX observables and JavaFX
 * observables. 
 */
object JfxRxImplicitConversion {
  implicit class JfxRxObservable[T](theObs : Observable[T]) extends ObservableValue[T] { jfxRxObs =>
    val invalListeners : Map[InvalidationListener,Subscription] = Map.empty
    val changeListeners : Map[ChangeListener[_ >: T],Subscription] = Map.empty
    var last : T = _
    theObs.subscribe{last = _}

    override def getValue() : T = last 

    override def addListener(arg0 : InvalidationListener) : Unit = {
      invalListeners += arg0 -> theObs.subscribe { next : T => arg0.invalidated(jfxRxObs) }
    }

    override def removeListener(arg0 : InvalidationListener) : Unit = {
      invalListeners(arg0).unsubscribe
      invalListeners - arg0
    }

    override def addListener(arg0 : ChangeListener[_ >: T]) : Unit = {
      changeListeners += arg0 -> theObs.subscribe { next : T => arg0.changed(jfxRxObs,last,next) }
    }

    override def removeListener(arg0 : ChangeListener[_ >: T]) : Unit = {
      changeListeners(arg0).unsubscribe
      changeListeners - arg0
    }
  }
}

然后允许您像这样使用属性绑定(这是 ScalaFX,但对应Property.bind于 JavaFX):

new Label {
    text <== rxObs
}

rxObs例如,可能在哪里:

val rxObs : rx.Observable[String] = Observable.
  interval(1 second).
  map{_.toString}.
  observeOn{rx.lang.scala.schedulers.ExecutorScheduler(JavaFXExecutorService)} 

这只是一个每秒递增的计数器。只记得导入隐式类。我无法想象它会变得比这更干净!

由于需要使用与 JavaFx 配合得很好的调度程序,上述内容有点复杂。有关如何实施的要点的链接,请参阅此问题。JavaFXExecutorService有一个对 scala RX 的增强请求,以使其成为隐式参数,因此将来您可能不需要该.observeOn调用。

于 2014-05-22T10:18:24.327 回答
1

自从我 2 年前提出这个问题以来,我学到了一两件事,这是我目前的理解(如斯蒂芬的 FRP书中所述):

两者都试图帮助描述状态机,即描述程序的状态如何响应事件而变化。

EventBus 和 FRP 之间的主要区别在于组合性:

  • 什么是成分?

  • FRP 是描述状态机的组合方式,而事件总线不是。为什么 ?

    • 它描述了一个状态机。
    • 它是组合的,因为描述是通过使用纯函数和不可变值来完成的。
  • EventBus 不是描述状态机的组合方式。为什么不 ?

    • 您不能采用任何两个事件总线并以提供描述组合状态机的新组合事件总线的方式组合它们。为什么不 ?
      • 事件总线不是一等公民(与 FRP 的事件/流相反)。
        • 如果你试图让活动巴士成为一等公民,会发生什么?
          • 然后你会得到类似 FRP/RX 的东西。
      • 受事件总线影响的状态不受
        • 一等公民(即与 FRP 的行为/单元格相反的引用透明、纯值)
        • 以声明性/功能性方式与事件总线相关联,而是由事件处理强制触发状态修改

总之,EventBus 不是组合的,因为组合的 EventBus 的含义和行为(即受所述组合的 EventBus 影响的状态的时间演变)取决于时间(即未包含的软件部分的状态) 明确地在组合的 EventBus 的声明中)。换句话说,如果我尝试声明一个组合 EventBus,那么将无法确定(仅通过查看组合 EventBus 的声明)哪些规则管理受组合 EventBus 影响的那些状态的状态演变,这与可以做到这一点的 FRP 形成对比。)

于 2017-03-15T20:23:00.400 回答