我有以下课程。我已经在控制台中验证了,在 Kafka 侦听器中解析主题占位符值之前(在创建 bean 期间)调用了此类的构造函数:
public class MsgReceiver<MSG> extends AbstractMsgReceiver<MSG> implements
MessageReceiver<MSG> {
@SuppressWarnings("unused")
private String topic;
public MsgReceiver(String topic, MessageHandler<MSG> handler) {
super(handler);
this.topic = topic;
}
@KafkaListener(topics = "${my.messenger.kafka.topics.#{${topic}}.value}", groupId = "${my.messenger.kafka.topics.#{${topic}}.groupId}")
public void receiveMessage(@Headers Map<String, Object> headers, @Payload MSG payload) {
System.out.println("Received "+payload);
super.receiveMessage(headers, payload);
}
}
我的 application.yml 如下:
my:
messenger:
kafka:
address: localhost:9092
topics:
topic_1:
value: my_topic
groupId: 1
在创建 bean 期间,我传递了我希望在 Kafka 侦听器主题占位符中动态使用的“topic_1”。我尝试了如代码本身所示,但它不起作用。请建议如何做到这一点。