我一直在尝试设置 2 个 CQRS 微服务并让它们与 Kafka 一起通信。我启动了一个带有 kafka 和 zookeeper 的 Docker 容器,并连接了两者。据我在终端中看到的,连接也有效
我的问题是:我 在哪里以及如何将 Kafka 连接到两者,以便我可以将事件从一个微服务传递到另一个微服务
我尝试遵循 NestJS Kafka 文档并最终在 Producer-Microservice 的 CommandHandler 中执行此操作
//create-user.handler.ts in the Producer Microservice
import { CommandHandler, EventBus, ICommandHandler } from "@nestjs/cqrs";
import { CreateUserCommand } from "src/commands/create-user.command";
import { UserCreatedEvent } from "src/events/user-created.event";
import { Client, ClientKafka, Transport } from '@nestjs/microservices';
@CommandHandler(CreateUserCommand)
export class CreateUserHandler implements ICommandHandler<CreateUserCommand>{
constructor(
private readonly eventBus: EventBus,
){}
@Client({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'producer',
brokers: ['127.0.0.1:9092'],
},
consumer: {
groupId: 'user-group'
}
}
})
client: ClientKafka;
async onModuleInit(){
this.client.subscribeToResponseOf('user-created-event');
await this.client.connect();
}
async execute(command: CreateUserCommand){
const { username, email, pw } = command;
const userId = Math.random().toString();
console.log(username, email, pw);
const result = this.client.send('user-created-event', 'REEE PLS') //should send event to kafka with a payload of "REEE PLS"
this.eventBus.publish(new UserCreatedEvent(userId))//just for the application itself
return {
result
}
}
}
我将消费者设置为 NestJS 微服务,如下所示:
//main.ts in the Consumer Microservice
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
clientId: 'consumer',
brokers: ['127.0.0.1:9092'],
},
consumer: {
groupId: 'user-group'
}
}
});
app.listen(() => console.log('Microservice is listening'));
}
bootstrap();
现在在消费者微服务的控制器中,我尝试使用@MessagePattern()
和@EventPattern()
装饰器,两者都没有工作。我没有得到控制台日志,也没有任何可用的回报。这就是我在消费者微服务中的控制器的样子:
//app.controller.ts of the consumer microservice
import { Controller, Get } from '@nestjs/common';
import { AppService } from './app.service';
import { Client, ClientKafka, MessagePattern, Payload, Transport } from '@nestjs/microservices';
@Controller()
export class AppController {
constructor(private readonly appService: AppService) {}
@MessagePattern('user-created-event')
userCreated(@Payload() data){
console.log(data);
return "REEE";
}
}