我正在尝试在公司的项目中使用 spring-integration-kafka-2.1.0.RELEASE。但是,由于出现下面列出的异常,它无法正常工作: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'org.springframework.web.context.WebApplicationContext:/order.inputToKafka'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers(即:通道 'inputToKafka' 的调度器没有任何订阅者)。
xml配置如下:ctx-kafka-producer.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!--
    Producer-side wiring: messages sent to the "inputToKafka" channel are handed
    to the Kafka outbound channel adapter, which publishes them to the
    "customerpos-order" topic through the KafkaTemplate defined below.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">

    <!-- Channel the application code sends outbound messages to. -->
    <int:channel id="inputToKafka"/>

    <!--
        The adapter is the only subscriber of "inputToKafka". It must be started
        to subscribe: with auto-startup="false" the endpoint stays stopped, the
        channel has no subscriber, and every send fails with
        "Dispatcher has no subscribers". Keep auto-startup enabled unless the
        endpoint is started explicitly elsewhere.
    -->
    <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                        kafka-template="kafkaTemplate"
                                        auto-startup="true"
                                        channel="inputToKafka"
                                        topic="customerpos-order"/>

    <!-- Template used by the adapter to publish records. -->
    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg index="0" ref="producerFactory"/>
    </bean>

    <!--
        Kafka producer settings.
        NOTE(review): both serializers are StringSerializer, while
        CPosOrderKafkaProducer sends a SyncOrderBO payload; confirm the payload
        is converted to a String before it reaches the adapter.
    -->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="127.0.0.1:9092"/>
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
                <!--<entry key="acks" value="${acks}"/>-->
                <!--<entry key="buffer.memory" value="${buffer.memory}"/>-->
                <!--<entry key="compression.type" value="${compression.type}"/>-->
                <!--<entry key="retries" value="${retries}"/>-->
                <!--<entry key="batch.size" value="${batch.size}"/>-->
                <!--<entry key="max.block.ms" value="${max.block.ms}"/>-->
                <!--<entry key="max.request.size" value="${max.request.size}"/>-->
                <!--<entry key="partitioner.class" value="${partitioner.class}"/>-->
                <!--<entry key="receive.buffer.bytes" value="${receive.buffer.bytes}"/>-->
                <!--<entry key="request.timeout.ms" value="${request.timeout.ms}"/>-->
                <!--<entry key="security.protocol" value="${security.protocol}"/>-->
                <!-- NOTE(review): presumably resolved by a property placeholder configured in another file; confirm. -->
                <entry key="send.buffer.bytes" value="${send.buffer.bytes}"/>
                <!--<entry key="ssl.protocol" value="${ssl.protocol}"/>-->
                <!--<entry key="ssl.truststore.type" value="${ssl.truststore.type}"/>-->
                <!--<entry key="timeout.ms" value="${timeout.ms}"/>-->
                <!--<entry key="block.on.buffer.full" value="${block.on.buffer.full}"/>-->
                <!--<entry key="max.in.flight.requests.per.connection" value="${max.in.flight.requests.per.connection}"/>-->
                <!--<entry key="metadata.fetch.timeout.ms" value="${metadata.fetch.timeout.ms}"/>-->
                <!--<entry key="metadata.max.age.ms" value="${metadata.max.age.ms}"/>-->
                <!--<entry key="metrics.num.samples" value="${metrics.num.samples}"/>-->
                <!--<entry key="metrics.sample.window.ms" value="${metrics.sample.window.ms}"/>-->
                <!--<entry key="reconnect.backoff.ms" value="${reconnect.backoff.ms}"/>-->
                <!--<entry key="retry.backoff.ms" value="${retry.backoff.ms}"/>-->
                <!--<entry key="sasl.kerberos.min.time.before.relogin" value="${sasl.kerberos.min.time.before.relogin}"/>-->
                <!--<entry key="sasl.kerberos.ticket.renew.jitter" value="${sasl.kerberos.ticket.renew.jitter}"/>-->
                <!--<entry key="sasl.kerberos.ticket.renew.window.factor" value="${sasl.kerberos.ticket.renew.window.factor}"/>-->
                <!--<entry key="ssl.keymanager.algorithm" value="${ssl.keymanager.algorithm}"/>-->
                <!--<entry key="linger.ms" value="1"/>-->
            </map>
        </constructor-arg>
    </bean>
    <!--<bean id="kafkaProducer" class="com.angho.cloud.manager.kafka.impl.KafkaProducerImpl"/>-->
