45

我们将在几周后开始一个新的 Spring 4 应用程序。我们想使用一些事件驱动的架构。今年我到处阅读有关“Reactor”的信息,在网上寻找它时,我偶然发现了“Akka”。

所以现在我们有3个选择:

我找不到真正的比较。


现在我们只需要类似的东西:

  • X注册监听Event E
  • Y注册监听Event E
  • Z发送一个Event E

然后XandY将接收并处理该事件。

我们很可能会以异步方式使用它,但肯定会有一些同步场景。我们很可能总是发送一个类作为事件。(Reactor 示例主要使用字符串和字符串模式,但它也支持对象)。


据我了解,ApplicationEvent默认情况下同步Reactor工作并以异步方式工作。并且Reactor还允许使用该await()方法使其有点同步。Akka提供或多或少的相同Reactor,但也支持Remoting。

关于 Reactor 的await()方法:它可以等待多个线程完成吗?或者甚至可能是这些线程的一部分?如果我们从上面举个例子:

  • X注册监听Event E
  • Y注册监听Event E
  • Z发送一个Event E

X 是否可以通过说:等待并 Y完成来使其同步。是否有可能让它只等待X,而不是等待Y


也许还有一些替代品?例如 JMS 呢?

很多问题,但希望你能提供一些答案!

谢谢!


编辑:示例用例

  1. 当特定事件被触发时,我想创建 10000 封电子邮件。每封电子邮件都必须使用用户特定的内容生成。所以我会创建很多线程(max = system cpu cores)来创建邮件并且不会阻塞调用者线程,因为这可能需要几分钟。

  2. 当特定事件被触发时,我想从未知数量的服务中收集信息。每次获取大约需要 100 毫秒。在这里我可以想象使用 Reactor 的await,因为我需要这些信息来继续我在主线程中的工作。

  3. 当特定事件被触发时,我想根据应用程序配置执行一些操作。因此应用程序必须能够动态(取消)注册消费者/事件处理程序。他们会用事件做他们自己的事情,我不在乎。所以我会为每个处理程序创建一个线程,然后继续在主线程中完成我的工作。

  4. 简单的解耦:我基本上知道所有的接收器,但我只是不想在我的代码中调用每个接收器。这应该主要是同步完成的。

听起来我需要一个 ThreadPool 或一个 RingBuffer。这些框架是否有动态的 RingBuffers,如果需要,它的大小会增加?

4

3 回答 3

32

我不确定我能否在这个狭小的空间内充分回答您的问题。但我会试一试!:)

就功能而言, Spring 的ApplicationEvent系统和 Reactor 确实非常不同。ApplicationEvent路由基于ApplicationListener. 任何比这更复杂的事情,您都必须自己实现逻辑(但这不一定是坏事)。然而,Reactor 提供了一个全面的路由层,它也非常轻量级且完全可扩展。两者在功能上的任何相似之处都在于它们订阅和发布事件的能力,这实际上是任何事件驱动系统的特性。另外不要忘记spring-messagingSpring 4 中的新模块。它是 Spring Integration 中可用工具的子集,还提供了围绕事件驱动架构构建的抽象。

Reactor 将帮助您解决一些您必须自己管理的关键问题:

选择器匹配:ReactorSelector进行匹配,它包含一系列匹配——从简单的.equals(Object other)调用到允许占位符提取的更复杂的 URI 模板匹配。您还可以使用自己的自定义逻辑扩展内置选择器,这样您就可以使用丰富的对象作为通知键(例如域对象)。

Stream 和 Promise API:您Promise已经在参考.await()方法时提到了 API,这实际上适用于期望阻塞行为的现有代码。在使用 Reactor 编写新代码时,使用组合和回调来通过不阻塞线程来有效利用系统资源的压力不会太大。在依赖少量线程来执行大量任务的架构中,阻塞调用者几乎从来都不是一个好主意。Futures 根本不是云可扩展的,这就是现代应用程序利用替代解决方案的原因。

您的应用程序可以使用 Streams 或 Promises 中的任何一种来构建,但老实说,我认为您会发现Stream更灵活。关键的好处是 API 的可组合性,它允许您在依赖链中将操作连接在一起而不会阻塞。作为基于您描述的电子邮件用例的完全即兴示例:

@Autowired
Environment env;
@Autowired
SmtpClient client;

// Using a ThreadPoolDispatcher
Deferred<DomainObject, Stream<DomainObject>> input = Streams.defer(env, THREAD_POOL);

input.compose()
  .map(new Function<DomainObject, EmailTemplate>() {
    public EmailTemplate apply(DomainObject in) {
      // generate the email
      return new EmailTemplate(in);
    }
  })
  .consume(new Consumer<EmailTemplate>() {
    public void accept(EmailTemplate email) {
      // send the email
      client.send(email);
    }
  });

// Publish input into Deferred
DomainObject obj = reader.readNext();
if(null != obj) {
  input.accept(obj);
}

