113

我已经阅读了哪些设计决策有利于 Scala 的 Actors 而不是 JMS?.

通常,我们使用已经存在多年的消息传递解决方案:或者使用 JMS 实现(例如 WebSphere MQ 或 Apache ActiveMQ)进行点对点通信,或者使用 Tibco Rendevous 进行多播消息传递。

它们非常稳定,经过验证并提供高可用性和性能。然而,配置和设置似乎比在 Akka 中复杂得多。

我何时以及为什么应该将 Akka 用于上述产品(WebSphere MQ 或 ActiveMQ)迄今为止已成功使用的一些用例?为什么我应该考虑在我未来的项目中使用 Akka 而不是 WebSphere MQ 或 Tibco RV?

我什么时候应该避免使用 Akka?它是否提供与其他解决方案相同的高可用性和性能?或者将 Akka 与其他消息中间件进行比较是不是一个坏主意?

除了 JMS(点对点)、TibcoRV(多播)和 Akka 之外,我还应该考虑 JVM 环境中的另一种消息传递解决方案?

4

3 回答 3

96

首先,“较旧”的消息系统 (MQ) 在实现上较旧,但它们在工程理念上是较新的:事务性持久队列。Scala Actors 和 Akka 可能是较新的实现,但建立在较旧的 Actors 并发模型之上。

然而,这两个模型在实践中最终非常相似,因为它们都是基于事件消息的:参见我对RabbitMQ vs Akka的回答。

如果您打算只为 JVM 编写代码,那么 Akka 可能是一个不错的选择。否则我会使用 RabbitMQ。

此外,如果您是 Scala 开发人员,那么 Akka 应该是不费吹灰之力的。然而,Akka 的 Java 绑定不是很 Java-ish,并且由于 Scala 的类型系统需要强制转换。

同样在 Java 中,人们通常不会制作不可变对象,我建议您为消息传递这样做。因此,在 Java 中很容易意外地使用 Akka 做一些无法扩展的事情(使用可变对象作为消息,依赖于奇怪的闭包回调状态)。对于 MQ,这不是问题,因为消息总是以速度为代价进行序列化。对于 Akka,它们通常不是。

与大多数 MQ 相比,Akka 对大量消费者的扩展性也更好。这是因为对于大多数 MQ(JMS、AMQP)客户端来说,每个队列连接都需要一个线程......因此很多队列 == 很多永久运行的线程。不过,这主要是客户问题。我认为 ActiveMQ Apollo 有一个非阻塞调度程序,据称可以为 AMQP 解决该问题。RabbitMQ 客户端具有允许您组合多个使用者的通道,但是仍然存在大量使用者可能导致死锁或连接死亡的问题,因此通常会添加更多线程来避免此问题。

话虽如此, Akka 的远程处理还是相当新的,可能仍然无法提供传统消息队列提供的所有可靠消息保证和 QoS(但这每天都在变化)。它通常也是点对点的,但我认为支持服务器对点,这通常是大多数 MQ 系统所做的(即单点故障),但也有点对点的 MQ 系统(RabbitMQ 是服务器-对等)。

最后,RabbitMQ 和 Akka 实际上是一对好搭档。您可以使用 Akka 作为 RabbitMQ 的包装器,特别是因为 RabbitMQ 不能帮助您处理消息的消耗和本地路由消息(在单个 JVM 中)。

何时选择阿卡

  • 有很多消费者(想想数百万)。
  • 需要低延迟
  • 对 Actor 并发模型开放

示例系统:交互式实时聊天系统

何时选择 MQ

  • 需要与许多不同的系统(即非 JVM)集成
  • 消息可靠性比延迟更重要
  • 想要更多工具和管理 UI
  • 因为之前的几点更适合长时间运行的任务
  • 想要使用与 Actors 不同的并发模型

示例系统:调度事务批处理系统

根据有关评论进行编辑

我假设 OP 关注的是Akka和消息队列都可以处理的分布式处理。那就是我假设他在谈论分布式 Akka与大多数消息队列相比,使用 Akka 进行本地并发是一个苹果对橙色的比较。我说的最多是因为你可以在本地应用消息队列模型作为并发模型(即主题、队列、交换),Reactor库和simple-react都可以这样做。

