0

我正在尝试在公司的项目中使用 spring-integration-kafka-2.1.0.RELEASE,但由于下面的异常,它无法正常工作:org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'org.springframework.web.context.WebApplicationContext:/order.inputToKafka'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers(即该通道的调度器没有任何订阅者)。

xml配置如下:ctx-kafka-producer.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Producer-side Spring Integration configuration: messages sent to the
    "inputToKafka" channel are handed to the Kafka outbound channel adapter,
    which publishes them to the "customerpos-order" topic via "kafkaTemplate".
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
       http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">

    <!-- Channel that application code sends outgoing messages to. -->
    <int:channel id="inputToKafka"/>

    <!--
        Outbound adapter that reads from "inputToKafka" and publishes to the
        "customerpos-order" topic using "kafkaTemplate".
        NOTE(review): auto-startup="false" means this adapter is not started
        together with the application context. Until it is started it presumably
        does not subscribe to "inputToKafka", which would match a
        "Dispatcher has no subscribers" error on send. Confirm whether it is
        started explicitly elsewhere, or whether auto-startup should be "true".
    -->
    <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                        kafka-template="kafkaTemplate"
                                        auto-startup="false"
                                        channel="inputToKafka"
                                        topic="customerpos-order"/>


    <!-- Template used by the outbound adapter; built from "producerFactory". -->
    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg index="0" ref="producerFactory"/>
    </bean>

    <!--
        Producer factory configured with a map of Kafka producer properties.
        Keys and values are serialized as strings (StringSerializer).
        The commented-out entries below are optional settings that are
        currently disabled.
    -->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="127.0.0.1:9092"/>
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
                <!--<entry key="acks" value="${acks}"/>-->
                <!--<entry key="buffer.memory" value="${buffer.memory}"/>-->
                <!--<entry key="compression.type" value="${compression.type}"/>-->
                <!--<entry key="retries" value="${retries}"/>-->
                <!--<entry key="batch.size" value="${batch.size}"/>-->
                <!--<entry key="max.block.ms" value="${max.block.ms}"/>-->
                <!--<entry key="max.request.size" value="${max.request.size}"/>-->
                <!--<entry key="partitioner.class" value="${partitioner.class}"/>-->
                <!--<entry key="receive.buffer.bytes" value="${receive.buffer.bytes}"/>-->
                <!--<entry key="request.timeout.ms" value="${request.timeout.ms}"/>-->
                <!--<entry key="security.protocol" value="${security.protocol}"/>-->
                <!-- NOTE(review): this is the only active entry that uses a ${...}
                     placeholder; no property placeholder configurer is declared in
                     this file, so confirm one is configured elsewhere. -->
                <entry key="send.buffer.bytes" value="${send.buffer.bytes}"/>
                <!--<entry key="ssl.protocol" value="${ssl.protocol}"/>-->
                <!--<entry key="ssl.truststore.type" value="${ssl.truststore.type}"/>-->
                <!--<entry key="timeout.ms" value="${timeout.ms}"/>-->
                <!--<entry key="block.on.buffer.full" value="${block.on.buffer.full}"/>-->
                <!--<entry key="max.in.flight.requests.per.connection" value="${max.in.flight.requests.per.connection}"/>-->
                <!--<entry key="metadata.fetch.timeout.ms" value="${metadata.fetch.timeout.ms}"/>-->
                <!--<entry key="metadata.max.age.ms" value="${metadata.max.age.ms}"/>-->
                <!--<entry key="metrics.num.samples" value="${metrics.num.samples}"/>-->
                <!--<entry key="metrics.sample.window.ms" value="${metrics.sample.window.ms}"/>-->
                <!--<entry key="reconnect.backoff.ms" value="${reconnect.backoff.ms}"/>-->
                <!--<entry key="retry.backoff.ms" value="${retry.backoff.ms}"/>-->
                <!--<entry key="sasl.kerberos.min.time.before.relogin" value="${sasl.kerberos.min.time.before.relogin}"/>-->
                <!--<entry key="sasl.kerberos.ticket.renew.jitter" value="${sasl.kerberos.ticket.renew.jitter}"/>-->
                <!--<entry key="sasl.kerberos.ticket.renew.window.factor" value="${sasl.kerberos.ticket.renew.window.factor}"/>-->
                <!--<entry key="ssl.keymanager.algorithm" value="${ssl.keymanager.algorithm}"/>-->
                <!--<entry key="linger.ms" value="1"/>-->
            </map>
        </constructor-arg>
    </bean>

    <!--<bean id="kafkaProducer" class="com.angho.cloud.manager.kafka.impl.KafkaProducerImpl"/>-->
