3

我对 Java Spring Boot 中的一些 Kafka 概念理解得很吃力。我想针对运行在服务器上的真实 Kafka 代理(broker)测试消费者;该服务器上有一些生产者正在(或已经)向各个主题写入数据。我想在测试中连接到该服务器、消费数据,并验证或处理其内容。

网上绝大多数示例(实际上是我目前看到的所有示例)都使用嵌入式 Kafka(EmbeddedKafkaBroker),并且生产者和消费者都在同一台本地机器上运行。我还没有找到任何示例说明如何连接远程 Kafka 服务器并从特定主题读取数据。我已经写了一些代码,并打印了代理(broker)地址:

System.out.println(embeddedKafkaBroker.getBrokerAddress(0));

输出是 127.0.0.1:9092,说明这是本地代理,也就是说并没有连接到远程服务器。

另一方面,当我直接运行 SpringBootApplication 时,确实能从远程代理收到有效负载(payload)。

接收者:

/**
 * Kafka listener component that logs every raw payload arriving on topic {@code "X"} and
 * counts down a latch, so that a test can block until the first record has been seen.
 */
@Component
public class Receiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

    private static final String TOPIC_NAME = "X";

    /** Starts at one; counted down on every received record. */
    private final CountDownLatch latch = new CountDownLatch(1);

    /**
     * Handles a single record value: logs it, then releases the latch.
     *
     * @param payload the record value as raw bytes; passed to the logger unchanged
     */
    @KafkaListener(topics = TOPIC_NAME)
    public void receive(final byte[] payload) {
        LOGGER.info("received the following payload: '{}'", payload);
        this.latch.countDown();
    }

    /**
     * Exposes the latch so callers can wait for a record to arrive.
     *
     * @return the latch counted down by {@link #receive(byte[])}
     */
    public CountDownLatch getLatch() {
        return this.latch;
    }
}

配置:

    @EnableKafka
    @Configuration
    public class ByteReceiverConfig {

        @Autowired
        EmbeddedKafkaBroker kafkaEmbeded;

        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;

        @Value("${spring.kafka.consumer.group-id}")
        private String groupIdConfig;

        /**
         * Builds the listener container factory bean, backed by this class's
         * {@link #consumerFactory()} bean.
         *
         * @return a concurrent listener container factory using the configured consumer factory
         */
        @Bean
        public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
            final ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            final ConsumerFactory<Object, Object> consumers = consumerFactory();
            containerFactory.setConsumerFactory(consumers);
            return containerFactory;
        }

        /**
         * Consumer factory bean created from the {@link #consumerProperties()} map.
         *
         * @return a default consumer factory configured with this class's consumer properties
         */
        @Bean
        ConsumerFactory<Object, Object> consumerFactory() {
            final Map<String, Object> settings = consumerProperties();
            return new DefaultKafkaConsumerFactory<>(settings);
        }

        /**
         * Consumer settings bean.
         *
         * <p>Starts from {@code KafkaTestUtils.consumerProps(...)} for the embedded broker, then
         * overrides the bootstrap servers, the key/value deserializers and the group id with values
         * from this configuration. The base therefore still depends on the injected embedded broker.
         *
         * @return the mutable map of consumer properties
         */
        @Bean
        Map<String, Object> consumerProperties() {
            // Base test defaults; the "junit-test" group and "true" auto-commit flag come from here.
            final Map<String, Object> props =
                    KafkaTestUtils.consumerProps("junit-test", "true", this.kafkaEmbeded);
            // Whatever spring.kafka.bootstrap-servers resolved to replaces the embedded address.
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            // Keys and values are both handled as raw byte arrays.
            final Class<ByteArrayDeserializer> rawBytes = ByteArrayDeserializer.class;
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, rawBytes);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, rawBytes);
            // spring.kafka.consumer.group-id replaces the "junit-test" group set above.
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
            return props;
        }

