0

我正在尝试使用 AkkaConsistentHashingRoutingLogic来保证具有相同密钥的消息被路由到同一个 Actor。重要的是,具有相同键的消息按 FIFO 顺序处理。不同key的消息可以路由到不同的Actor,自由并行处理。我没有在分布式模式下使用 Akka。

这些消息实际上是从 RabbitMQ 代理读取的 JSON 消息,因此我的 Master Actor 接收到 AMQP 消息并使用路由键作为消息键。相同的密钥也在消息本身中。Actor 是 Spring 应用程序的一部分。

我的主演员看起来像这样:

@Named("MessageHandlerMaster")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class MessageHandlerMaster extends UntypedActor {

  private static final Logger log = LoggerFactory.getLogger(MessageHandlerMaster.class);

  private Router router;

  @Autowired
  public MessageHandlerMaster(final SpringProps springProps) {

  List<Routee> routees = Stream.generate(() -> {
      ActorRef worker = getContext().actorOf(springProps.create(MessageHandlerWorker.class));
      getContext().watch(worker);
      return new ActorRefRoutee(worker);
    }).limit(5) //todo: configurable number of workers
      .collect(Collectors.toList());

    router = new Router(new ConsistentHashingRoutingLogic(getContext().system()), routees);
  }

  public void onReceive(Object message) {
    if (message instanceof Message) {
      Message amqpMessage = (Message) message;
      String encoding = getMessageEncoding(amqpMessage);
      try {
        String json = new String(amqpMessage.getBody(), encoding);
        String routingKey = amqpMessage.getMessageProperties().getReceivedRoutingKey();
        log.debug("Routing message based on routing key " + routingKey);
        router.route(new ConsistentHashingRouter.ConsistentHashableEnvelope(json, routingKey), getSender());
      } catch (UnsupportedEncodingException e) {
        log.warn("Unknown content encoding sent in message! {}", encoding);
      }
    } else if (message instanceof Terminated) {
      //if one of the routee's died, remove it and replace it
      log.debug("Actor routee terminated!");
      router.removeRoutee(((Terminated) message).actor());
      ActorRef r = getContext().actorOf(Props.create(MessageHandlerWorker.class));
      getContext().watch(r);
      router = router.addRoutee(new ActorRefRoutee(r));
    }
  }

  private static String getMessageEncoding(Message message) {
    String encoding = message.getMessageProperties().getContentEncoding();
    if ((encoding == null) || (encoding.equals(""))) {
      encoding = "UTF-8";
    }
    return encoding;
  }
}

我最初通过以下方式获得主人一次:

this.master = actorSystem.actorOf(springProps.create(MessageHandlerMaster.class), "master");

然后只需通过以下方式将消息提交给它:

master.tell(message, ActorRef.noSender());

但是,当我从我的工作人员打印日志时,onReceive()我看到不同的调度程序线程有时被用于同一个键。

同样不清楚为什么有时相同的调度程序线程被用于 Master Actor 和 Worker Actor。这不应该是线程之间的异步消息传递吗?

16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.360 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186

正如您在此处看到的,带有键 10420186 的 Worker 处理消息的调度程序线程有时是 9,有时是 10。Master actor 有时也使用这两个线程。

我如何确定它ConsistentHashingRoutingLogic确实在工作并且同一个线程使用相同的密钥处理消息?我在路由器初始化中做错了什么吗?

4

1 回答 1

0

所以@vrudkovsk 对他的评论是正确的。我认为您在线程和演员之间感到困惑。Actor 只是内存中具有地址和邮箱的对象。调度程序本质上是与参与者一起执行操作的线程池。示例操作是:

  • 从邮箱中取出消息以在actor中处理它
  • 将消息排入邮箱。

不同的线程可以为同一个actor执行动作。这是由调度员决定的。Akka 确保一次只有一个线程将处理参与者中的消息。这并不意味着它将始终是同一个线程。

如果您想确保他们来到同一个演员,我建议使用context.self.pathor 记录演员路径或地址,context.self.path.address因为它们是相同的唯一标识符ActorSystem

于 2016-10-20T16:46:50.493 回答