0

当我尝试在我的 xd 模块中构建一个 Kafka 消费者时,我收到了以下异常:

    Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702) ~[na:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557) ~[na:na]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:73) ~[na:na]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:69) ~[na:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:284) ~[na:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:222) ~[na:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:179) ~[na:na]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:204) ~[na:na]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:173) ~[spring-context-4.1.7.RELEASE.jar:4.1.7.RELEASE]
... 27 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.clients.consumer.RangeAssignor ClassNotFoundException exception occured
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:227) ~[na:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:637) ~[na:na]
... 35 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.RangeAssignor
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_77]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_77]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) ~[na:1.8.0_77]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_77]
at java.lang.Class.forName0(Native Method) ~[na:1.8.0_77]
at java.lang.Class.forName(Class.java:348) ~[na:1.8.0_77]
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:332) ~[na:na]
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:225)

这是我的xml:

<int-kafka:message-driven-channel-adapter
    id="kafkaListener" listener-container="customKafkaMessageListenerContainer"
    auto-startup="false" phase="100" send-timeout="5000" channel="toTransformer"
    error-channel="errorChannel"/>

<bean id="containerProps"
    class="org.springframework.kafka.listener.config.ContainerProperties">
    <constructor-arg name="topicPartitions" ref="topicPartitionInitialOffset" />
</bean>

<bean id="topicPartitionInitialOffset"
    class="org.springframework.kafka.support.TopicPartitionInitialOffset">
    <constructor-arg index="0" name="topic" value="source-8-26"
        type="java.lang.String" />
    <constructor-arg index="1" name="partition" value="0"
        type="int" />
    <constructor-arg index="2" name="initialOffset" value="0"
        type="java.lang.Long" />
</bean>

<bean id="consumerFactory"
    class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="localhost:9092" />
            <entry key="group.id" value="test-consumer-group" />
            <entry key="autocommit.enable" value="false" />
            <entry key="value.deserializer" value-type="java.lang.Class" value="org.apache.kafka.common.serialization.StringSerializer">
            </entry>
            <entry key="key.deserializer" value-type="java.lang.Class" value="org.apache.kafka.common.serialization.StringSerializer">
            </entry>
        </map>
    </constructor-arg>
</bean>

<bean id="customKafkaMessageListenerContainer"
    class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg name="consumerFactory" ref="consumerFactory">
    </constructor-arg>
    <constructor-arg name="containerProperties" ref="containerProps" />
</bean>

依赖项:

<dependencies>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-kafka</artifactId>
        <version>2.0.1.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.0.3.RELEASE</version>
    </dependency>
</dependencies>

有谁知道为什么我得到这个类没有发现异常?我检查了 Kafka 0.10.0 的 API 并且该类存在,所以我不确定为什么会收到此异常。这可能与 Spring XD 加载类的方式有关吗?我知道 Spring XD 在其 lib 文件夹中附带了 Kafka 0.8.2,所以也许 Spring XD 先加载 Kafka 0.8.2,然后它因此无法找到该类?

任何见解将不胜感激!

4

1 回答 1

0

我希望它可以正常工作,因为模块加载在它自己的类加载器中,它应该从模块的 lib 文件夹中找到正确的类。

-verbose我通常通过运行记录所有类加载的 JVM 来调试此类问题。

注意:如果您使用 Kafka 进行 XD 传输,Spring XD 仅支持 0.8.xx 客户端 - 后续项目(Spring Cloud Stream/Spring Cloud Dataflow)将支持较新的 kafka 客户端。支持新客户的版本 1.1的第一个里程碑于上周宣布

于 2016-08-29T13:33:21.980 回答