测试:

        @EnableAutoConfiguration
        @EnableKafka
        @SpringBootTest(classes = {ByteReceiverConfig.class, Receiver.class})
        @EmbeddedKafka
        @ContextConfiguration(classes = ByteReceiverConfig.class)
        @TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
                "spring.kafka.consumer.group-id=EmbeddedKafkaTest"})
        public class KafkaTest {


            @Autowired
            private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

            @Autowired
            EmbeddedKafkaBroker embeddedKafkaBroker;


            @Autowired
            Receiver receiver;

            @BeforeEach
            void waitForAssignment() {
                for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
                    System.out.println(messageListenerContainer.getAssignedPartitions().isEmpty());
                    System.out.println(messageListenerContainer.toString());
                    System.out.println(embeddedKafkaBroker.getTopics().size());
                    System.out.println(embeddedKafkaBroker.getPartitionsPerTopic());
                    System.out.println(embeddedKafkaBroker.getBrokerAddress(0));
                    System.out.println(embeddedKafkaBroker.getBrokersAsString());

                    ContainerTestUtils.waitForAssignment(messageListenerContainer,
                            embeddedKafkaBroker.getPartitionsPerTopic());
            }

            @Test
            public void testReceive() {

            }
        }

希望有人能解答以下问题:

1. EmbeddedKafkaBroker 的实例能否用于测试来自远程代理的数据?还是说它只适用于本地测试,即由测试自己生产数据——把数据发送到自己创建的主题,然后再消费这些数据?

2. 能否针对真实的 Kafka 服务器编写测试类?例如验证连接是否已建立,或数据是否已从特定主题读取。这种情况下需要哪些注解、配置和类?

3. 如果我只想消费数据,是否也必须在配置文件中提供生产者配置?(这看起来很奇怪,但到目前为止我看到的所有示例都是这么做的。)

4. 您是否知道有哪些资源(书籍、网站等)给出了 Kafka 的真实使用示例,即连接远程 Kafka 服务器、只包含生产者或只包含消费者的示例?

4

2 个回答

1
  1. 如果您只想与外部代理通信,则根本不需要嵌入式代理。

  2. 是的,只需正确设置引导服务器(bootstrap servers)属性即可,例如 spring.kafka.bootstrap-servers。

  3. 不,您不需要生产者配置。

编辑

/**
 * Spring Boot entry point for the example.
 *
 * <p>Also declares topic {@code so56044105} (one partition, replication factor one) as a
 * {@link NewTopic} bean, which Spring Boot's auto-configured admin can use to create the topic
 * if it is missing.
 */
@SpringBootApplication
public class So56044105Application {

    /**
     * Boots the application context.
     *
     * @param args command-line arguments forwarded to Spring
     */
    public static void main(final String[] args) {
        SpringApplication.run(So56044105Application.class, args);
    }

    /**
     * Topic definition used by the example listener.
     *
     * @return the {@code so56044105} topic with 1 partition and replication factor 1
     */
    @Bean
    public NewTopic topic() {
        final int partitions = 1;
        final short replicationFactor = 1;
        return new NewTopic("so56044105", partitions, replicationFactor);
    }

}
spring.kafka.bootstrap-servers=10.0.0.8:9092
spring.kafka.consumer.enable-auto-commit=false
/**
 * Integration test that talks to whichever broker {@code spring.kafka.bootstrap-servers} names.
 *
 * <p>No embedded broker is declared here. The nested {@link Config} rewinds each assigned
 * partition of topic {@code so56044105} to the beginning and records every value it receives.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { So56044105Application.class, So56044105ApplicationTests.Config.class })
public class So56044105ApplicationTests {

    @Autowired
    public Config config;

    /**
     * Waits up to 10 seconds for the listener to receive three records, then checks that the
     * first value received is {@code "foo"}.
     *
     * @throws InterruptedException if interrupted while waiting on the latch
     */
    @Test
    public void test() throws InterruptedException {
        assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(config.received.get(0)).isEqualTo("foo");
    }

    /** Listener configuration used only by this test. */
    @Configuration
    public static class Config implements ConsumerSeekAware {

        /**
         * Values in arrival order.
         *
         * <p>The listener appends from the container's consumer thread while the test thread reads.
         * Appends can also continue after the latch opens, because the topic is read from the
         * beginning. The list is therefore synchronized rather than a bare ArrayList.
         */
        final List<String> received = java.util.Collections.synchronizedList(new ArrayList<>());

        /** Opens after three records; assumes the topic holds at least three records — verify. */
        final CountDownLatch latch = new CountDownLatch(3);

        /**
         * Prints and records each value from topic {@code so56044105}, then counts down the latch.
         *
         * @param in the record value
         */
        @KafkaListener(id = "so56044105", topics = "so56044105")
        public void listen(String in) {
            System.out.println(in);
            this.received.add(in);
            this.latch.countDown();
        }

        @Override
        public void registerSeekCallback(ConsumerSeekCallback callback) {
            // Not needed: seeking happens only at assignment time, in onPartitionsAssigned.
        }

        /**
         * Rewinds every newly assigned partition to its beginning.
         *
         * <p>This ensures records written before the test started are consumed as well.
         */
        @Override
        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
            System.out.println("Seeking to beginning");
            assignments.keySet().forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
        }

        @Override
        public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
            // No action on idle events.
        }

    }

}
于 2019-05-08T15:36:13.680 回答
0

