0

我正在尝试使用 Springboot 和 EmbeddedKafka 为单元测试设置一个类。我有两个主题,topicA 和 topicB,我将测试消息生成到 topicA 和 topicB。

下面是我的测试类:

    @EmbeddedKafka()
    @SpringBootTest
    class ApplicationTests {
        private String topicA = "A";
        private String topicB = "B";
    
        @Autowired
        private EmbeddedKafkaBroker embeddedKafkaBroker;
    
        BlockingQueue<ConsumerRecord<String, String>> topicAContent;
        BlockingQueue<ConsumerRecord<String, String>> topicBContent;
    
        KafkaMessageListenerContainer<String, String> container;

        @BeforeEach
        void setup() {
            Map<String, Object> consumerConfigs = new HashMap<>(
                KafkaTestUtils.consumerProps("consumer", "true", embeddedKafkaBroker)
            );

            DefaultKafkaConsumerFactory<String, String> consumerFactory =
                new DefaultKafkaConsumerFactory<>(consumerConfigs, new StringDeserializer(), new StringDeserializer());


            ContainerProperties containerProperties = new ContainerProperties(topicA, topicB);
            container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
            topicAContent = new LinkedBlockingQueue<>();
            topicBContent = new LinkedBlockingQueue<>();

            container.setupMessageListener((MessageListener<String, String>) this::pushRecord);
            container.start();

            ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
        }

        private void pushRecord(ConsumerRecord<String, String> record) {
            String topic = record.topic();
            if(topic.equals(topicA)) {
                topicAContent.add(record);
            }
            else if(topic.equals(topicB)) {
                topicBContent.add(record);
            }
        }

        @Test
        public void produceIntoTopicA() {
            Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
            Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();
            producer.send(new ProducerRecord<>(topicA, "a", "Hello A"));
            producer.flush();

            ConsumerRecord<String, String> singleRecord = topicAContent.poll(100, TimeUnit.MILLISECONDS);
            assertThat(singleRecord).isNotNull();
            assertThat(singleRecord.key()).isEqualTo("a");
            assertThat(singleRecord.value()).isEqualTo("Hello A");
        }

        @Test
        public void produceIntoTopicB() {
            Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
            Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();
            producer.send(new ProducerRecord<>(topicB, "b", "Hello B"));
            producer.flush();

            ConsumerRecord<String, String> singleRecord = topicBContent.poll(100, TimeUnit.MILLISECONDS);
            assertThat(singleRecord).isNotNull();
            assertThat(singleRecord.key()).isEqualTo("b");
            assertThat(singleRecord.value()).isEqualTo("Hello B");
        }
    }

现在,如果我运行测试,produceIntoTopicB 测试会失败并出现以下错误:

java.lang.IllegalStateException: Expected 1 but got 2 partitions

    at org.springframework.kafka.test.utils.ContainerTestUtils.waitForSingleContainerAssignment(ContainerTestUtils.java:115)
    at org.springframework.kafka.test.utils.ContainerTestUtils.waitForAssignment(ContainerTestUtils.java:51)
    at it.test.ApplicationTests.setup(ApplicationTests.java:92)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptLifecycleMethod(TimeoutExtension.java:126)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptBeforeAllMethod(TimeoutExtension.java:68)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeBeforeAllMethods$9(ClassBasedTestDescriptor.java:384)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeBeforeAllMethods(ClassBasedTestDescriptor.java:382)
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:196)
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:78)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:136)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)

而另一个测试失败并出现此错误:

java.lang.AssertionError: 
Expecting actual not to be null

我哪里错了?

4

1 回答 1

1

对于第二个,你需要

consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

该配置的默认值是 latest,因此存在竞争条件(尽管 waitForAssignment() 本应防止这种情况;请尝试开启 DEBUG 日志来排查)。

对于第一个,编辑问题以显示完整的堆栈跟踪。

于 2021-01-28T17:22:57.243 回答