9

我有一个spring-cloud-stream带有 kafka 绑定的应用程序。我想从同一个可执行文件(jar)中发送和接收来自同一个主题的消息。我的频道定义如下:- public interface ChannelDefinition { @Input("forum") public SubscriableChannel readMessage(); @Output("forum") public MessageChannel postMessage(); }

@StreamListener用来接收消息。我收到各种意外错误。有时,我收到

  1. 没有为所有其他消息找到 unknown.message.channel 的调度程序
  2. 如果我将命令行 kafka 订阅者附加到上述论坛主题,它会收到所有其他消息。
  3. 我的应用程序接收所有其他消息,这是来自命令行订阅者的专有消息集。我已确保我的应用程序以特定的组名订阅。

是否有上述用例的工作示例?

4

3 回答 3

14

这是定义可绑定通道的错误方法(因为forum两者都使用了名称)。我们应该更加彻底并且快速失败,但是您将输入和输出绑定到同一个通道并在您的应用程序中创建一个竞争消费者。这也解释了您与备用消息有关的其他问题。

你应该做的是:

public interface ChannelDefinition { 

   @Input
   public MessageChannel readMessage();

   @Output
   public MessageChannel postMessage();
}

然后使用应用程序属性将您的频道绑定到同一个队列:

spring.cloud.stream.bindings.readMessage.destination=forum
spring.cloud.stream.bindings.postMessage.destination=forum
于 2016-07-21T13:43:20.063 回答
1

除了 Marius Bogoevici 的上述答案外,这里还有一个如何收听该输入的示例。

@StreamListener
public void handleNewOrder(@Input("input") SubscribableChannel input) {
    logger.info("Subscribing...");
    input.subscribe((message) -> {
        logger.info("Received new message: {}", message);
    });
}
于 2018-08-17T21:32:16.473 回答
0

对我来说,从“输入”中消费是行不通的。我需要使用方法名称@Streamlistener并且需要使用,@EnableBinding如下所示:

@Slf4j
@RequiredArgsConstructor
@EnableBinding(value = Channels.class)
public class Consumer {
    
    @StreamListener("readMessage")
    public void retrieve(Something req) {
        log.info("Received {{}}", req);
    }

}
于 2021-10-11T07:12:37.973 回答