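I'm trying to build an event bus with RxJava. I post commands onto it, and it should keep running even when an error occurs. I looked into onErrorFlatMap, but it no longer exists, and I haven't been able to make sense of materialize and dematerialize yet.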

This is what I have so far:

My small test

public class EventBusTest {

    private EventBus eventBus = new EventBus();

    @Test
    public void test() {
        //listeners in service layer
        eventBus.on(createOf(BeanA.class))
                .subscribe(this::handleBeanAInService1);
        eventBus.on(createOf(BeanA.class))
                .doOnNext(BeanA::validateBusinessLogic)
                .subscribe(this::handleBeanAInService2);
        eventBus.on(createOf(BeanB.class))
                .subscribe(this::handleBeanBInService);

        //incoming requests in rest layer
        List<String> incomingCalls = new ArrayList<>();
        Collections.addAll(incomingCalls, "firstRequest", "secondRequestErrorOnValidate", "thirdRequest", "fourthRequestErrorInService", "fifthRequest");
        incomingCalls.stream().forEach(this::incomingCallsEgHttpCalls);
    }

    public void incomingCallsEgHttpCalls(String string) {
        Observable.just(string)
                .map(aName -> new BeanA(aName))
                .doOnNext(bean -> eventBus.post(new CreateCommand(bean)))
                .subscribe(bean -> System.out.println("\tReturn ok to client: " + bean.getName()), error -> System.out.println("\tReturning error to client: " + string + "; " + error.getMessage()));
    }

    public void handleBeanAInService1(BeanA beanA) {
        System.out.println("BeanAService1 handling BeanA " + beanA.getName());
        if(beanA.getName().contains("ErrorInService")) {
            throw new RuntimeException("service exception for " + beanA.getName());
        }
        eventBus.post(new CreateCommand(new BeanB(beanA.getName())));
    }

    public void handleBeanAInService2(BeanA beanA) {
        System.out.println("BeanAService2 handling BeanA " + beanA.getName());
    }

    public void handleBeanBInService(BeanB beanB) {
        System.out.println("BeanBService handling BeanB " + beanB.getName());
    }

}

EventBus

@Named
public class EventBus {

    private PublishSubject<Object> publishSubject = PublishSubject.create();

    public EventBus post(Object object) {
        //publishSubject.onNext(object); => does not work, OnErrorNotImplementedExceptions if eventbus observers throw errors in validate

        //To prevent OnErrorNotImplementedException
        try {
            publishSubject.onNext(object);
        } catch (OnErrorNotImplementedException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new RuntimeException(cause);
        }
        return this;
    }

    public Observable<Object> observe() {
        //I also tried
        //return publishSubject.onErrorResumeNext(publishSubject);
        return publishSubject;
    }

    public <F, T> Observable<T> on(Observable.Transformer<F, T> onCommand) {
        return onCommand.call((Observable<F>) publishSubject);
    }

}

CreateCommand

public class CreateCommand {

    private Object object;

    public CreateCommand(Object object) {
        this.object = object;
    }

    public Class<?> type() {
        return object.getClass();
    }

    public <T> T value() {
        return (T) object;
    }

    public static <F, T> Observable.Transformer<F, T> createOf(Class<T> clazz) {
        return observable -> observable
                .ofType(CreateCommand.class)
                .filter(createCommand -> clazz.isInstance(createCommand.object))
                .map(createCommand -> clazz.cast(createCommand.object));
    }
}

BeanA

public class BeanA {
    private String name;

    public BeanA(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public static void validateBusinessLogic(BeanA beanA) {
        if (beanA.getName().contains("ErrorOnValidate")) {
            throw new RuntimeException("validate exception for " + beanA.getName());
        }
    }
}

BeanB

public class BeanB {

    private String name;

    public BeanB(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

}

Actual output

BeanAService1 handling BeanA firstRequest
BeanBService handling BeanB firstRequest
BeanAService2 handling BeanA firstRequest
    Return ok to client: firstRequest
BeanAService1 handling BeanA secondRequestErrorOnValidate
BeanBService handling BeanB secondRequestErrorOnValidate
    Returning error to client: secondRequestErrorOnValidate; validate exception for secondRequestErrorOnValidate
BeanAService1 handling BeanA thirdRequest
BeanBService handling BeanB thirdRequest
    Return ok to client: thirdRequest
BeanAService1 handling BeanA fourthRequestErrorInService
    Returning error to client: fourthRequestErrorInService; service exception for fourthRequestErrorInService
    Return ok to client: fifthRequest

Desired output

BeanAService1 handling BeanA firstRequest
BeanBService handling BeanB firstRequest
BeanAService2 handling BeanA firstRequest
    Return ok to client: firstRequest
BeanAService1 handling BeanA secondRequestErrorOnValidate
BeanBService handling BeanB secondRequestErrorOnValidate
    Returning error to client: secondRequestErrorOnValidate; validate exception for secondRequestErrorOnValidate
BeanAService1 handling BeanA thirdRequest
BeanBService handling BeanB thirdRequest
BeanAService2 handling BeanA thirdRequest
    Return ok to client: thirdRequest
BeanAService1 handling BeanA fourthRequestErrorInService
BeanAService2 handling BeanA fourthRequestErrorInService
    Returning error to client: fourthRequestErrorInService; service exception for fourthRequestErrorInService
BeanAService1 handling BeanA fifthRequest
BeanBService handling BeanB fifthRequest
BeanAService2 handling BeanA fifthRequest
    Return ok to client: fifthRequest

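Any ideas on how to solve this? I saw that there was an issue in RxJava about an example EventBus implementation, but I couldn't find it.

I would also like my services not to have to do any RxJava-specific error handling.

Or is there another way to push events to several other observables and then zip the results back together?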

1 Answer

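Errors in RxJava are terminal events that terminate the observable stream (unless they are caught and resumed by the onErrorXXX and onExceptionResumeNext operators). If you want to keep the stream alive, you need to wrap the error into some other value type (i.e. Notification) and unwrap it at the relevant places.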
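To make the wrap-and-unwrap idea concrete, here is a minimal sketch that deliberately uses only the JDK rather than RxJava. `Outcome` is a hypothetical stand-in for `rx.Notification`, and `attempt` and `process` are illustrative names, not part of any library. In RxJava 1.x itself, `materialize()` and `dematerialize()` do the corresponding conversion between events and `Notification` values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for rx.Notification: holds either a value or an error. */
final class Outcome<T> {
    final T value;
    final Throwable error;

    private Outcome(T value, Throwable error) {
        this.value = value;
        this.error = error;
    }

    static <T> Outcome<T> ok(T value) { return new Outcome<>(value, null); }

    static <T> Outcome<T> failed(Throwable error) { return new Outcome<>(null, error); }

    boolean isError() { return error != null; }
}

final class WrapErrorsDemo {

    /** Runs the handler and turns a thrown exception into an ordinary value. */
    static <T> Outcome<T> attempt(T item, Consumer<T> handler) {
        try {
            handler.accept(item);
            return Outcome.ok(item);
        } catch (RuntimeException e) {
            return Outcome.failed(e);
        }
    }

    /** Processes every request; a failure is recorded but does not stop the loop. */
    static List<String> process(List<String> requests) {
        List<String> log = new ArrayList<>();
        for (String request : requests) {
            Outcome<String> outcome = attempt(request, name -> {
                if (name.contains("Error")) {
                    throw new RuntimeException("validate exception for " + name);
                }
            });
            // Unwrap at the place that knows how to report the error.
            log.add(outcome.isError()
                    ? "error: " + outcome.error.getMessage()
                    : "ok: " + outcome.value);
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> requests = new ArrayList<>();
        requests.add("firstRequest");
        requests.add("secondRequestErrorOnValidate");
        requests.add("thirdRequest");
        process(requests).forEach(System.out::println);
    }
}
```

Because the failure travels as a value, the item after the failing one is still processed. The same holds for a stream whose errors have been turned into Notification values before they can reach onError.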

Here is an example gist that shows an event bus implementation built on RxJava constructs. Note the following:

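  • If the event bus's post() method can be called from multiple threads, you may need to serialize access to the subject.
  • RxJava sometimes doesn't bounce an exception thrown from onNext back to the caller; instead the exception surfaces at the very beginning of the stream and terminates the entire stream. You therefore need an operator that confines such errors below a certain level, hence the ClientErrorBounceBack operator in the example.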
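The two notes above can be illustrated with a small JDK-only sketch. It is not the gist's implementation and does not reproduce ClientErrorBounceBack; `IsolatingBus` and `IsolatingBusDemo` are hypothetical names. It assumes that serializing posts with `synchronized` is acceptable and that an error from one listener should be reported to the poster without stopping the other listeners. In RxJava 1.x, wrapping the subject in a `SerializedSubject` is one way to get the serialization part.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical JDK-only bus that contains listener errors per post() call. */
final class IsolatingBus {
    private final List<Consumer<Object>> listeners = new CopyOnWriteArrayList<>();

    /** Registers a handler that only receives events of the given type. */
    <T> void on(Class<T> type, Consumer<T> handler) {
        listeners.add(event -> {
            if (type.isInstance(event)) {
                handler.accept(type.cast(event));
            }
        });
    }

    /**
     * Delivers the event to every listener. The method is synchronized so that
     * posts from different threads are serialized; the lock is reentrant, so a
     * handler may post again from the same thread. Exceptions are collected and
     * handed back to the poster instead of ending delivery.
     */
    synchronized List<RuntimeException> post(Object event) {
        List<RuntimeException> errors = new ArrayList<>();
        for (Consumer<Object> listener : listeners) {
            try {
                listener.accept(event);
            } catch (RuntimeException e) {
                errors.add(e);
            }
        }
        return errors;
    }
}

final class IsolatingBusDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        IsolatingBus bus = new IsolatingBus();
        bus.on(String.class, name -> {
            if (name.contains("ErrorInService")) {
                throw new RuntimeException("service exception for " + name);
            }
            log.add("service1 handled " + name);
        });
        bus.on(String.class, name -> log.add("service2 handled " + name));

        String[] requests = {"thirdRequest", "fourthRequestErrorInService", "fifthRequest"};
        for (String request : requests) {
            List<RuntimeException> errors = bus.post(request);
            log.add(errors.isEmpty()
                    ? "ok: " + request
                    : "error: " + errors.get(0).getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the failing fourth request is still seen by the second listener, the poster gets the error back, and the fifth request is delivered to both listeners. That is the behaviour the question asks for, although here it comes from a try/catch around each listener rather than from an Rx operator.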
answered 2015-04-22T09:07:39.670