</beans>
ctx-kafka-consumer.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!--
    Consumer-side wiring: a Kafka listener container feeds the message-driven
    channel adapter, which forwards each record to "inputFromChannel", where the
    "kafkaOrderConsumer" bean handles it.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">

    <!-- Inbound adapter: pushes records received by "container1" onto "inputFromChannel". -->
    <int-kafka:message-driven-channel-adapter id="kafkaListener"
                                              listener-container="container1"
                                              auto-startup="true"
                                              phase="100"
                                              send-timeout="5000"
                                              channel="inputFromChannel"
                                              error-channel="errorChannel"/>

    <!-- Delivers every message on "inputFromChannel" to the kafkaOrderConsumer bean. -->
    <int:service-activator input-channel="inputFromChannel" ref="kafkaOrderConsumer"/>
    <!--<bean id="kafkaOrderConsumer" class="com.angho.cloud.manager.kafka.KafkaOrderConsumer"/>-->

    <!-- Listener container subscribed to the "customerpos-order" topic. -->
    <bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
                <constructor-arg>
                    <map>
                        <entry key="bootstrap.servers" value="127.0.0.1:9092"/>
                        <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                        <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                        <!--<entry key="fetch.min.bytes" value="${fetch.min.bytes}"/>-->
                        <entry key="group.id" value="customerpos-order"/>
                        <!--<entry key="heartbeat.interval.ms" value="${heartbeat.interval.ms}"/>-->
                        <!--<entry key="max.partition.fetch.bytes" value="${max.partition.fetch.bytes}"/>-->
                        <entry key="session.timeout.ms" value="15000"/>
                        <!--<entry key="auto.offset.reset" value="${auto.offset.reset}"/>-->
                        <!--<entry key="connections.max.idle.ms" value="${connections.max.idle.ms}"/>-->
                        <entry key="enable.auto.commit" value="false"/>
                        <!--<entry key="exclude.internal.topics" value="${exclude.internal.topics}"/>-->
                        <!--<entry key="fetch.max.bytes" value="${fetch.max.bytes}"/>-->
                        <!--<entry key="max.poll.interval.ms" value="${max.poll.interval.ms}"/>-->
                        <!--<entry key="max.poll.records" value="${max.poll.records}"/>-->
                        <!--<entry key="partition.assignment.strategy" value="${partition.assignment.strategy}"/>-->
                        <!--<entry key="receive.buffer.bytes" value="${receive.buffer.bytes}"/>-->
                        <!--<entry key="request.timeout.ms" value="${request.timeout.ms}"/>-->
                        <!-- Only takes effect when enable.auto.commit is true; kept as in the original configuration. -->
                        <entry key="auto.commit.interval.ms" value="100"/>
                        <!--<entry key="fetch.max.wait.ms" value="${fetch.max.wait.ms}"/>-->
                        <!--<entry key="metadata.max.age.ms" value="${metadata.max.age.ms}"/>-->
                        <!--<entry key="metrics.num.samples" value="${metrics.num.samples}"/>-->
                        <!--<entry key="metrics.recording.level" value="${metrics.recording.level}"/>-->
                        <!--<entry key="metrics.sample.window.ms" value="${metrics.sample.window.ms}"/>-->
                        <!--<entry key="reconnect.backoff.ms" value="${reconnect.backoff.ms}"/>-->
                        <!--<entry key="retry.backoff.ms" value="${retry.backoff.ms}"/>-->
                    </map>
                </constructor-arg>
            </bean>
        </constructor-arg>
        <constructor-arg>
            <bean class="org.springframework.kafka.listener.config.ContainerProperties">
                <constructor-arg name="topics" value="customerpos-order"/>
            </bean>
        </constructor-arg>
    </bean>
