1

我一直在尝试设置 2 个 CQRS 微服务并让它们与 Kafka 一起通信。我启动了一个带有 kafka 和 zookeeper 的 Docker 容器,并连接了两者。据我在终端中看到的,连接也有效

我的问题是:我 在哪里以及如何将 Kafka 连接到两者,以便我可以将事件从一个微服务传递到另一个微服务

我尝试遵循 NestJS Kafka 文档并最终在 Producer-Microservice 的 CommandHandler 中执行此操作

//create-user.handler.ts in the Producer Microservice
import { CommandHandler, EventBus, ICommandHandler } from "@nestjs/cqrs";
import { CreateUserCommand } from "src/commands/create-user.command";
import { UserCreatedEvent } from "src/events/user-created.event";
import { Client, ClientKafka, Transport } from '@nestjs/microservices';


@CommandHandler(CreateUserCommand)
export class CreateUserHandler implements ICommandHandler<CreateUserCommand>{
    constructor(
        private readonly eventBus: EventBus,
    ){}

    @Client({
        transport: Transport.KAFKA,
        options: {
        client: {
            clientId: 'producer',
            brokers: ['127.0.0.1:9092'],
        },
        consumer: {
            groupId: 'user-group'
        }
        }
    })
    client: ClientKafka;
  
    async onModuleInit(){
        this.client.subscribeToResponseOf('user-created-event');
        await this.client.connect();
    }

    async execute(command: CreateUserCommand){
        const { username, email, pw } = command;
        const userId = Math.random().toString();
        console.log(username, email, pw);

        const result = this.client.send('user-created-event', 'REEE PLS') //should send event to kafka with a payload of "REEE PLS"
        
        this.eventBus.publish(new UserCreatedEvent(userId))//just for the application itself
        return {
            result
        }
    }
}

我将消费者设置为 NestJS 微服务,如下所示:

//main.ts in the Consumer Microservice
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
  
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
    transport: Transport.KAFKA,
    options: {
      client: {
        clientId: 'consumer',
        brokers: ['127.0.0.1:9092'],
      },
      consumer: {
        groupId: 'user-group'
      }
    }
  });

  app.listen(() => console.log('Microservice is listening'));
}
bootstrap();

现在在消费者微服务的控制器中,我尝试使用@MessagePattern()@EventPattern()装饰器,两者都没有工作。我没有得到控制台日志,也没有任何可用的回报。这就是我在消费者微服务中的控制器的样子:

//app.controller.ts of the consumer microservice
import { Controller, Get } from '@nestjs/common';
import { AppService } from './app.service';
import { Client, ClientKafka, MessagePattern, Payload, Transport } from '@nestjs/microservices';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @MessagePattern('user-created-event')
  userCreated(@Payload() data){
    console.log(data);
    return "REEE";
  }
}
4

0 回答 0