The sample project is here: https://github.com/codependent/spring5-playground
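I want to bridge messages received from a JMS queue into a reactive controller that publishes them as an event stream.
I don't want messages to be replayed: if a message arrives while there are no subscribers, it should not be delivered to any subscriber that connects later. That is why I'm using an EmitterProcessor: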
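import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.codependent.spring5.playground.reactive.dto.Alert;

import reactor.core.publisher.EmitterProcessor;

@Component
public class AlertEmitterProcessor {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private EmitterProcessor<Alert> processor;

    public AlertEmitterProcessor(){
        processor = EmitterProcessor.<Alert>create();
        processor.connect();
    }

    // Exposes the processor so the controller can subscribe to it
    public EmitterProcessor<Alert> getProcessor() {
        return processor;
    }

    // The methods below log each signal and forward it to the processor
    public void onNext(Alert alert){
        logger.info("onNext [{}]", alert);
        processor.onNext(alert);
    }

    public void onComplete(){
        logger.info("onComplete");
        processor.onComplete();
    }

    public void onError(Throwable t){
        logger.error("onError", t);
        processor.onError(t);
    }
}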
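To be concrete about the semantics I'm after, here is a minimal plain-Java sketch of "no replay". It is not Reactor code and not how EmitterProcessor is implemented; `NoReplayBroadcaster` and its methods are hypothetical names used only for illustration. Items published while nobody is subscribed are simply dropped, and a later subscriber only sees what arrives after it subscribed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical illustration only: a hot broadcaster that never replays. */
public class NoReplayBroadcaster<T> {

    // Currently connected subscribers; the list is safe for concurrent use.
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    /** Registers a subscriber and returns a handle that cancels the subscription. */
    public Runnable subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        return () -> subscribers.remove(subscriber);
    }

    /** Delivers the item to whoever is subscribed right now; returns how many received it. */
    public int publish(T item) {
        int delivered = 0;
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(item);
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        NoReplayBroadcaster<String> alerts = new NoReplayBroadcaster<>();

        alerts.publish("early");                 // no subscribers yet: dropped, never buffered

        List<String> seen = new ArrayList<>();
        Runnable cancel = alerts.subscribe(seen::add);
        alerts.publish("late");                  // delivered to the live subscriber

        cancel.run();
        alerts.publish("after-cancel");          // dropped again

        System.out.println(seen);                // prints [late]
    }
}
```

This is the behaviour I expect from the `/alerts/live2` endpoint: a browser that connects only receives alerts put on the queue after it connected.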
This is my message listener:
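import java.io.IOException;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.stereotype.Component;

import com.codependent.spring5.playground.reactive.dto.Alert;

@Component
public class AlertMessageListener implements MessageListener{

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private AlertEmitterProcessor alertProcessor;

    @Autowired
    private MappingJackson2HttpMessageConverter jacksonMessageConverter;

    @Override
    public void onMessage(Message message) {
        logger.info("Message received: [{}]", message);
        TextMessage tm = (TextMessage)message;
        try {
            // Deserialize the JSON payload and push it into the processor
            Alert alert = jacksonMessageConverter.getObjectMapper().readValue(tm.getText(), Alert.class);
            alertProcessor.onNext(alert);
        } catch (IOException | JMSException e) {
            logger.error("Could not read alert from message", e);
        }
    }
}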
Finally, my REST controller:
@Autowired
private AlertEmitterProcessor alertTopicProcessor;

@Autowired
private AlertMessageListener messageListener;

@Autowired
private MappingJackson2HttpMessageConverter jacksonMessageConverter;

@GetMapping(value="/accounts/{id}/alerts/live2", produces="text/event-stream")
public Flux<Alert> getAccountAlertsStreaming2(@PathVariable Integer id) {
    return alertTopicProcessor.getProcessor()
        .log().filter( a -> a.getAccountId().equals(id) );
}
To test its behaviour, I added this controller method to simulate putting a message on the queue:
@GetMapping(value="/mock/accounts/{id}/alerts/put", produces="text/event-stream")
public void putAlert(@PathVariable Integer id) throws JsonProcessingException {
    Alert alert = new Alert(id, (long)Math.round(Math.random()*10), "Message");
    String alertStr = jacksonMessageConverter.getObjectMapper().writeValueAsString(alert);
    TextMessage tm = new MockTextMessage(alertStr);
    messageListener.onMessage(tm);
}
Right after starting the application I load http://localhost:8080/accounts/1/alerts/live2 and the browser waits for data.
2016-10-03 13:43:38.755 DEBUG 12800 --- [nio-8080-exec-1] o.s.web.reactive.DispatcherHandler : Processing GET request for [http://localhost:8080/accounts/1/alerts/live2]
2016-10-03 13:43:38.770 DEBUG 12800 --- [nio-8080-exec-1] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /accounts/1/alerts/live2
2016-10-03 13:43:38.778 DEBUG 12800 --- [nio-8080-exec-1] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public reactor.core.publisher.Flux<com.codependent.spring5.playground.reactive.dto.Alert> com.codependent.spring5.playground.reactive.web.AccountsRestController.getAccountAlertsStreaming2(java.lang.Integer)]
2016-10-03 13:43:38.779 DEBUG 12800 --- [nio-8080-exec-1] o.s.b.f.s.DefaultListableBeanFactory : Returning cached instance of singleton bean 'accountsRestController'
2016-10-03 13:43:38.800 INFO 12800 --- [nio-8080-exec-1] reactor.unresolved : onSubscribe(reactor.core.publisher.FluxPeek$PeekSubscriber@54d4fb36)
2016-10-03 13:43:38.802 INFO 12800 --- [nio-8080-exec-1] reactor.unresolved : request(unbounded)
2016-10-03 13:43:38.803 INFO 12800 --- [nio-8080-exec-1] reactor.unresolved : onNext(1)
2016-10-03 13:43:38.822 INFO 12800 --- [nio-8080-exec-1] reactor.Flux.EmitterProcessor.2 : onSubscribe(reactor.core.publisher.EmitterProcessor$EmitterSubscriber@227405f2)
2016-10-03 13:43:38.822 INFO 12800 --- [nio-8080-exec-1] reactor.Flux.EmitterProcessor.2 : request(1)
2016-10-03 13:43:38.823 INFO 12800 --- [nio-8080-exec-1] reactor.unresolved : onComplete()
Then I publish some messages with http://localhost:8080/mock/accounts/1/alerts/put.
2016-10-03 13:43:43.063 DEBUG 12800 --- [nio-8080-exec-2] o.s.web.reactive.DispatcherHandler : Processing GET request for [http://localhost:8080/mock/accounts/1/alerts/put]
2016-10-03 13:43:43.063 DEBUG 12800 --- [nio-8080-exec-2] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /mock/accounts/1/alerts/put
2016-10-03 13:43:43.068 DEBUG 12800 --- [nio-8080-exec-2] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public void com.codependent.spring5.playground.reactive.web.AccountsRestController.putAlert(java.lang.Integer) throws com.fasterxml.jackson.core.JsonProcessingException]
2016-10-03 13:43:43.069 DEBUG 12800 --- [nio-8080-exec-2] o.s.b.f.s.DefaultListableBeanFactory : Returning cached instance of singleton bean 'accountsRestController'
2016-10-03 13:43:43.071 INFO 12800 --- [nio-8080-exec-2] reactor.unresolved : onSubscribe(reactor.core.publisher.FluxPeek$PeekSubscriber@2ba7d09c)
2016-10-03 13:43:43.071 INFO 12800 --- [nio-8080-exec-2] reactor.unresolved : request(unbounded)
2016-10-03 13:43:43.072 INFO 12800 --- [nio-8080-exec-2] reactor.unresolved : onNext(1)
2016-10-03 13:43:43.112 INFO 12800 --- [nio-8080-exec-2] c.c.s.p.r.message.AlertMessageListener : Message received: [com.codependent.spring5.playground.reactive.message.MockTextMessage@37262c9e]
2016-10-03 13:43:43.145 INFO 12800 --- [nio-8080-exec-2] c.c.s.p.r.message.AlertEmitterProcessor : onNext [Alert [alertId=3, message=Message, accountId=1]]
2016-10-03 13:43:43.146 INFO 12800 --- [nio-8080-exec-2] reactor.Flux.EmitterProcessor.2 : onNext(Alert [alertId=3, message=Message, accountId=1])
2016-10-03 13:43:43.177 INFO 12800 --- [nio-8080-exec-2] reactor.unresolved : onComplete()
2016-10-03 13:43:43.177 DEBUG 12800 --- [nio-8080-exec-2] o.s.h.s.r.ServletHttpHandlerAdapter : Successfully completed request
But nothing reaches the browser. The request eventually ends with a 500 error (nothing is logged).
After some manual retries, it starts receiving data...
2016-10-03 13:45:07.726 DEBUG 12800 --- [nio-8080-exec-8] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public reactor.core.publisher.Flux<com.codependent.spring5.playground.reactive.dto.Alert> com.codependent.spring5.playground.reactive.web.AccountsRestController.getAccountAlertsStreaming2(java.lang.Integer)]
2016-10-03 13:45:07.726 DEBUG 12800 --- [nio-8080-exec-8] o.s.b.f.s.DefaultListableBeanFactory : Returning cached instance of singleton bean 'accountsRestController'
2016-10-03 13:45:07.727 INFO 12800 --- [nio-8080-exec-8] reactor.unresolved : onSubscribe(reactor.core.publisher.FluxPeek$PeekSubscriber@909f06f)
2016-10-03 13:45:07.727 INFO 12800 --- [nio-8080-exec-8] reactor.unresolved : request(unbounded)
2016-10-03 13:45:07.727 INFO 12800 --- [nio-8080-exec-8] reactor.unresolved : onNext(1)
2016-10-03 13:45:07.729 INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9 : onSubscribe(reactor.core.publisher.EmitterProcessor$EmitterSubscriber@7ce1f3e)
2016-10-03 13:45:07.729 INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9 : request(1)
2016-10-03 13:45:07.729 INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9 : onNext(Alert [alertId=4, message=Message, accountId=1])
2016-10-03 13:45:07.730 INFO 12800 --- [nio-8080-exec-8] reactor.unresolved : onComplete()
2016-10-03 13:45:07.747 INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9 : request(1)
2016-10-03 13:45:07.747 INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9 : onNext(Alert [alertId=0, message=Message, accountId=1])
2016-10-03 13:45:07.748 INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9 : request(1)
...but most of the time it doesn't receive anything.