8

我正在使用 Reactor 2 和 Spring 4。这是我拥有的典型代码 -Consumer使用存储库

@Consumer
public class ApplicationService {

   @Selector(value="/applications/id", type = SelectorType.URI)
   @ReplyTo
   public Application byApplicationId(String id) throws ApplicationNotFoundException {
      Application app = appRepo.findOne(id);
      if(app == null) 
        throw new ApplicationNotFoundException("Application `" + id + "` could not be found.");
      return app;
   }
}

然后我有一个控制器将请求传递给eventBus我传递请求并返回一个Promise

@RestController
@RequestMapping("/applications")
public class ApplicationsController {
   @RequestMapping(value = "/{id}", method = GET, produces = APPLICATION_JSON_VALUE)
   public Promise<Event<Application>> byApplicationId(@PathVariable final String id) {
      final Promise<Event<Application>> p = Promises.prepare(env);
      eventBus.sendAndReceive("/applications/id", Event.wrap(id), p);
      return p;
   }

}

一切正常,但在ApplicationService抛出异常Promise的情况下,未设置值,但我确实在控制台中得到以下信息:

16:46:58.003 [main] ERROR reactor.bus.EventBus - null
java.lang.reflect.UndeclaredThrowableException
    at org.springframework.util.ReflectionUtils.rethrowRuntimeException(ReflectionUtils.java:302)
...
Caused by: com.metlife.harmony.exceptions.ApplicationNotFoundException: Application `2860c555-0bc4-45e6-95ea-f724ae3f4464` could not be found.
    at com.metlife.harmony.services.ApplicationService.byApplicationId(ApplicationService.java:46) ~[classes/:?]
...
Caused by: reactor.core.support.Exceptions$ValueCause: Exception while signaling value: reactor.bus.Event.class : Event{id=null, headers={}, replyTo=reactor.bus.selector.Selectors$AnonymousKey@4, key=/applications/id, data=2860c555-0bc4-45e6-95ea-f724ae3f4464}

问题是:

  1. 我是否eventBus以错误的方式使用 Reactor?如果是这样,正确的方法是什么

  2. 也许这个功能还没有实现

4

2 回答 2

4

我想我重新评估了在我的 Spring 应用程序中使用 Reactor 的策略。

现在我的控制器看起来像

@RestController
public class GreetingController {

    @Autowired
    private GreetingService greetingService;

    @RequestMapping("/greeting")
    public Promise<ResponseEntity<?>> greeting(final @RequestParam(value = "name", defaultValue = "World") String name) {
        return greetingService.provideGreetingFor(name).map(new Function<Greeting, ResponseEntity<?>>() {
            @Override
            public ResponseEntity<?> apply(Greeting t) {
                return new ResponseEntity<>(t, HttpStatus.OK);
            }
        }).onErrorReturn(WrongNameException.class, new Function<WrongNameException, ResponseEntity<?>>() {
            @Override
            public ResponseEntity<?> apply(WrongNameException t) {
                return new ResponseEntity<>(t.getMessage(), HttpStatus.BAD_REQUEST);
            }
        }).next();
    }
}

服务看起来像

@Service
public class GreetingService {
    @Autowired
    private Environment env;

    private static final String template = "Hello, %s!";
    private final AtomicLong counter = new AtomicLong();

    public Stream<Greeting> provideGreetingFor(String name) {
        return Streams.just(name).dispatchOn(env).map(new Function<String, Greeting>() {
            @Override
            public Greeting apply(String t) {
                if (t == null || t.matches(".*\\d+.*"))
                    throw new WrongNameException();
                return new Greeting(counter.incrementAndGet(), String.format(template, t));
            }
        });
    }
}

不好的是,现在我必须使用Stream<T>服务中的方法(这应该是一种业务逻辑),所以使用该服务的任何人现在都知道该服务的Stream-ish 本质,因此会Stream渗入其他部分代码,例如现在我可能不得不在await()使用服务的代码中使用。

完整的应用程序可在https://github.com/evgeniysharapov/spring-reactor-demo获得

于 2015-05-07T21:09:52.100 回答
1

反应式参考中处理异常和错误的专门章节在这里阅读:

https://projectreactor.io/docs/core/release/reference/#error.handling

无论如何,反应式管道在其中性含义上是“连续的”。您几乎无法阻止它被您的方法的使用者注意到。

于 2020-01-20T08:35:56.403 回答