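I have a scenario that handles Armeria requests and posts some events to Guava's EventBus. The problem is that I lose the context while handling the event in the EventBus handler. Is there a way to let the event handler access the ServiceRequestContext?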

class EventListener {
    @Subscribe
    public void process(SomeCustomizedClass event) {
        final ServiceRequestContext context = ServiceRequestContext.currentOrNull();
        log.info("process ServiceRequestContext context={}", context);
    }
}

Registering the event handler:

EventBus eventBus = new AsyncEventBus(ThreadPoolTaskExecutor());
eventBus.register(new EventListener());

This is my Armeria service:

@Slf4j
public class NameAuthRestApi {
    final NameAuthService nameAuthService;

    @Post("/auth")
    @ProducesJson
    public Mono<RealNameAuthResp> auth(RealNameAuthReq req) {
        return nameAuthService.auth(NameAuthConverter.CONVERTER.toDto(req))
                              .handle((result, sink) -> {
                                  if (result.isSuccess()) {
                                      // I post an event here, but the event handler cannot access the
                                      // ServiceRequestContext. That is the problem.
                                      eventBus.post(new SomeCustomizedClass(result));

                                      final RealNameAuthResp realNameAuthResp = new RealNameAuthResp();
                                      realNameAuthResp.setTradeNo(result.getTradeNo());
                                      realNameAuthResp.setSuccess(true);
                                      sink.next(realNameAuthResp);
                                      sink.complete();
                                  } else {
                                      sink.error(new SystemException(ErrorCode.API_ERROR, result.errors()));
                                  }
                              });
    }
}

3 Answers


You need to do this:

public Mono<RealNameAuthResp> auth(ServiceRequestContext ctx, RealNameAuthReq req) {
    // Executed by EventLoop 1.
    // This thread has the ctx in its thread-local.
    return nameAuthService.auth(NameAuthConverter.CONVERTER.toDto(req))
                          .handle((result, sink) -> {
                              // Executed by another thread, EventLoop 2.
                              // That thread does not have the ctx in its thread-local, so push it.
                              try (SafeCloseable ignored = ctx.push()) {
                                  if (result.isSuccess()) {
                                      ...
                                  } else {
                                      ...
                                  }
                              }
                          });
}

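The problem is that the handle method is executed by another thread, which does not have the ctx in its thread-local. So you have to set the ctx manually.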
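To illustrate why the lookup returns null on the other thread, here is a minimal sketch in plain Java. DemoContext, Handle and ThreadLocalDemo are hypothetical stand-ins written for this example; they only model the thread-local behaviour and are not Armeria's implementation.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for a request context kept in a thread-local. */
final class DemoContext {
    private static final ThreadLocal<DemoContext> CURRENT = new ThreadLocal<>();

    /** Like AutoCloseable, but close() throws no checked exception. */
    interface Handle extends AutoCloseable {
        @Override
        void close();
    }

    final String name;

    DemoContext(String name) {
        this.name = name;
    }

    /** Returns the context pushed on the calling thread, or null if there is none. */
    static DemoContext currentOrNull() {
        return CURRENT.get();
    }

    /** Makes this context current on the calling thread until the handle is closed. */
    Handle push() {
        DemoContext previous = CURRENT.get();
        CURRENT.set(this);
        return () -> {
            if (previous == null) {
                CURRENT.remove();
            } else {
                CURRENT.set(previous);
            }
        };
    }
}

final class ThreadLocalDemo {
    /** Looks up the context on a worker thread before, during and after a push. */
    static String[] observe() {
        DemoContext ctx = new DemoContext("request-1");
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return worker.submit(() -> {
                String[] seen = new String[3];
                seen[0] = describe();                 // nothing pushed on this thread yet
                try (DemoContext.Handle ignored = ctx.push()) {
                    seen[1] = describe();             // visible while pushed
                }
                seen[2] = describe();                 // restored after close()
                return seen;
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    private static String describe() {
        DemoContext current = DemoContext.currentOrNull();
        return current == null ? "null" : current.name;
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", observe())); // prints "null, request-1, null"
    }
}
```

The worker thread only sees the context inside the try-with-resources block, which is the same shape as the ctx.push() call in the answer's code.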

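You can achieve the same effect by using the xAsync variant of the method with ctx.eventLoop() as the executor. Note that handleAsync is a CompletableFuture method; Reactor's Mono does not have one, so with a Mono you would have to switch threads another way, for example with publishOn before handle.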

public Mono<RealNameAuthResp> auth(ServiceRequestContext ctx, RealNameAuthReq req) {
    return nameAuthService.auth(NameAuthConverter.CONVERTER.toDto(req))
                          .handleAsync((result, sink) -> {
                              if (result.isSuccess()) {
                                  ...
                              } else {
                                  ...
                              }
                          }, ctx.eventLoop());
}
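
As a rough illustration of the xAsync idea, the sketch below uses only java.util.concurrent. The two single-thread executors and their names ("request-loop", "other-loop") are hypothetical and merely stand in for the event loops; the point is that the callback passed to handleAsync runs on the executor you hand in, not on the thread that completed the future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class HandleAsyncDemo {
    /** Returns the name of the thread that ran the handleAsync callback. */
    static String callbackThread() {
        // Hypothetical stand-ins for the request's event loop and some other event loop.
        ExecutorService requestLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "request-loop"));
        ExecutorService otherLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "other-loop"));
        try {
            return CompletableFuture
                    // The value is produced on "other-loop" ...
                    .supplyAsync(() -> "result", otherLoop)
                    // ... but the callback is scheduled on the executor given here.
                    .handleAsync((value, error) -> Thread.currentThread().getName(), requestLoop)
                    .join();
        } finally {
            requestLoop.shutdown();
            otherLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackThread()); // prints "request-loop"
    }
}
```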
answered 2020-09-09T03:16:20.603

There are two ways to solve this. First, use an executor that has the ctx:

ctx.eventLoop().submit(new Task(new Event("eone")));
// If it's a blocking task, use ctx.blockingTaskExecutor() instead.

Or, propagate the ctx manually:

@Slf4j
public static class Task implements Runnable {
    private final Event event;
    private final ServiceRequestContext ctx;

    Task(Event event) {
        this.event = event;
        ctx = ServiceRequestContext.current();
    }

    @Override
    public void run() {
        try (SafeCloseable ignored = ctx.push()) {
            ...
        }
    }
}
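
The manual propagation above can be generalised into a wrapper that captures the caller's context when the task is created and reinstates it when the task runs. The sketch below is plain Java under stated assumptions: the String-valued CURRENT thread-local and the contextAware helper are hypothetical names that only model the pattern, not Armeria's API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PropagationDemo {
    // Hypothetical stand-in for the per-thread request context.
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    /** Captures the caller's context now and reinstates it around task.call() later. */
    static <T> Callable<T> contextAware(Callable<T> task) {
        String captured = CURRENT.get();
        return () -> {
            String previous = CURRENT.get();
            CURRENT.set(captured);
            try {
                return task.call();
            } finally {
                // Restore whatever the worker thread had before.
                if (previous == null) {
                    CURRENT.remove();
                } else {
                    CURRENT.set(previous);
                }
            }
        };
    }

    /** Submits the same lookup twice: once as-is, once wrapped. */
    static String[] observe() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CURRENT.set("request-1"); // the submitting thread has a context
        try {
            Callable<String> lookup = () -> String.valueOf(CURRENT.get());
            Future<String> plain = pool.submit(lookup);
            Future<String> wrapped = pool.submit(contextAware(lookup));
            return new String[] {plain.get(), wrapped.get()};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            CURRENT.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", observe())); // prints "null, request-1"
    }
}
```

The capture has to happen on a thread that already has the context, which is why the Task constructor above calls ServiceRequestContext.current() rather than doing so inside run().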
answered 2020-09-09T06:40:49.840

@minwoox, to simplify, my code looks like this:

public class NameAuthRestApi {
    JobExecutor executor = new JobExecutor();

    @Post("/code")
    public HttpResponse authCode(ServiceRequestContext ctx) {
        try (SafeCloseable ignore = ctx.push()) {
            executor.submit(new Task(new Event("eone")));
        }
        return HttpResponse.of("OK");
    }

    @Getter
    @AllArgsConstructor
    public static class Event {
        private String name;
    }

    @RequiredArgsConstructor
    @Slf4j
    public static class Task implements Runnable {
        final Event event;

        @Override
        public void run() {
            // The ServiceRequestContext cannot be accessed here; currentOrNull() returns null.
            ServiceRequestContext ctx = ServiceRequestContext.currentOrNull();
            log.info("ctx={}, event={}", ctx, event);
        }
    }

    public static class JobExecutor {
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        public void submit(Task task) {
            executorService.submit(task);
        }
    }
}

answered 2020-09-09T06:03:24.797