2

我正在测试 spring-cloud-starter-stream-kafka。下面出现错误。

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:69)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:63)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:43)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$AutoAcknowledgingChannelForwardingMessageListener.doOnMessage(KafkaMessageDrivenChannelAdapter.java:171)
at org.springframework.integration.kafka.listener.AbstractDecodingMessageListener.onMessage(AbstractDecodingMessageListener.java:50)
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4$1.doWithRetry(KafkaMessageChannelBinder.java:607)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:263)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:154)
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.onMessage(KafkaMessageChannelBinder.java:604)
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:221)
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:209)
at reactor.core.processor.util.RingBufferSubscriberUtils.route(RingBufferSubscriberUtils.java:67)
at reactor.core.processor.RingBufferProcessor$BatchSignalProcessor.run(RingBufferProcessor.java:789)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:153)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
... 32 common frames omitted

我的 StreamApplication.java

package de.codecentric;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.support.MessageBuilder;

/**
 * Spring Boot application that enables the {@link PersonProcessor} and {@link LogProcessor}
 * bindings, registers one polled message source per channel, and prints every payload
 * that its stream listeners receive.
 */
@SpringBootApplication
@EnableBinding({PersonProcessor.class, LogProcessor.class})
public class StreamApplication {

    /** Starts the Spring application with this class as the primary source. */
    public static void main(String[] args) {
        SpringApplication.run(StreamApplication.class, args);
    }

    /** Listener on {@code LogProcessor.CHANNEL}; prints the id of the received event log. */
    @StreamListener(LogProcessor.CHANNEL)
    public void logEvent(EventLog el) {
        final String line = "Received event log: " + el.id;
        System.out.println(line);
    }

    /** Listener on {@code PersonProcessor.CHANNEL}; prints the name of the received person. */
    @StreamListener(PersonProcessor.CHANNEL)
    public void logPerson(Person p) {
        final String line = "Received person: " + p.name;
        System.out.println(line);
    }

    /**
     * Polled source for {@code PersonProcessor.CHANNEL} (fixedDelay "3000", at most one
     * message per poll); every poll wraps a newly created {@link Person} in a message.
     */
    @Bean
    @InboundChannelAdapter(
            value = PersonProcessor.CHANNEL,
            poller = @Poller(fixedDelay = "3000", maxMessagesPerPoll = "1"))
    public MessageSource<Person> timerMessageSource() {
        return () -> {
            Person payload = new Person();
            return MessageBuilder.withPayload(payload).build();
        };
    }

    /**
     * Polled source for {@code LogProcessor.CHANNEL} (fixedDelay "3000", at most one
     * message per poll); every poll wraps a newly created {@link EventLog} in a message.
     */
    @Bean
    @InboundChannelAdapter(
            value = LogProcessor.CHANNEL,
            poller = @Poller(fixedDelay = "3000", maxMessagesPerPoll = "1"))
    public MessageSource<EventLog> logMessageSource() {
        return () -> {
            EventLog payload = new EventLog();
            return MessageBuilder.withPayload(payload).build();
        };
    }

    /** Payload whose id is the value of a class-wide counter at construction time. */
    public static class EventLog {
        // Shared by all instances; incremented once for every EventLog created.
        private static int seq = 0;

        public String id = String.valueOf(seq++);
    }

    /** Payload whose name is "hi " followed by a class-wide counter value. */
    public static class Person {
        // Shared by all instances; incremented once for every Person created.
        private static int seq = 0;

        public String name = "hi " + (seq++);
    }
}

LogProcessor.java

package de.codecentric;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;


/**
 * Channel bindings for log messages: one output and one input, both declared under the
 * name held by {@link #CHANNEL}.
 */
public interface LogProcessor {
    /** Name used by both the output and the input declaration below. */
    String CHANNEL = "logs";

    /** Message channel declared as {@code @Output} under {@link #CHANNEL}. */
    @Output(CHANNEL)
    MessageChannel output();

    /** Subscribable channel declared as {@code @Input} under {@link #CHANNEL}. */
    @Input(CHANNEL)
    SubscribableChannel input();
}

PersonProcessor.java

package de.codecentric;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;


/**
 * Channel bindings for person messages: one output and one input, both declared under the
 * name held by {@link #CHANNEL}.
 */
public interface PersonProcessor {
    /** Name used by both the output and the input declaration below. */
    String CHANNEL = "person";

    /** Message channel declared as {@code @Output} under {@link #CHANNEL}. */
    @Output(CHANNEL)
    MessageChannel output();

    /** Subscribable channel declared as {@code @Input} under {@link #CHANNEL}. */
    @Input(CHANNEL)
    SubscribableChannel input();
}

我还可以看到输出:

Received person: hi 0 Received event log: 0 Received event log: 4 Received person: hi 4 Received event log: 9 Received person: hi 9

谢谢。

4

2 回答 2

5

我不确定这是否是问题所在,但您的输入和输出通道需要不同的目的地名称 - 例如

CHANNELIN = personIn, CHANNELOUT = personOut.

处理器并不是用来向自己发送消息的;它的用途是接收消息、对其进行处理,并将结果发送到另一个目的地。

处理器本身不会生成消息——生成消息是源(Source)的职责。

于 2016-09-18T14:55:36.600 回答
0

我遇到了同样的问题,我通过升级我的 spring cloud 版本(从 Camden SR7 到 Dalston SR4)解决了这个问题。

于 2017-10-16T12:35:42.897 回答