2

我想知道 spring-xd 如何处理流中的处理器。我真正想知道的是处理器是否阻塞代码,或者它们与反应器(https://github.com/reactor/reactor/wiki/Processor)如何处理处理器有关。

如果我需要执行昂贵的阻塞操作(也就是调用外部系统),最好的方法是什么?我很想为此使用反应器或任何其他反应式框架,但是如何在 XD 管道架构中这样做呢?

问候

4

1 回答 1

2

Spring XD 流中的术语处理器具有特定含义 - 它基本上是从名为input的通道到名为output的通道的 Spring Integration 消息流。按照惯例,这些通道是在 XD 流中生成和使用有效负载的。例如,如果流mystream定义为someSource | someProcessor | someSink,则处理器模块可能会异步执行昂贵的操作,但流仍必须等待处理器输出通道上的消息,因此您不会看到吞吐量有所提高。

但是,在某些情况下,实现接收器以异步运行会有所帮助。在这种情况下,当消息到达接收器的输入通道时,流不会阻塞。异步接收器(有一个环)可以连接到流上的水龙头:

mystream = someSource | ... | someSink
mytap =  tap:stream:mystream > asyncSink  

或命名队列(或主题):

 mystream = someSource | ... | > queue:myQueue
 queue:myQueue > asyncSink

或者它可能是主流的接收器。

要实现异步接收器,需要配置一个 Spring Integration 端点,例如,一个调用外部服务的 ServiceActivator,在接收器模块中带有一个轮询器和一个任务执行器。端点轮询一个可轮询通道(例如,输入通道本身可能被声明为队列通道)。有关详细信息,请参阅http://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html

于 2014-04-25T19:47:49.553 回答