</beans>

ctx-kafka-consumer.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Consumer-side Spring Integration configuration: the message-driven channel
    adapter takes records from listener container "container1" (topic
    "customerpos-order") and sends them to channel "inputFromChannel", where
    the "kafkaOrderConsumer" bean handles them.
    NOTE(review): the "int" prefix is declared and used below, but
    xsi:schemaLocation has no entry for the
    http://www.springframework.org/schema/integration namespace. Confirm the
    schema still resolves in your environment, or add the location.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">

    <!--
        Inbound adapter driven by "container1"; started automatically with the
        context. Received messages go to "inputFromChannel", errors to
        "errorChannel".
        NOTE(review): "inputFromChannel" is not declared explicitly in this
        file; presumably it is created implicitly or declared elsewhere.
    -->
    <int-kafka:message-driven-channel-adapter id="kafkaListener"
                                              listener-container="container1"
                                              auto-startup="true"
                                              phase="100"
                                              send-timeout="5000"
                                              channel="inputFromChannel"
                                              error-channel="errorChannel"/>

    <!-- Routes every message on "inputFromChannel" to the "kafkaOrderConsumer" bean. -->
    <int:service-activator input-channel="inputFromChannel" ref="kafkaOrderConsumer"/>

    <!--<bean id="kafkaOrderConsumer" class="com.angho.cloud.manager.kafka.KafkaOrderConsumer"/>-->

    <!--
        Listener container built from two constructor arguments, in order:
        an inline consumer factory (Kafka consumer properties) and the
        container properties naming the topic to listen on.
    -->
    <bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
                <constructor-arg>
                    <map>
                        <entry key="bootstrap.servers" value="127.0.0.1:9092"/>
                        <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                        <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                        <!--<entry key="fetch.min.bytes" value="${fetch.min.bytes}"/>-->
                        <entry key="group.id" value="customerpos-order"/>
                        <!--<entry key="heartbeat.interval.ms" value="${heartbeat.interval.ms}"/>-->
                        <!--<entry key="max.partition.fetch.bytes" value="${max.partition.fetch.bytes}"/>-->
                        <entry key="session.timeout.ms" value="15000"/>
                        <!--<entry key="auto.offset.reset" value="${auto.offset.reset}"/>-->
                        <!--<entry key="connections.max.idle.ms" value="${connections.max.idle.ms}"/>-->
                        <!-- NOTE(review): auto-commit is disabled here while
                             auto.commit.interval.ms is still set further down;
                             presumably the interval has no effect. Confirm. -->
                        <entry key="enable.auto.commit" value="false"/>
                        <!--<entry key="exclude.internal.topics" value="${exclude.internal.topics}"/>-->
                        <!--<entry key="fetch.max.bytes" value="${fetch.max.bytes}"/>-->
                        <!--<entry key="max.poll.interval.ms" value="${max.poll.interval.ms}"/>-->
                        <!--<entry key="max.poll.records" value="${max.poll.records}"/>-->
                        <!--<entry key="partition.assignment.strategy" value="${partition.assignment.strategy}"/>-->
                        <!--<entry key="receive.buffer.bytes" value="${receive.buffer.bytes}"/>-->
                        <!--<entry key="request.timeout.ms" value="${request.timeout.ms}"/>-->
                        <entry key="auto.commit.interval.ms" value="100"/>
                        <!--<entry key="fetch.max.wait.ms" value="${fetch.max.wait.ms}"/>-->
                        <!--<entry key="metadata.max.age.ms" value="${metadata.max.age.ms}"/>-->
                        <!--<entry key="metrics.num.samples" value="${metrics.num.samples}"/>-->
                        <!--<entry key="metrics.recording.level" value="${metrics.recording.level}"/>-->
                        <!--<entry key="metrics.sample.window.ms" value="${metrics.sample.window.ms}"/>-->
                        <!--<entry key="reconnect.backoff.ms" value="${reconnect.backoff.ms}"/>-->
                        <!--<entry key="retry.backoff.ms" value="${retry.backoff.ms}"/>-->
                    </map>
                </constructor-arg>
            </bean>
        </constructor-arg>
        <constructor-arg>
            <bean class="org.springframework.kafka.listener.config.ContainerProperties">
                <constructor-arg name="topics" value="customerpos-order"/>
            </bean>
        </constructor-arg>
    </bean>

