2

我有一个工作的 spring-boot 基于 java-gradle 的服务,可以生成和使用 Kafka 消息。但是我无法使用带有 @EmbeddedKafka 注释的 spring-kafka-test 库或使用 @ClassRule 方式创建集成测试。在这两种方式中,我最终都会遇到相同的错误(关于 Scala 如下所示)。如果有人对幕后可能发生的事情有任何线索,那将非常有帮助。

Spring Boot 版本:2.1.6.RELEASE Spring Kafka 版本:2.2.7.RELEASE

生产者配置:

@EnableKafka
@Configuration
public class KafkaProducerConfig {

    private String bootstrapAddress;

    public KafkaProducerConfig(@Value("${spring.kafka.bootstrap-servers}") String bootstrapAddress) {
        this.bootstrapAddress = bootstrapAddress;
    }

    @Bean
    public ProducerFactory<String, Greeting> greetingProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
        return new KafkaTemplate<>(greetingProducerFactory());
    }
}

生产者代码:

@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Greeting> kafkaTemplate;

    @Value(value = "${kiosk.kafka.topic.greeting}")
    private String greetingTopicName;

    public void sendMessage(Greeting greeting) {

        ListenableFuture<SendResult<String, Greeting>> future = kafkaTemplate.send(greetingTopicName, greeting);

        future.addCallback(new ListenableFutureCallback<SendResult<String, Greeting>>() {

            @Override
            public void onSuccess(SendResult<String, Greeting> result) {
                System.out.println("Sent greeting=[" + greeting + "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("Unable to send greeting=[" + greeting + "] due to : " + ex.getMessage());
            }
        });
    }
}

属性文件:

spring.kafka.consumer.auto-offset-reset=earliest
spring.embedded.kafka.brokers=localhost:9092
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.consumer.group-id=1

我在开始测试时尝试了两种方法和相同的错误:

测试类规则:

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class PKafkaProducerClassRuleTest {

    private static final Logger LOGGER =
        LoggerFactory.getLogger(PKafkaProducerClassRuleTest.class);

    private static String SENDER_TOPIC = "sender.t";

    static {
        System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
    }

    @Autowired
    private PinPadKafkaProducer sender;

    private KafkaMessageListenerContainer<String, String> container;

    private BlockingQueue<ConsumerRecord<String, String>> records;

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka =
        new EmbeddedKafkaRule(1, true, SENDER_TOPIC);

使用 EmbeddedKafka 进行测试:

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1,
    topics = {
        "sender.t" })
public class PKafkaProducerEmbeddedKafkaTest {

    private static final Logger LOGGER =
        LoggerFactory.getLogger(PKafkaProducerEmbeddedKafkaTest.class);

    private static String SENDER_TOPIC = "sender.t";

    static {
        System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY,
            "spring.kafka.bootstrap-servers");
    }

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    private PinPadKafkaProducer sender;

    @Test
    public void someTest() {
        sender.sendMessage(new Greeting("msgtest", "nametest"));
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<Integer, String> consumer = cf.createConsumer();
        this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, SENDER_TOPIC);
        ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
        assertThat(replies.count()).isGreaterThanOrEqualTo(1);
    }

}

两种情况下的错误:

19/11/2019 13:51:56.774+0100 ERROR o.s.t.c.TestContextManager [] - Caught exception while allowing TestExecutionListener [org.springframework.test.context.web.ServletTestExecutionListener@e9cee0d] to prepare test instance [com.goldcar.kiosk.PKafkaProducerEmbeddedKafkaTest@39271181]
java.lang.IllegalStateException: Failed to load ApplicationContext
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:125)
    at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:108)
    at org.springframework.test.context.web.ServletTestExecutionListener.setUpRequestContextIfNecessary(ServletTestExecutionListener.java:190)
    at org.springframework.test.context.web.ServletTestExecutionListener.prepareTestInstance(ServletTestExecutionListener.java:132)
    at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:246)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.createTest(SpringJUnit4ClassRunner.java:227)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner$1.runReflectiveCall(SpringJUnit4ClassRunner.java:289)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.methodBlock(SpringJUnit4ClassRunner.java:291)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:246)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
    at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)
    at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
    at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'embeddedKafka': Invocation of init method failed; nested exception is java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1778)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:405)
    at org.springframework.kafka.test.context.EmbeddedKafkaContextCustomizer.customizeContext(EmbeddedKafkaContextCustomizer.java:109)
    at org.springframework.boot.test.context.SpringBootContextLoader$ContextCustomizerAdapter.initialize(SpringBootContextLoader.java:300)
    at org.springframework.boot.SpringApplication.applyInitializers(SpringApplication.java:621)
    at org.springframework.boot.SpringApplication.prepareContext(SpringApplication.java:365)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:310)
    at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:119)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:117)
    ... 49 common frames omitted
Caused by: java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
    at kafka.cluster.EndPoint$.<init>(EndPoint.scala:32)
    at kafka.cluster.EndPoint$.<clinit>(EndPoint.scala)
    at kafka.server.Defaults$.<init>(KafkaConfig.scala:68)
    at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
    at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:781)
    at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
    at kafka.utils.TestUtils$.createBrokerConfig(TestUtils.scala:234)
    at kafka.utils.TestUtils.createBrokerConfig(TestUtils.scala)
    at org.springframework.kafka.test.EmbeddedKafkaBroker.createBrokerProperties(EmbeddedKafkaBroker.java:239)
    at org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:214)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1837)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1774)
    ... 58 common frames omitted
4

2 回答 2

2

看来您在类路径上的 kafka jar 版本不匹配。

如果您使用与kafka-clientsBoot 默认设置不同的版本,则会发生这种情况。

有关如何覆盖所有 kafka jar 版本的示例,请参阅此附录。

当你使用spring-kafka-test(版本 2.2.x)和 2.1.x kafka-clientsjar 时,你需要重写某些传递依赖,如下:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring.kafka.version}</version>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>${spring.kafka.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
        </exclusion>
    </exclusions>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.1</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.1</version>
    <classifier>test</classifier>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.1</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.1</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>

请注意,当切换到 scala 2.12(推荐用于 2.1.x 及更高版本)时,必须将 2.11 版本排除在spring-kafka-test.

于 2019-11-19T14:56:52.680 回答
2

在 gradle 中使用@Gary Russell 答案应用的解决方案:

gradle.properties:

spring_kafka_test_version=2.2.7.RELEASE
kafka_test_version=2.1.1
kafka_clients_test_version=2.1.1

构建.gradle:

implementation group: 'org.springframework.kafka', name: 'spring-kafka'
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafka_clients_test_version
implementation group: 'org.apache.kafka', name: 'kafka_2.12', version: kafka_test_version

testImplementation (group: 'org.springframework.kafka', name: 'spring-kafka-test', version: spring_kafka_test_version) {
    exclude module: 'kafka_2.11'
}
testImplementation group: 'org.apache.kafka', name: 'kafka_2.12', version: kafka_test_version, classifier: 'test'
testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafka_clients_test_version
于 2019-11-19T16:14:41.227 回答