Reactor 还提供了Boundary,它基本上是用于阻止任意消费者的(因此,如果您只想阻止完成,则不必CountDownLatch构造 a )。在这种情况下,您可以使用 raw并使用and方法来触发服务状态检查。PromiseConsumerReactoron()notify()

但是,对于某些事情,您想要的似乎是从 aFuture返回的ExecutorService,不是吗?为什么不把事情简单化?只有在吞吐量性能和开销效率很重要的情况下,Reactor 才会真正受益。如果你阻塞了调用线程,那么你很可能会抹去 Reactor 无论如何都会给你带来的效率提升,所以在这种情况下你可能会更好地使用更传统的工具集。

Reactor 开放性的好处是没有什么可以阻止两者的交互。Futures您可以在Consumers没有静电的情况下自由混合。在这种情况下,请记住,您只会与最慢的组件一样快。

于 2013-12-18T21:16:41.767 回答
9

让我们忽略 Spring ApplicationEvent,因为它确实不是为您的要求而设计的(更多关于 bean 生命周期管理)。

你需要弄清楚的是,如果你想这样做

  1. 面向对象的方式(即参与者、动态消费者、动态注册)
  2. 服务方式(静态消费者,启动时注册)。

使用您的示例,X它们Y是:

  1. 短暂的实例 (1) 或者它们是
  2. 长寿的单身人士/服务对象(2)?

如果您需要即时注册消费者,那么 Akka 是一个不错的选择(我不确定反应器,因为我从未使用过它)。如果您不想使用临时对象,则可以使用 JMS 或 AMQP。

您还需要了解这些库试图解决两个问题:

  1. 并发(即在同一台机器上并行做事)
  2. 分布(即在多台机器上并行做事)

Reactor 和 Akka 主要关注#1。Akka 最近刚刚添加了集群支持,并且 actor 抽象使得 #2 更容易实现。消息队列(JMS、AMQP)专注于#2。

对于我自己的工作,我做服务路线并使用经过大量修改的 Guava EventBus 和 RabbitMQ。我使用类似于Guava Eventbus的注释,但也为总线上发送的对象提供注释,但是您可以在异步模式下使用 Guava 的 EventBus 作为 POC,然后像我一样制作自己的。

您可能认为您需要拥有动态消费者 (1),但大多数问题都可以通过简单的 pub/sub 解决。管理动态消费者也可能很棘手(因此 Akka 是一个不错的选择,因为 Actor 模型对此进行了各种管理)

于 2013-12-19T02:57:18.733 回答
3

仔细定义你想要从框架中得到的东西。如果一个框架有比你需要的更多的特性,它并不总是好的。更多的功能意味着更多的错误、更多的代码需要学习,以及更少的性能。

一些需要关注的功能是:

  • 演员的本质(线程或轻量级对象)
  • 在机器集群上工作的能力(Akka)
  • 持久消息队列 (JMS)
  • 特定功能,如信号(没有信息的事件)、转换(将来自不同端口的消息组合成复杂事件的对象,请参阅 Petri 网)等。

小心诸如等待之类的同步特性——它阻塞了整个线程,并且当actor在线程池上执行时是危险的(线程饥饿)。

更多框架可供查看:

Fork-Join Pool - 在某些情况下,允许await没有线程饥饿

科学的工作流程系统

Java 的数据流框架- 信号、转换

附加组件:两种演员。

通常,并行工作系统可以表示为一个图,其中活动节点相互发送消息。在 Java 中,与大多数其他主流语言一样,活动节点(参与者)可以实现为线程或由线程池执行的任务(可运行或可调用)。通常,参与者的一部分是线程,一部分是任务。两种方法都有其优点和缺点,因此为系统中的每个参与者选择最合适的实现至关重要。简而言之,线程可以阻塞(并等待事件),但会为它们的堆栈消耗大量内存。任务可能不会阻塞,而是使用共享堆栈(池中的线程)。

如果一个任务调用一个阻塞操作,它会从服务中排除一个池线程。如果许多任务阻塞,它们可以排除所有线程,从而导致死锁——那些可以解除阻塞的任务无法运行。这种死锁称为线程饥饿。如果为了防止线程饥饿,将线程池配置为无限制,我们只是将任务转换为线程,失去了任务的优势。

为了消除任务中对阻塞操作的调用,应该将任务分成两个(或更多)——第一个任务调用阻塞操作并退出,其余的被格式化为阻塞操作完成时启动的异步任务。当然,阻塞操作必须有一个替代的异步接口。因此,例如,不应使用同步读取套接字,而应使用 NIO 或 NIO2 库。

不幸的是,标准 Java 库缺少用于流行同步工具(如队列和信号量)的异步对应物。幸运的是,它们很容易从头开始实现(有关示例,请参见Java 的 Dataflow 框架)。

因此,纯粹使用非阻塞任务进行计算是可能的,但会增加代码的大小。明显的建议是尽可能使用线程,并且仅将任务用于简单的大规模计算。

于 2013-12-18T18:18:53.970 回答