我正在开发一个有 2 项服务的项目:读取、转换消息,然后写入另一个 Kafka。这两种服务的 Kafka 配置是不同的。这是我的 application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
sourcetopic1: topic1
destinationtopic1 : topic2
sourcetopic2: topic3
destinationtopic2 : topic4
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
group-id: TestCollector
client-id:TestCollector01
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
这些是我的两个服务的配置文件:
Service1KafkaConfig
public class KafkaConfig { @Bean public ReceiverOptions<String, String> kafkaReceiverOptions(@Value("${spring.kafka.sourcetopic1}") String topic, KafkaProperties kafkaProperties) { ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties()); return basicReceiverOptions.subscription(Collections.singletonList(topic)); } @Bean public ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate(ReceiverOptions<String, String> kafkaReceiverOptions) { return new ReactiveKafkaConsumerTemplate<String, String>(kafkaReceiverOptions); }
}
服务2配置
public class Service2KafkaConfig { @Bean public ReceiverOptions<String, String> service2KafkaReceiverOptions(@Value("${spring.kafka.sourcetopic3}") String topic, KafkaProperties kafkaProperties) { ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties()); return basicReceiverOptions.subscription(Collections.singletonList(topic)); } @Bean public ReactiveKafkaConsumerTemplate<String, String> service2KafkaConsumerTemplate(ReceiverOptions<String, String> kafkaReceiverOptions) { return new ReactiveKafkaConsumerTemplate<String, String>(kafkaReceiverOptions); }
}
我在各自的服务中自动装配这些 bean:
Service1:我没有为 service1 添加 ProcessRecord 方法,因为我觉得这个问题不需要它。如果需要,请告诉我。
@Slf4j
@Service
public class Service1 implements CommandLineRunner {
@Autowired
public ReactiveKafkaConsumerTemplate<String, String> service1KafkaConsumerTemplate;
public Flux<String> consume1() {
return service1KafkaConsumerTemplate.receiveAutoAck()
.doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
consumerRecord.key(),
consumerRecord.value(),
consumerRecord.topic(),
consumerRecord.offset())
)
.donOnNext(s->ProcessRecord(s))
.map(ConsumerRecord::value)
.doOnNext(metric -> log.debug("successfully consumed {}={}", Metric1[].class.getSimpleName(), metric))
.doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
}
@Override
public void run(String... args) throws Exception {
consume1().subscribe();
}
}
服务2:
@Slf4j
@Service
public class Service2 implements CommandLineRunner {
@Autowired
@Qualifier("service2KafkaConsumerTemplate")
public ReactiveKafkaConsumerTemplate<String, String> service2KafkaConsumerTemplate;
public Flux<String> consume2() {
return service2KafkaConsumerTemplate.receiveAutoAck()
.doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
consumerRecord.key(),
consumerRecord.value(),
consumerRecord.topic(),
consumerRecord.offset())
)
.map(ConsumerRecord::value)
.doOnNext(metric -> log.debug("successfully consumed {}={}", Metric[].class.getSimpleName(), metric))
.doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
}
@Override
public void run(String... args) throws Exception {
consume2().subscribe();
}
}
当我运行应用程序时,我只能看到一个开始订阅 topic1 的消费者。是否可以在同一个项目中运行多个 Kafka 消费者。如果是的话,你能告诉我需要做什么才能让它们运行吗?