0

问候 Stackoverflow。多年来我一直在使用 stackoverflow 来寻找答案,这是我第一次尝试自己提出问题。所以如果我做错了,请随时告诉我。

目前我正在开发一个基于微服务架构的数据分析系统。假设该系统将由十几个自给自足的微服务组成,这些微服务通过 RabbitMQ 相互通信。它们中的每一个都封装在一个 docker-container 中,整个系统在生产中由 docker-swarm 提供支持。

特别是每个微服务都是一个 node.js 应用程序和相关的数据库,与一些 ORM 接口连接。它的任务是以 CRUD 方式管理和服务数据,并根据包含的数据提供一些准备好的查询的结果。没什么特别的。

为了提供微服务-微服务通信,我假设使用amqplib。但是使用它的方式还不确定。

我目前的问题是如何以 OOP 的方式利用 amqplib 将微服务间通信网络与应用程序的对象相关功能链接起来?通过 OOP 方式,我的意思是从长远来看可以替换 amqplib(和 RabbitMQ 本身)而无需更改与数据相关的逻辑。

我真正寻找的是当前正在使用 AMQP 的微服务应用程序的示例。如果有人可以提供链接,我将非常感激。

我的问题的第二部分。基于事件驱动的主体构建微服务应用程序,并将消息从 RabbitMQ 传递到应用程序的主事件队列是否有意义?这样每个过程都将以相同的方式调用,尽管它是内部或外部事件。

至于单个微服务的抽象示例:假设我有一个事件服务和一个连接到该服务的侦听器:

class UserManager {
  constructor(eventService) {
    this.eventService = eventService;
    this.eventServce.on("users.user.create-request", (payload) => {
      User.create(payload); // User interface is omitted in this example 
    }
  }
}

const eventService = new EventEmmiter();
const userManager = new UserManager(eventService);

另一方面,我有 RabbitMQ 连接,正在等待消息:

const amqp = require('amqplib');

amqp.connect('amqp-service-in-docker').then(connection => {
  connection.createChannel().then(channel => {
    // Here we use topic type of exchange to be able to filter only related messages 
    channel.assertExchange('some-exchange', 'topic'); 

    channel.assertQueue('').then(queue => {
      // And here we are waiting only the related messages 
      channel.bind(queue.queue, 'some-exchange', 'users.*');

      channel.consume(queue.queue, message => {
        // And here is the crucial part
      }
    }
  }    
}

我目前的想法是仅解析此消息并将其转发给 eventService 并将其路由键用作事件的名称,如下所示:

channel.consume(query.query, message => {
  const eventName = message.fields.routingKey;
  const eventPayload = JSON.parse(message.content.toString());
  eventService.emit(eventName, eventPayload);
}

但是 RPC 呢?我是否应该用另一种方法为他们进行另一次交流甚至是一个渠道,例如:

// In RPC channel
channel.consume(query.query, message => {
  eventService.once('users.user.create-response', response => {
    const recipient = message.properites.replyTo;
    const correlationId = msg.properties.correlationId;

    // Send response to specified recipient
    channel.sendToQueue(
      recipient,
      Buffer.from(JSON.stringify(resonse)),
      { 
        correlationId: correlationId
      }
    );

    channel.ack(message);
  });

  // Same thing
  const eventName = message.fields.routingKey;
  const eventPayload = JSON.parse(message.content.toString());
  eventService.emit(eventName, eventPayload);
}

然后我的 User 类应该在每次创建新用户时触发 'users.user.create-response' 事件。这不是拐杖吗?

4

0 回答 0