0

这是一个通过两个不同的 API 将事件发布到 Kafka 的示例:一个 API 接受用户创建事件,另一个接受同一用户主题上的更新事件。

这两个 API 都将消息发布到具有不同模式的同一个 kafka 主题。

基本上是 UserCreated、UserUpdated 模式发布到用户主题。

消费者只对 UserUpdated 事件感兴趣,因此想知道如何设置消费者以支持 TopicRecordNameStrategy。现在我有两个消费者在听用户主题,并且都在接收消息。

我编写了连接到confluent kafka的示例spring boot应用程序,其中生产者和消费者都在同一个服务中

我有一个休息 API,它支持两个端点来创建和更新。这个 REST API 向生产者发布消息

这是我的代码。

应用 yml

# NOTE(review): the parent key of the next three lines (presumably `topic:`,
# given the `${topic.name}` reference in Producer.java) was cut off when the
# snippet was pasted -- confirm against the original file.
  name: users
  partitions-num: 3
  replication-factor: 3
server:
  port: 9080
spring:
  kafka:
    properties:
      bootstrap.servers: *****
      # Register each Avro type under its own Schema Registry subject
      # (<topic>-<record fully-qualified name>) so multiple schemas
      # (UserCreated / UserUpdated) can coexist on the same topic.
      value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
      sasl:
        jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username='****' password='****';
        mechanism: PLAIN
      security.protocol: SASL_SSL

      # CCloud Schema Registry Connection parameter
      schema.registry.url: ******
      basic.auth.credentials.source: USER_INFO
      schema.registry.basic.auth.user.info: *****:*****
    consumer:
      group-id: local-test-user-consumergroup
      # Replay from the earliest offset when a group has no committed offset.
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    template:
      # NOTE(review): value left empty in the original -- confirm intended.
      default-topic:
logging:
  level:
    root: info

RestController.java

import com.aligntech.UserUpdated;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST entry point for publishing user events to Kafka.
 *
 * <p>Exposes one endpoint per event type and delegates the actual
 * publishing to {@link Producer}.
 */
@RestController
@RequestMapping(value = "/user")
public class KafkaController {

    private final Producer producer;

    @Autowired
    KafkaController(Producer producer) {
        this.producer = producer;
    }

    /** POST /user/create — builds and publishes a UserCreated event. */
    @PostMapping(value = "/create")
    public void create(@RequestParam("name") String name, @RequestParam("age") Integer age) {
        UserCreated event = UserCreated.newBuilder()
                .setName(name)
                .setAge(age)
                .build();
        this.producer.sendMessage(event);
    }

    /** PUT /user/update — builds and publishes a UserUpdated event. */
    @PutMapping(value = "/update")
    public void update(@RequestParam("name") String name, @RequestParam("age") Integer age) {
        UserUpdated event = UserUpdated.newBuilder()
                .setName(name)
                .setAge(age)
                .setUpdated(true)
                .build();
        this.producer.sendMessage(event);
    }
}

