我正在测试 spring-cloud-starter-stream-kafka。下面出现错误。
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:69)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:63)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:43)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$AutoAcknowledgingChannelForwardingMessageListener.doOnMessage(KafkaMessageDrivenChannelAdapter.java:171)
at org.springframework.integration.kafka.listener.AbstractDecodingMessageListener.onMessage(AbstractDecodingMessageListener.java:50)
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4$1.doWithRetry(KafkaMessageChannelBinder.java:607)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:263)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:154)
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.onMessage(KafkaMessageChannelBinder.java:604)
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:221)
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:209)
at reactor.core.processor.util.RingBufferSubscriberUtils.route(RingBufferSubscriberUtils.java:67)
at reactor.core.processor.RingBufferProcessor$BatchSignalProcessor.run(RingBufferProcessor.java:789)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:153)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
... 32 common frames omitted
我的 StreamApplication.java
package de.codecentric;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.support.MessageBuilder;
@SpringBootApplication
@EnableBinding({PersonProcessor.class, LogProcessor.class})
public class StreamApplication {
public static void main(String[] args) {
SpringApplication.run(StreamApplication.class, args);
}
@StreamListener(LogProcessor.CHANNEL)
public void logEvent(EventLog el) {
System.out.println("Received event log: " + el.id);
}
@StreamListener(PersonProcessor.CHANNEL)
public void logPerson(Person p) {
System.out.println("Received person: " + p.name);
}
@Bean
@InboundChannelAdapter(value = PersonProcessor.CHANNEL, poller = @Poller(fixedDelay = "3000", maxMessagesPerPoll = "1"))
public MessageSource<Person> timerMessageSource() {
return () -> MessageBuilder.withPayload(new Person()).build();
}
@Bean
@InboundChannelAdapter(value = LogProcessor.CHANNEL, poller = @Poller(fixedDelay = "3000", maxMessagesPerPoll = "1"))
public MessageSource<EventLog> logMessageSource() {
return () -> MessageBuilder.withPayload(new EventLog()).build();
}
public static class EventLog {
private static int seq = 0;
public String id = seq++ + "";
}
public static class Person {
private static int seq = 0;
public String name = "hi " + seq++;
}
}
日志处理器.java
package de.codecentric;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface LogProcessor {
String CHANNEL = "logs";
@Output(LogProcessor.CHANNEL)
MessageChannel output();
@Input(LogProcessor.CHANNEL)
SubscribableChannel input();
}
PersonProcessor.java
package de.codecentric;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface PersonProcessor {
String CHANNEL = "person";
@Output(PersonProcessor.CHANNEL)
MessageChannel output();
@Input(PersonProcessor.CHANNEL)
SubscribableChannel input();
}
我还可以看到输出:
接收人:hi 0 接收事件日志:0 接收事件日志:4 接收人:hi 4 接收事件日志:9 接收人:hi 9
谢谢。