我很难理解 Java Spring Boot 中的一些 Kafka 概念。我想针对在服务器上运行的真实 Kafka 代理测试消费者,该服务器有一些生产者写入/已经将数据写入各种主题。我想与服务器建立连接,使用数据,并在测试中验证或处理其内容。
互联网上的绝大多数示例(实际上是到目前为止我所看到的所有示例)都涉及嵌入式 kafka、EmbeddedKafkaBroker,并显示了在本地一台机器上实现的生产者和消费者。我还没有找到任何可以解释如何与远程 kafka 服务器建立连接并从特定主题读取数据的示例。我已经编写了一些代码,并打印了经纪人地址:
System.out.println(embeddedKafkaBroker.getBrokerAddress(0));
我得到的是127.0.0.1:9092,表示是本地的,所以与远程服务器的连接还没有建立。
另一方面,当我运行 SpringBootApplication 时,我从远程代理获取有效负载。
接收者:
@Component
public class Receiver {
private static final String TOPIC_NAME = "X";
private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = TOPIC_NAME)
public void receive(final byte[] payload) {
LOGGER.info("received the following payload: '{}'", payload);
latch.countDown();
}
}
配置:
@EnableKafka
@Configuration
public class ByteReceiverConfig {
@Autowired
EmbeddedKafkaBroker kafkaEmbeded;
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupIdConfig;
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
ConsumerFactory<Object, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProperties());
}
@Bean
Map<String, Object> consumerProperties() {
final Map<String, Object> properties =
KafkaTestUtils.consumerProps("junit-test", "true", this.kafkaEmbeded);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
return properties;
}
测试:
@EnableAutoConfiguration
@EnableKafka
@SpringBootTest(classes = {ByteReceiverConfig.class, Receiver.class})
@EmbeddedKafka
@ContextConfiguration(classes = ByteReceiverConfig.class)
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.consumer.group-id=EmbeddedKafkaTest"})
public class KafkaTest {
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
EmbeddedKafkaBroker embeddedKafkaBroker;
@Autowired
Receiver receiver;
@BeforeEach
void waitForAssignment() {
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
System.out.println(messageListenerContainer.getAssignedPartitions().isEmpty());
System.out.println(messageListenerContainer.toString());
System.out.println(embeddedKafkaBroker.getTopics().size());
System.out.println(embeddedKafkaBroker.getPartitionsPerTopic());
System.out.println(embeddedKafkaBroker.getBrokerAddress(0));
System.out.println(embeddedKafkaBroker.getBrokersAsString());
ContainerTestUtils.waitForAssignment(messageListenerContainer,
embeddedKafkaBroker.getPartitionsPerTopic());
}
@Test
public void testReceive() {
}
}
我希望有人对以下问题有所了解:
1.EmbeddedKafkaBroker 类的实例能否用于测试来自远程代理的数据,还是仅用于本地测试,我将在本地测试中生成数据,即将数据发送到我自己创建的主题并使用数据?
2.是否可以为真正的kafka服务器编写测试类?例如,验证是否已建立连接,或者是否已从特定主题读取数据。在这种情况下需要哪些注释、配置和类?
3.如果我只想消费数据,是否必须在配置文件中提供生产者配置(这会很奇怪,但到目前为止我遇到的所有示例都是这样做的)?
4.您是否知道任何资源(书籍、网站等)显示使用 kafka 的真实示例,即与远程 kafka 服务器、仅与生产者或消费者一起使用?