生产者.java

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class Producer {

    /**
     * Template used to publish user events. The value type parameter is
     * erased at runtime; the configured KafkaAvroSerializer handles both
     * UserCreated and UserUpdated payloads regardless of the declared type.
     */
    private final KafkaTemplate<String, User> kafkaTemplate;

    /** Target topic, injected from the {@code topic.name} property. */
    @Value("${topic.name}")
    private String topic;

    @Autowired
    public Producer(KafkaTemplate<String, User> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /** Publishes a UserCreated event (no key; the partitioner picks a partition). */
    void sendMessage(com.aligntech.UserCreated user) {
        publish(user);
    }

    /** Publishes a UserUpdated event (no key; the partitioner picks a partition). */
    void sendMessage(com.aligntech.UserUpdated user) {
        publish(user);
    }

    /**
     * Shared send path for both event types. The original code built raw
     * {@link ProducerRecord} instances to sidestep the mismatch between the
     * template's declared {@code User} value type and the Avro payloads;
     * the narrowly scoped unchecked cast below achieves the same effect
     * without raw types or duplicated send code.
     */
    private void publish(Object payload) {
        // Safe at runtime: generics are erased and the value serializer is
        // KafkaAvroSerializer, which accepts any Avro-generated record.
        @SuppressWarnings("unchecked")
        KafkaTemplate<String, Object> template =
                (KafkaTemplate<String, Object>) (KafkaTemplate<?, ?>) kafkaTemplate;
        template.send(new ProducerRecord<>(topic, payload));
    }
}

消费者.java

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class Consumer {

    // NOTE(review): the type parameter in ConsumerRecord<String, UserUpdated>
    // is erased at runtime and does NOT filter records. Each listener below is
    // in its own consumer group, so each one receives EVERY record published
    // to the "users" topic -- which is exactly what the log output in this
    // post shows (both listeners print the UserUpdated payload). To deliver
    // only one record type per listener you need e.g. a RecordFilterStrategy
    // on the listener container factory, or a class-level @KafkaListener with
    // per-type @KafkaHandler methods.

    // Intended to handle only UserUpdated events; see note above on why it
    // currently also receives UserCreated records.
    @KafkaListener(topics = "users", groupId = "user-updated-group")
    public void consumeUserUpdated(ConsumerRecord<String, com.aligntech.UserUpdated> record) {
        log.info("Read UserUpdated Record");
        log.info(record.topic() + ":" + record.key() + ":" + record.headers());
        log.info(String.format("Consumed message -> %s", record.value()));
    }


    // Intended to handle only UserCreated events; see note above on why it
    // currently also receives UserUpdated records.
    @KafkaListener(topics = "users", groupId = "user-created-group")
    public void consumeUserCreated(ConsumerRecord<String, com.aligntech.UserCreated> record) {

        log.info("Read UserCreated Record");
        log.info(record.topic() + ":" + record.key() + ":" + record.headers());
        log.info(String.format("Consumed message -> %s", record.value()));
    }
}

输出

2021-05-24 14:51:54.220  INFO 3878 --- [ntainer#1-0-C-1] com.****.kafka.kafkatest.Consumer  : Read UserCreated Record
2021-05-24 14:51:54.223  INFO 3878 --- [ntainer#1-0-C-1] com.****.kafka.kafkatest.Consumer  : users:null:RecordHeaders(headers = [], isReadOnly = false)
2021-05-24 14:51:54.223  INFO 3878 --- [ntainer#1-0-C-1] com.****.kafka.kafkatest.Consumer  : Consumed message -> {"name": "UserUpdated", "age": 25, "updated": true}
2021-05-24 14:51:54.508  INFO 3878 --- [ntainer#0-0-C-1] com.****.kafka.kafkatest.Consumer  : Read UserUpdated Record
2021-05-24 14:51:54.508  INFO 3878 --- [ntainer#0-0-C-1] com.****.kafka.kafkatest.Consumer  : users:null:RecordHeaders(headers = [], isReadOnly = false)
2021-05-24 14:51:54.508  INFO 3878 --- [ntainer#0-0-C-1] com.****.kafka.kafkatest.Consumer  : Consumed message -> {"name": "UserUpdated", "age": 25, "updated": true}
4

1 回答 1

0

You would need to add a `RecordFilterStrategy` to the listener container factory.

Unfortunately, there is only one per factory, which applies to all listeners, so you would need a different factory (and filter) for each listener.

You can use a custom task executor in each container (via a container customizer) and use the thread name to decide which records to filter out.

I can put together an example if needed.

@SpringBootApplication
public class So67669747Application {

    public static void main(String[] args) {
        SpringApplication.run(So67669747Application.class, args);
    }

    // Declares the demo topic (single partition/replica -- local demo only).
    @Bean
    NewTopic topic2() {
        return TopicBuilder.name("so67669747").partitions(1).replicas(1).build();
    }

    // Single filter shared by both listener containers. Records for which the
    // predicate returns true are DISCARDED. The branch is chosen by the
    // consumer thread's name, which is set per-container by the task
    // executors below: the "exec1" container drops "A..." records, every
    // other container drops "B..." records.
    @Bean
    RecordFilterStrategy<String, String> f1() {
        return rec -> Thread.currentThread().getName().contains("exec1")
                ? rec.value().startsWith("A")
                : rec.value().startsWith("B");
    }

    // Dedicated executor per container so each consumer thread gets a
    // distinguishable name ("exec1-...", "exec2-...") for the filter above.
    @Bean
    AsyncListenableTaskExecutor exec1() {
        return new SimpleAsyncTaskExecutor("exec1-");

    }

    @Bean
    AsyncListenableTaskExecutor exec2() {
        return new SimpleAsyncTaskExecutor("exec2-");

    }

    // Listener 1: runs on exec1, so it receives only non-"A" records.
    @KafkaListener(id = "so67669747-1", topics = "so67669747")
    public void listen1(String in) {
        System.out.println("1:" + in);
    }

    // Listener 2: runs on exec2, so it receives only non-"B" records.
    @KafkaListener(id = "so67669747-2", topics = "so67669747")
    public void listen2(String in) {
        System.out.println("2:" + in);
    }

    // Publishes ten alternating "A..."/"B..." records at startup to
    // demonstrate that each listener sees only one kind.
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send("so67669747", i % 2 == 0 ? "AAAAAAA" : "BBBBBBB"));
        };
    }

}

// Wires the shared RecordFilterStrategy into the one listener container
// factory and assigns a differently-named task executor to each container,
// so the thread-name-based filter can tell the two listeners apart. (There
// is only one filter per factory, hence this workaround.)
@Component
class FactoryCustomizer {

    FactoryCustomizer(ConcurrentKafkaListenerContainerFactory<String, String> factory,
            RecordFilterStrategy<String, String> f1,
            AsyncListenableTaskExecutor exec1, AsyncListenableTaskExecutor exec2) {

        factory.setRecordFilterStrategy(f1);
        // Match containers by group id (defaults to the @KafkaListener id)
        // and pin each to its own named executor.
        factory.setContainerCustomizer(container -> {
            if (container.getGroupId().equals("so67669747-1")) {
                container.getContainerProperties().setConsumerTaskExecutor(exec1);
            }
            else if (container.getGroupId().equals("so67669747-2")) {
                container.getContainerProperties().setConsumerTaskExecutor(exec2);
            }
        });
    }

}
于 2021-05-24T15:56:56.993 回答