0

我有以下课程。我已经在控制台中验证了,在 Kafka 侦听器中解析主题占位符值之前(在创建 bean 期间)调用了此类的构造函数:

public class MsgReceiver<MSG> extends AbstractMsgReceiver<MSG> implements 
MessageReceiver<MSG> {

@SuppressWarnings("unused")
private String topic;

public MsgReceiver(String topic, MessageHandler<MSG> handler) {
    super(handler);
    this.topic = topic;
}

@KafkaListener(topics = "${my.messenger.kafka.topics.#{${topic}}.value}", groupId = "${my.messenger.kafka.topics.#{${topic}}.groupId}")
public void receiveMessage(@Headers Map<String, Object> headers, @Payload MSG payload) {
    System.out.println("Received "+payload);
    super.receiveMessage(headers, payload);
}

}

我的 application.yml 如下:

my:
  messenger:
    kafka:
      address: localhost:9092
      topics:
        topic_1:
          value: my_topic
          groupId: 1

在创建 bean 期间,我传递了我希望在 Kafka 侦听器主题占位符中动态使用的“topic_1”。我尝试了如代码本身所示,但它不起作用。请建议如何做到这一点。

4

1 回答 1

1

在评估 SpEL 之前解析占位符;您不能使用 SpEL 动态构建占位符名称。此外,您不能引用这样的字段;您必须通过 bean 名称(和公共 getter)间接地做到这一点。

因此,要执行您想要的操作,您必须在使用 SpEL 构建属性名称后添加一个 getter 并从环境中动态获取属性。

有一个特殊的标记__listener允许您引用当前的 bean。

把它们放在一起...

@SpringBootApplication
public class So63056065Application {

    public static void main(String[] args) {
        SpringApplication.run(So63056065Application.class, args);
    }

    @Bean
    public MyReceiver receiver() {
        return new MyReceiver("topic_1");
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("my_topic").partitions(1).replicas(1).build();
    }
}

class MyReceiver {

    private final String topic;

    public MyReceiver(String topic) {
        this.topic = topic;
    }

    public String getTopic() {
        return this.topic;
    }

    @KafkaListener(topics = "#{environment.getProperty('my.messenger.kafka.topics.' + __listener.topic + '.value')}",
            groupId = "#{environment.getProperty('my.messenger.kafka.topics.' + __listener.topic + '.groupId')}")
    public void listen(String in) {
        System.out.println(in);
    }

}

结果...

2020-07-23 12:13:44.932  INFO 39561 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = 1
    group.instance.id = null
...

1: partitions assigned: [my_topic-0]
于 2020-07-23T14:56:10.207 回答