选择正确的并发模型/库对于低延迟应用程序非常重要。诸如消息队列之类的分布式处理解决方案通常并不理想,因为路由几乎总是通过线路完成,这显然比在应用程序内部要慢,因此 Akka 将是一个更好的选择。但是我相信一些专有的 MQ 技术允许本地路由。此外,正如我之前提到的,大多数 MQ 客户端在线程方面非常愚蠢,并且不依赖非阻塞 IO,并且每个连接/队列/通道都有一个线程......具有讽刺意味的是,非阻塞 io 并不总是低延迟,但通常是更多资源高效的。

正如您所看到的,分布式编程和并发编程的主题相当大并且每天都在变化,所以我的初衷不是混淆,而是专注于分布式消息处理的一个特定领域,这是我认为 OP 所关心的。在并发方面,人们可能希望将搜索重点放在“反应式”编程(RFP / 流)上,这是一种“较新”但与参与者模型和消息队列模型相似的模型,所有这些模型通常都可以组合起来,因为它们是基于事件的。

于 2012-10-22T19:15:42.280 回答
4

我不是消息系统方面的专家,但您可以在您的应用程序中将它们与 Akka 结合使用,从而获得两全其美的效果。下面是一个示例,您可能会发现它对试验 Akka 和消息传递系统很有用,在本例中是 ZeroMQ:

https://github.com/zcox/akka-zeromq-java

于 2011-04-17T13:03:45.013 回答
1

Akka-Camel 将是比 ZeroMQ 更好的示例 - ZeroMQ 是直接的 tcp 到 tcp 通信(因此为零 - 没有消息队列)。

使用 AkkaCamel,您可以抽象出队列并直接从参与者生成/使用消息,而无需任何代码来处理消息队列消息推送/拉取。

您可以放弃 akka-zeromq 并直接使用 Akka 进行远程处理。我认为 akka-zeromq 正在从核心库中删除,但我们为 akka 构建了一个很好的 zeromq 库,称为 scala-zeromq ( https://github.com/mDialog/scala-zeromq )

Akka 有几个关键的核心用例:

1) 可变状态

通过将共享状态隐藏在一个actor中来更容易处理共享状态。由于 Actor 同步处理消息,您可以在 Actor 中保持状态并通过 Actor API 以高一致性公开该字段

2) 分布

并发在 akka 中是免费的,所以你说它实际上是关于解决分布问题。跨机器和内核分布。Akka 内置了“位置透明性”,用于通过网络发送消息。它还具有与扩展单个服务相关的集群和模式。这使它成为一个非常好的分发解决方案(例如微服务架构)

这是一个将 Akka 与 ActiveMQ 与 Akka-Camel 一起使用的示例(使用 Java8)

import akka.actor.Props;
import akka.camel.Camel;
import akka.camel.CamelExtension;
import akka.testkit.TestActorRef;
import akka.testkit.TestProbe;
import org.junit.Ignore;
import org.junit.Test;
import akka.camel.javaapi.UntypedProducerActor;
import akka.camel.javaapi.UntypedConsumerActor;
import static com.rogers.totes.TotesTestFixtures.*;
import org.apache.activemq.camel.component.*;

public class MessagingTest {
    @Test @Ignore
    public void itShouldStoreAMessage() throws Exception{
        String amqUrl = "nio://localhost:61616";
        Camel camel = (Camel) CamelExtension.apply(system);
        camel.context().addComponent("activemq", ActiveMQComponent.activeMQComponent(amqUrl));

        TestProbe probe = TestProbe.apply(system);
        TestActorRef producer = TestActorRef.create(system, Props.create((Producer.class)));
        TestActorRef consumer = TestActorRef.create(system, Props.create((Consumer.class)));
        producer.tell("Produce", probe.ref());

        Thread.sleep(1000);
    }
}

class Producer extends UntypedProducerActor{

    @Override
    public String getEndpointUri() {
        return "activemq:foo.bar";
    }
}

class Consumer extends UntypedConsumerActor{

    @Override
    public String getEndpointUri() {
        return "activemq:foo.bar";
    }

    @Override
    public void onReceive(Object message) throws Exception {
        System.out.println("GOT A MESSAGE!" + message);

    }
}
于 2014-12-13T20:52:45.177 回答