当我尝试在我的 xd 模块中构建一个 Kafka 消费者时,我收到了以下异常:
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702) ~[na:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557) ~[na:na]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:73) ~[na:na]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:69) ~[na:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:284) ~[na:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:222) ~[na:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:179) ~[na:na]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:204) ~[na:na]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:173) ~[spring-context-4.1.7.RELEASE.jar:4.1.7.RELEASE]
... 27 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.clients.consumer.RangeAssignor ClassNotFoundException exception occured
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:227) ~[na:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:637) ~[na:na]
... 35 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.RangeAssignor
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_77]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_77]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) ~[na:1.8.0_77]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_77]
at java.lang.Class.forName0(Native Method) ~[na:1.8.0_77]
at java.lang.Class.forName(Class.java:348) ~[na:1.8.0_77]
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:332) ~[na:na]
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:225)
这是我的xml:
<int-kafka:message-driven-channel-adapter
id="kafkaListener" listener-container="customKafkaMessageListenerContainer"
auto-startup="false" phase="100" send-timeout="5000" channel="toTransformer"
error-channel="errorChannel"/>
<bean id="containerProps"
class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topicPartitions" ref="topicPartitionInitialOffset" />
</bean>
<bean id="topicPartitionInitialOffset"
class="org.springframework.kafka.support.TopicPartitionInitialOffset">
<constructor-arg index="0" name="topic" value="source-8-26"
type="java.lang.String" />
<constructor-arg index="1" name="partition" value="0"
type="int" />
<constructor-arg index="2" name="initialOffset" value="0"
type="java.lang.Long" />
</bean>
<bean id="consumerFactory"
class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
<entry key="group.id" value="test-consumer-group" />
<entry key="autocommit.enable" value="false" />
<entry key="value.deserializer" value-type="java.lang.Class" value="org.apache.kafka.common.serialization.StringSerializer">
</entry>
<entry key="key.deserializer" value-type="java.lang.Class" value="org.apache.kafka.common.serialization.StringSerializer">
</entry>
</map>
</constructor-arg>
</bean>
<bean id="customKafkaMessageListenerContainer"
class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg name="consumerFactory" ref="consumerFactory">
</constructor-arg>
<constructor-arg name="containerProperties" ref="containerProps" />
</bean>
依赖项:
<dependencies>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.1</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.0.3.RELEASE</version>
</dependency>
</dependencies>
有谁知道为什么我得到这个类没有发现异常?我检查了 Kafka 0.10.0 的 API 并且该类存在,所以我不确定为什么会收到此异常。这可能与 Spring XD 加载类的方式有关吗?我知道 Spring XD 在其 lib 文件夹中附带了 Kafka 0.8.2,所以也许 Spring XD 先加载 Kafka 0.8.2,然后它因此无法找到该类?
任何见解将不胜感激!