2

我在 Nest.js 项目中使用 Kafka.js。这就是我初始化 KafkaClient 的方式:


@Module({
...
providers: [{
      provide: 'KAFKA_CLIENT',
      useFactory: async (configService: KafkaClientConfigService) => {

        const kafkaOptions = configService.getKafkaOptions();
        return ClientProxyFactory.create(kafkaOptions);
      },
      inject: [KafkaClientConfigService],
    },
  ]
...
})

现在我将 KafkaClient 注入到我的控制器中,并且我希望使用间隔的消息。虽然有一种方法可以使用 Kafka.js 来做到这一点,但consumer.pause()我在 KafkaClient 中找不到对此类选项的任何引用。

有没有办法通过暂停或限制消费者来做到这一点?

4

1 回答 1

0

ClientKafka一个protected consumer属性。在消费者本身上,您可以调用pause()and resume()

要使用这些方法,我目前正在扩展ClientKafka自定义类并将其用作提供程序。

import { ClientKafka, KafkaOptions } from '@nestjs/microservices'

export class KafkaClient extends ClientKafka {
  pause() {
    this.consumer.pause()
    // ...
  }
}
于 2021-12-14T16:30:46.277 回答