问候 Stackoverflow。多年来我一直在使用 stackoverflow 来寻找答案,这是我第一次尝试自己提出问题。所以如果我做错了,请随时告诉我。
目前我正在开发一个基于微服务架构的数据分析系统。假设该系统将由十几个自给自足的微服务组成,这些微服务通过 RabbitMQ 相互通信。它们中的每一个都封装在一个 docker-container 中,整个系统在生产中由 docker-swarm 提供支持。
特别是每个微服务都是一个 node.js 应用程序和相关的数据库,与一些 ORM 接口连接。它的任务是以 CRUD 方式管理和服务数据,并根据包含的数据提供一些准备好的查询的结果。没什么特别的。
为了提供微服务-微服务通信,我假设使用amqplib。但是使用它的方式还不确定。
我目前的问题是如何以 OOP 的方式利用 amqplib 将微服务间通信网络与应用程序的对象相关功能链接起来?通过 OOP 方式,我的意思是从长远来看可以替换 amqplib(和 RabbitMQ 本身)而无需更改与数据相关的逻辑。
我真正寻找的是当前正在使用 AMQP 的微服务应用程序的示例。如果有人可以提供链接,我将非常感激。
我的问题的第二部分。基于事件驱动的主体构建微服务应用程序,并将消息从 RabbitMQ 传递到应用程序的主事件队列是否有意义?这样每个过程都将以相同的方式调用,尽管它是内部或外部事件。
至于单个微服务的抽象示例:假设我有一个事件服务和一个连接到该服务的侦听器:
class UserManager {
constructor(eventService) {
this.eventService = eventService;
this.eventServce.on("users.user.create-request", (payload) => {
User.create(payload); // User interface is omitted in this example
}
}
}
const eventService = new EventEmmiter();
const userManager = new UserManager(eventService);
另一方面,我有 RabbitMQ 连接,正在等待消息:
const amqp = require('amqplib');
amqp.connect('amqp-service-in-docker').then(connection => {
connection.createChannel().then(channel => {
// Here we use topic type of exchange to be able to filter only related messages
channel.assertExchange('some-exchange', 'topic');
channel.assertQueue('').then(queue => {
// And here we are waiting only the related messages
channel.bind(queue.queue, 'some-exchange', 'users.*');
channel.consume(queue.queue, message => {
// And here is the crucial part
}
}
}
}
我目前的想法是仅解析此消息并将其转发给 eventService 并将其路由键用作事件的名称,如下所示:
channel.consume(query.query, message => {
const eventName = message.fields.routingKey;
const eventPayload = JSON.parse(message.content.toString());
eventService.emit(eventName, eventPayload);
}
但是 RPC 呢?我是否应该用另一种方法为他们进行另一次交流甚至是一个渠道,例如:
// In RPC channel
channel.consume(query.query, message => {
eventService.once('users.user.create-response', response => {
const recipient = message.properites.replyTo;
const correlationId = msg.properties.correlationId;
// Send response to specified recipient
channel.sendToQueue(
recipient,
Buffer.from(JSON.stringify(resonse)),
{
correlationId: correlationId
}
);
channel.ack(message);
});
// Same thing
const eventName = message.fields.routingKey;
const eventPayload = JSON.parse(message.content.toString());
eventService.emit(eventName, eventPayload);
}
然后我的 User 类应该在每次创建新用户时触发 'users.user.create-response' 事件。这不是拐杖吗?