4

我正在尝试在 Spring Boot 应用程序中创建一个 Spring Cloud Stream Source Bean,它只是将方法的结果发送到流(底层 Kafka 主题绑定到流)。

我见过的大多数 Stream 示例都使用@InboundChannelAdapter注释通过轮询器将数据发送到流。但我不想使用轮询器。我尝试将轮询器设置为一个空数组,但另一个问题是使用 @InboundChannelAdapter 时您无法拥有任何方法参数。

我正在尝试做的总体概念是从入站流中读取的。进行一些异步处理,然后将结果发布到出站流。因此,使用处理器似乎也不是一种选择。我正在使用@StreamListener接收器通道来读取入站流并且有效。

这是我一直在尝试的一些代码,但这根本不起作用。我希望它会这么简单,因为我的 Sink 是但也许不是。寻找某人向我指出不是处理器的源示例(即不需要收听入站通道)并且不使用@InboundChannelAdapter或给我一些设计技巧来完成我需要做的事情一种不同的方式。谢谢!

@EnableBinding(Source.class)
public class JobForwarder {

   @ServiceActivator(outputChannel = Source.OUTPUT)
   @SendTo(Source.OUTPUT)
   public String forwardJob(String message) {
       log.info(String.format("Forwarding a job message [%s] to queue [%s]", message, Source.OUTPUT));
       return message;
   }
}
4

3 回答 3

4

您的原始要求可以通过以下步骤来实现。

  1. 创建您的自定义绑定界面(您也可以使用默认界面@EnableBinding(Source.class)

    public interface CustomSource {
        String OUTPUT = "customoutput";
    
        @Output(CustomSource.OUTPUT)
        MessageChannel output();
    }
    
  2. 注入你绑定的频道

    @Component
    @EnableBinding(CustomSource.class)
    public class CustomOutputEventSource {
    
        @Autowired
        private CustomSource customSource;
    
        public void sendMessage(String message) {
            customSource.output().send(MessageBuilder.withPayload(message).build());
        }
    }
    
  3. 测试一下

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class CustomOutputEventSourceTest {
    
        @Autowired
        CustomOutputEventSource output;
    
        @Test
        public void sendMessage() {
            output.sendMessage("Test message from JUnit test");
        }
    }
    
于 2016-12-27T08:20:18.943 回答
0

感谢您的输入。我花了一段时间才回到问题上来。我确实尝试阅读@Publisher. 它看起来正是我需要的,但我无法初始化正确的 bean 以使其正确连接。

为了回答您的问题,forwardJob()在对输入进行一些异步处理后调用该方法。

最终我只是spring-kafka直接使用库来实现,这更加明确并且感觉更容易上手。我认为我们将坚持使用 kafka 作为唯一的通道绑定,所以我认为我们会坚持使用该库。

但是,我们最终确实让 spring-cloud-stream 库工作得非常简单。这是没有轮询器的单一来源的代码。

@Component
@EnableBinding(Source.class)
public class JobForwarder {

    private Source source;

    @Autowired
    public ScheduledJobForwarder(Source source) {
        this.source = source;
    }

    public void forwardScheduledJob(String message) {
        log.info(String.format("Forwarding a job message [%s] to queue [%s]", message, Source.OUTPUT));
        source.output().send(MessageBuilder.withPayload(message).build());
    }
}
于 2016-10-06T05:07:52.957 回答
0

那么如果不想使用 Poller,是什么原因导致该forwardJob()方法被调用呢?

您不能只调用该方法并期望结果进入输出通道。

使用您当前的配置,您需要一个inputChannel包含入站消息的服务(以及向该通道发送消息的内容)。它不必绑定到运输工具;它可以是一个简单的MessageChannel @Bean.

或者,您可以使用 a@Publisher发布方法调用的结果(以及返回给调用者)- docs here

@Publisher(channel = Source.OUTPUT)
于 2016-09-08T17:24:42.683 回答