</beans> 

通过使用 depends-on 属性,消费者 bean 会在生产者 bean 之前初始化。

Java代码如下:

生产者:

package com.angho.cloud.manager.kafka.impl;

import com.angho.cloud.bo.data.order.SyncOrderBO;
import com.angho.cloud.bo.result.order.CPosOrderSyncResult;
import com.angho.cloud.manager.kafka.CPosOrderSyncManager;
import com.angho.data.common.ResultConstant;
import org.apache.log4j.Logger;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;


import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * {@link CPosOrderSyncManager} implementation that publishes orders by sending
 * them to the {@code inputToKafka} message channel.
 */
public class CPosOrderKafkaProducer implements CPosOrderSyncManager {

    private static final Logger LOG = Logger.getLogger(CPosOrderKafkaProducer.class);

    /** Outgoing channel, injected by bean name ({@code inputToKafka}). */
    @Resource(name = "inputToKafka")
    private MessageChannel messageChannel;

    /** Prints a startup marker once the bean has been initialized. */
    @PostConstruct
    public void init(){
        System.out.println("KafkaProducer start...");
    }

    /**
     * Wraps the order in a message and sends it to the outgoing channel.
     *
     * <p>The result starts in the ERROR state and is switched to SUCCESS only
     * when the channel reports that the send succeeded. An exception raised by
     * the send is logged and stored on the result rather than rethrown.
     *
     * @param order the order to send
     * @return the outcome of the send: SUCCESS with the default success message,
     *         or ERROR with either a failure message or the caught exception
     */
    @Override
    public CPosOrderSyncResult sendToKafka(SyncOrderBO order) {
        CPosOrderSyncResult result = new CPosOrderSyncResult();
        result.setStatus(ResultConstant.CODE.ERROR);

        // NOTE(review): the payload is the SyncOrderBO object itself; confirm the
        // producer's configured value serializer can handle that type.
        Message<SyncOrderBO> message = new GenericMessage<>(order);

        try {
            boolean sent = this.messageChannel.send(message);
            if (sent) {
                result.setStatus(ResultConstant.CODE.SUCCESS);
                result.setMessage(ResultConstant.MESSAGE.DEFAULT_SUCCESS_MESSAGE);
            } else {
                result.setMessage("Failed to send message to Kafka.");
            }
        } catch (Exception ex) {
            // Pass the exception as the Throwable argument so the stack trace is
            // logged; error(Object) alone would only log ex.toString().
            LOG.error("Failed to send message to Kafka.", ex);
            result.setException(ex);
        }

        return result;
    }
}

生产者通过以下方式定义为 bean:

<!-- Registers the producer bean; depends-on makes the container initialize the "kafkaOrderConsumer" bean first. -->
<bean id="kafkaOrderProducer" class="com.angho.cloud.manager.kafka.impl.CPosOrderKafkaProducer" depends-on="kafkaOrderConsumer"/>

消费者 Java 代码:

package com.angho.cloud.manager.kafka;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * Message handler registered as the {@code kafkaOrderConsumer} bean; it prints
 * every message it receives to standard output.
 */
@Component("kafkaOrderConsumer")
public class KafkaOrderConsumer {

    /** Prints a startup marker once the bean has been initialized. */
    @PostConstruct
    public void init(){
        System.out.println("what?KafkaConsumer start...");
    }

    /**
     * Prints the received message followed by its payload.
     *
     * <p>No {@code @SuppressWarnings("unchecked")} is needed here: the method
     * performs no unchecked casts or generic conversions.
     *
     * @param message the incoming message
     */
    @ServiceActivator
    public void process(Message<?> message){
        System.out.println("Message =======>" + message);
        System.out.println("Content =======>" + message.getPayload());
    }
}

我不知道发生异常的原因。

我应该怎么做才能让它工作?

PS:我正在尝试通过xml而不是@Bean之类的JavaCode来配置它。

原谅我糟糕的英语TAT
…………

非常感谢..

4

1 回答 1

0

auto-startup="false"

你为什么要设置这个属性?

适配器在被 start() 启动之前,不会订阅该通道。

于 2017-09-11T13:33:57.683 回答