3

我已经阅读了 nestjs 微服务和 kafka 文档,但我无法弄清楚其中的一些。如果你能帮助我,我将非常感激。因此,正如文档所说,我必须在 main.ts 文件中创建一个微服务,如下所示:

const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    }
  }
});

await app.listen(() => console.log('app started'));

然后有一个像这样的kafkaModule文件:

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'HERO_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'hero',
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'hero-consumer'
          }
        }
      },
    ]),
  ]
})
export class KafkaModule implements OnModuleInit {
  constructor(@Inject('HERO_SERVICE') private readonly clientService: KafkaClient)
  async onModuleInit() {
    await this.clientService.connect();
  }
}

首先我想不通的是 createMicroservice 的第一个参数有什么用?(我通过了 AppModule 和 KafkaModule 并且都正常工作。知道 kafkaModule 是在 appModule 中导入的)

另一件事是,据我了解,main.ts文件中的微服务部分和配置用于订阅MessagePattern或EventPattern装饰器中使用的主题,而kafkaModule中描述的kafkaClient用于发送消息到不同的话题。

这里的问题是,如果我之前说的是真的,那么为什么 clientModule 使用默认的 groupId 如果没有指定作为消费者工作。奇怪的是我找不到使用 clientModule 从任何主题获取任何消息的解决方案。我现在正在做的是在每个文件中使用不同的组 ID,这样它们就不会发生任何冲突。

4

1 回答 1

0

createMicroservice 的第一个参数,当你想消费来自特定主题的消息时,它将帮助指导消费者如何连接到 Kafka。

示例:我们想从主题获取消息:test01

我们如何声明?

import {Controller} from '@nestjs/common'
import {MessagePattern, Payload} from '@nestjs/microservices'

@Controller('sync')
export class SyncController {
  @MessagePattern('test01')
  handleTopicTest01(@Payload() message: Sync): any {
  // Handle your message here
  }
}

第二个块用作非消费者的生产者。当应用程序想要向特定主题发送消息时,clientModel 将支持这一点。

@Get()
sayHello() {
    return this.clientModule.send('say.hello', 'hello world')
}
于 2021-09-03T12:20:16.850 回答