我正在尝试在 Spring Boot 应用程序中创建一个 Spring Cloud Stream Source Bean,它只是将方法的结果发送到流(底层 Kafka 主题绑定到流)。
我见过的大多数 Stream 示例都使用@InboundChannelAdapter
注释通过轮询器将数据发送到流。但我不想使用轮询器。我尝试将轮询器设置为一个空数组,但另一个问题是使用 @InboundChannelAdapter 时您无法拥有任何方法参数。
我正在尝试做的总体概念是从入站流中读取的。进行一些异步处理,然后将结果发布到出站流。因此,使用处理器似乎也不是一种选择。我正在使用@StreamListener
接收器通道来读取入站流并且有效。
这是我一直在尝试的一些代码,但这根本不起作用。我希望它会这么简单,因为我的 Sink 是但也许不是。寻找某人向我指出不是处理器的源示例(即不需要收听入站通道)并且不使用@InboundChannelAdapter
或给我一些设计技巧来完成我需要做的事情一种不同的方式。谢谢!
@EnableBinding(Source.class)
public class JobForwarder {
@ServiceActivator(outputChannel = Source.OUTPUT)
@SendTo(Source.OUTPUT)
public String forwardJob(String message) {
log.info(String.format("Forwarding a job message [%s] to queue [%s]", message, Source.OUTPUT));
return message;
}
}