这个仓库中有一些示例,展示如何在各种配置下启动真实的 Kafka 生产者和消费者——明文、SSL、带或不带身份验证等。

注意:上面的仓库包含我编写的《Effective Kafka》一书中的示例。不过,这些示例无需配合本书也可以自由使用,希望它们单独看也同样易于理解。

言归正传,下面是一对基本的生产者和消费者示例。

import static java.lang.System.*;

import java.time.*;
import java.util.*;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.*;

/**
 * A sample Kafka producer.
 *
 * <p>Once per second, forever, it publishes the current timestamp (ISO-8601) under a fixed key to
 * the {@code getting-started} topic. It reports the outcome of each send asynchronously.
 */
public final class BasicProducerSample {
  public static void main(String[] args) throws InterruptedException {
    final var topic = "getting-started";

    // Idempotence keeps internal client retries from writing duplicate records.
    final Map<String, Object> config = 
        Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092", 
               ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), 
               ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), 
               ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

    try (var producer = new KafkaProducer<String, String>(config)) {
      while (true) {
        final var key = "myKey";
        // java.time instead of the legacy java.util.Date.
        final var value = Instant.now().toString();
        out.format("Publishing record with value %s%n", 
                   value);

        // Runs when the send completes. On failure the exception is non-null and the metadata
        // carries no real offset, so report the two outcomes separately.
        final Callback callback = (metadata, exception) -> {
          if (exception != null) {
            err.format("Failed to publish: %s%n", exception);
          } else {
            out.format("Published with metadata: %s%n", metadata);
          }
        };

        // publish the record, handling the metadata in the callback
        producer.send(new ProducerRecord<>(topic, key, value), callback);

        // wait a second before publishing another
        Thread.sleep(1000);
      }
    }
  }
}
import static java.lang.System.*;

import java.time.*;
import java.util.*;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.*;

/**
 * A sample Kafka consumer.
 *
 * <p>It subscribes to {@code getting-started} as group {@code basic-consumer-sample}, prints
 * every record value, and commits offsets manually after each poll that returned records.
 */
public final class BasicConsumerSample {
  public static void main(String[] args) {
    final var topic = "getting-started";

    // Auto-commit is disabled; "earliest" applies when the group has no committed offset yet.
    final Map<String, Object> config = 
        Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092", 
               ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(), 
               ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(), 
               ConsumerConfig.GROUP_ID_CONFIG, "basic-consumer-sample",
               ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
               ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

    try (var consumer = new KafkaConsumer<String, String>(config)) {
      consumer.subscribe(Set.of(topic));

      while (true) {
        final var records = consumer.poll(Duration.ofMillis(100));
        for (var record : records) {
          out.format("Got record with value %s%n", record.value());
        }
        // Skip the commit round-trip on idle polls; there is no newly consumed record to record.
        if (!records.isEmpty()) {
          consumer.commitAsync();
        }
      }
    }
  }
}

当然,这些显然不是单元测试。但只需少量改动,就可以把它们变成一个测试:下一步是去掉 Thread.sleep() 并添加断言。请注意,由于 Kafka 本质上是异步的,如果在发布后立即在消费者端简单地断言能收到已发布的消息,测试将会失败。要编写健壮、可重复的测试,您可能需要借助 Timesert 之类支持带超时断言的工具。

于 2021-01-17T21:53:11.813 回答