</beans>
通过使用 depends-on 属性,消费者会在生产者之前初始化。
Java代码如下:
生产者:
package com.angho.cloud.manager.kafka.impl;
import com.angho.cloud.bo.data.order.SyncOrderBO;
import com.angho.cloud.bo.result.order.CPosOrderSyncResult;
import com.angho.cloud.manager.kafka.CPosOrderSyncManager;
import com.angho.data.common.ResultConstant;
import org.apache.log4j.Logger;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
 * {@link CPosOrderSyncManager} implementation that publishes orders by sending
 * them as messages to the {@code inputToKafka} message channel.
 */
public class CPosOrderKafkaProducer implements CPosOrderSyncManager {
    private static final Logger LOG = Logger.getLogger(CPosOrderKafkaProducer.class);

    /** Channel injected by bean name; orders are sent to it as message payloads. */
    @Resource(name = "inputToKafka")
    private MessageChannel messageChannel;

    /** Prints a start-up marker once the bean has been constructed. */
    @PostConstruct
    public void init() {
        System.out.println("KafkaProducer start...");
    }

    /**
     * Wraps the order in a message and sends it to the injected channel.
     *
     * @param order the order to publish; used as the message payload
     * @return a result whose status is SUCCESS only when the channel reports the
     *         send as accepted; otherwise the status stays ERROR and either a
     *         failure message or the caught exception is attached
     */
    @Override
    public CPosOrderSyncResult sendToKafka(SyncOrderBO order) {
        CPosOrderSyncResult result = new CPosOrderSyncResult();
        // Start pessimistic: only a confirmed send flips the status to SUCCESS.
        result.setStatus(ResultConstant.CODE.ERROR);
        Message<SyncOrderBO> message = new GenericMessage<>(order);
        try {
            boolean sent = this.messageChannel.send(message);
            if (sent) {
                result.setStatus(ResultConstant.CODE.SUCCESS);
                result.setMessage(ResultConstant.MESSAGE.DEFAULT_SUCCESS_MESSAGE);
            } else {
                result.setMessage("Failed to send message to Kafka.");
            }
        } catch (Exception ex) {
            // Use the (message, throwable) overload so the stack trace is logged;
            // error(Object) alone would record only ex.toString().
            LOG.error("Failed to send order message to channel 'inputToKafka'", ex);
            result.setException(ex);
        }
        return result;
    }
}
生产者通过以下方式定义为 bean:
<!-- Producer bean; depends-on names "kafkaOrderConsumer" so that bean is initialized before this one. -->
<bean id="kafkaOrderProducer" class="com.angho.cloud.manager.kafka.impl.CPosOrderKafkaProducer" depends-on="kafkaOrderConsumer"/>
消费者 Java 代码:
package com.angho.cloud.manager.kafka;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
 * Component registered under the bean name {@code kafkaOrderConsumer}; it
 * prints every message it is handed to standard output.
 */
@Component("kafkaOrderConsumer")
public class KafkaOrderConsumer {

    /** Writes a start-up marker to standard output after construction. */
    @PostConstruct
    public void init() {
        final String banner = "what?KafkaConsumer start...";
        System.out.println(banner);
    }

    /**
     * Prints the whole message, then its payload.
     *
     * @param message the message to print
     */
    @SuppressWarnings("unchecked")
    @ServiceActivator
    public void process(Message<?> message) {
        final String messageLine = "Message =======>" + message;
        System.out.println(messageLine);

        final Object payload = message.getPayload();
        System.out.println("Content =======>" + payload);
    }
}
我不知道发生异常的原因。
我应该怎么做才能让它工作?
PS:我正在尝试通过xml而不是@Bean之类的JavaCode来配置它。
原谅我糟糕的英语TAT
…………
非常感谢..