1

如何创建一个故障安全的 Spring XD 流,它会在一条特定消息触发异常后继续正常运行(即记录错误但继续使用流中的下一条消息),而无需在每个消息中添加 try catch(Throwable)流步骤?

使用 Reactor 或 RxJava 模型有什么简单的方法吗?

使用 Reactor 的示例流:

@Override
public Publisher<Tuple> process(Stream<GenericMessage> inputStream) {
  return inputStream
      .flatMap(SomeClass::someFlatMap)
      .filter(SomeClass::someFilter)
      .when(Throwable.class, t -> log.error("error", t));
}
4

1 回答 1

0

RxJava 可以被处理器模块使用。在创建订阅时,需要创建订阅并处理错误,订阅者需要添加一个 onError 处理程序:

       subject = new SerializedSubject(PublishSubject.create());
        Observable<?> outputStream = processor.process(subject);
        subscription = outputStream.subscribe(new Action1<Object>() {
            @Override
            public void call(Object outputObject) {
                if (ClassUtils.isAssignable(Message.class, outputObject.getClass())) {
                    getOutputChannel().send((Message) outputObject);
                } else {
                    getOutputChannel().send(MessageBuilder.withPayload(outputObject).build());
                }
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                logger.error(throwable.getMessage(), throwable);
            }
        }, new Action0() {
            @Override
            public void call() {
                logger.error("Subscription close for [" + subscription + "]");
            }
        });

在这里查看更多示例:https ://github.com/spring-projects/spring-xd/tree/master/spring-xd-rxjava/src

于 2015-12-23T19:42:55.667 回答