For context: my class has two methods that both return a Uni, and the first depends on the second as follows:

public Uni<String> greeting(String name) {
    log.info("\n\n");
    log.info("\t`greeting(String name)` Executing on Thread {}",Thread.currentThread().getName());

    return Uni
            .createFrom()
            .item(name) // synchronous for now; for a value retrieved from an I/O call, pass a supplier instead (see README.md#Links.1)
            .emitOn(emitExecutor)
            .onItem()
            .transform(parameter-> {
                log.debug("`(p)>Transform` invoked on Thread {}",Thread.currentThread().getName());
                assert Thread.currentThread().getName().equals(threadName);
                try {
                    return ioSimulation(parameter,Thread.currentThread().getName()).subscribeAsCompletionStage().get();
                } catch (InterruptedException | ExecutionException e) {
                    log.error("failed to execute ioSimulation due to {}",e.getMessage());
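                    // Keep the original exception as the cause; "{}" is a logger placeholder and has no effect in string concatenation
                    throw new RuntimeException("failed to communicate with client: " + e.getMessage(), e);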
                }
            }).onFailure()
            .retry() 
    .atMost(2);
}


public Uni<String> ioSimulation(String param,String threadName){
        log.debug("`ioSimulation(String param)` Executing on Thread {}",Thread.currentThread().getName());
        assert Thread.currentThread().getName().equals(threadName);
        return MockServer.client
                .getAbs("http://localhost:80")
                .addQueryParam("name",param)
                .send()
                .onItem().transform(response-> {
                    if (response.statusCode() == 200){
                        return response.bodyAsString();
                    }else{
                        throw  new IllegalStateException(response.bodyAsString());
                    }
        });
}

Now, in greeting(String name) (the first method), to return a String value I have to use subscribeAsCompletionStage().get(); otherwise the return type would be Uni<Uni<String>>.

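My questions are as follows:

  1. Is there a way to return the actual result of the second method, ioSimulation (the exact value wrapped in the Uni), without using subscribeAsCompletionStage().get()? If so, how?
  2. Is there a better way to do this, such as refactoring the existing methods?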

If anyone is interested, the complete code can be found here.

1 Answer

Thanks to @Ladicek, there is an out-of-the-box way to handle an asynchronous Uni<T> returned inside a transform block. The solution is:

public Uni<String> greeting(String name) {
    log.info("\n\n");
    log.info("\t`greeting(String name)` Executing on Thread {}",Thread.currentThread().getName());

    return Uni
            .createFrom()
            .item(name) // synchronous for now; for a value retrieved from an I/O call, pass a supplier instead (see README.md#Links.1)
            .emitOn(emitExecutor)
            .onItem()
            .transformToUni(parameter -> ioSimulation(parameter,Thread.currentThread().getName()))
            .onFailure()
            .retry()
    .atMost(2);

}


public Uni<String> ioSimulation(String param,String threadName){
    log.debug("`ioSimulation(String param)` Executing on Thread {}",Thread.currentThread().getName());
    assert Thread.currentThread().getName().equals(threadName);
    return MockServer.client
            .getAbs("http://localhost:80")
            .addQueryParam("name",param)
            .send()
            .onItem().transform(response-> {
                if (response.statusCode() == 200){
                    return response.bodyAsString();
                }else{
                    throw  new IllegalStateException(response.bodyAsString());
                }
    });

}

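transformToUni transforms the received item asynchronously: the function returns a Uni<T>, and that Uni is flattened into the pipeline, so the result is a Uni<T> rather than a Uni<Uni<T>>, with no blocking get() call.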
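
As a rough analogy that needs only the JDK (so it runs without Mutiny on the classpath), transform behaves much like CompletableFuture.thenApply and transformToUni like CompletableFuture.thenCompose. The sketch below is not Mutiny code; the class name FlattenDemo and this simplified ioSimulation are hypothetical stand-ins used only to show the nesting versus flattening.

```java
import java.util.concurrent.CompletableFuture;

public class FlattenDemo {

    // Hypothetical stand-in for ioSimulation: an asynchronous call that yields a String.
    static CompletableFuture<String> ioSimulation(String param) {
        return CompletableFuture.supplyAsync(() -> "Hello " + param);
    }

    // Counterpart of transform: the mapper's return value is wrapped as-is,
    // so an asynchronous mapper produces a nested result.
    static CompletableFuture<CompletableFuture<String>> nested(String name) {
        return CompletableFuture.completedFuture(name)
                .thenApply(FlattenDemo::ioSimulation);
    }

    // Counterpart of transformToUni: the inner future is flattened and nothing blocks.
    static CompletableFuture<String> flattened(String name) {
        return CompletableFuture.completedFuture(name)
                .thenCompose(FlattenDemo::ioSimulation);
    }

    public static void main(String[] args) {
        // join() is used here only to print the result in a demo.
        System.out.println(flattened("Mutiny").join()); // prints "Hello Mutiny"
    }
}
```

The nested variant is what forced the blocking get() in the question; the flattened variant keeps the whole chain asynchronous.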

Update:

Another approach (the one I had been looking for) is to use callbacks.

public Uni<String> callbackGreeting(String name) {
    log.info("\n\n");
    log.info("\t`callbackGreeting(String name)` Executing on Thread {}", Thread.currentThread().getName());

    return Uni
            .createFrom()
            .item(name) // synchronous for now; for a value retrieved from an I/O call, pass a supplier instead (see README.md#Links.1)
            .emitOn(emitExecutor)
            .onItem()
            .transformToUni(param -> {
                return Uni.createFrom().emitter(em -> {
                    ioSimulation(param, Thread.currentThread().getName())
                            .subscribe()
                            .with(success -> em.complete(success), // ioSimulation already yields the body as a String; pass it to the emitter
                                    exception -> {
                                        log.info("the following error occurred before sending to em.complete {}", exception.getMessage());
                                        em.fail(exception);
                                    });

                });
            })
            .onFailure()
            .retry()
            .atMost(2)
            .map(item -> "Operation completed with item " + item);

}

This example is for demonstration purposes only; I know it is not the best solution. The complete code is here.
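
For readers who want to try the callback idea without Mutiny, here is a minimal JDK-only sketch of the same pattern: a future is created up front and the success and failure callbacks complete it, which is roughly the role the emitter plays above. The names CallbackBridgeDemo, fetch, and bridge are hypothetical and not part of the project.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class CallbackBridgeDemo {

    // Hypothetical callback-style API: reports either a result or an error.
    static void fetch(String param, Consumer<String> onSuccess, Consumer<Throwable> onError) {
        if (param.isEmpty()) {
            onError.accept(new IllegalStateException("empty name"));
        } else {
            onSuccess.accept("Hello " + param);
        }
    }

    // The callbacks complete or fail the future, much like em.complete and em.fail.
    static CompletableFuture<String> bridge(String param) {
        CompletableFuture<String> future = new CompletableFuture<>();
        fetch(param, future::complete, future::completeExceptionally);
        return future;
    }

    public static void main(String[] args) {
        System.out.println(bridge("Mutiny").join()); // prints "Hello Mutiny"
        System.out.println(bridge("").isCompletedExceptionally()); // prints "true"
    }
}
```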

Answered 2021-05-14T20:03:35.860