2

查看 RxJava 为我们的 API 构建异步支持。现在我们使用 jetty + JAX-RS @Path 注释并且不确定将传入的 REST api 调用绑定到 RxJava API 的正确方法是什么。

基本上这是在释放请求线程的上下文中,直到数据库的响应准备好。

查看 Vert.x,但这需要 java 7,我们现在与 java 6 绑定。

寻找有关上述建议。将传入的 http 请求绑定到 RxJava API 的典型方法是什么?

4

2 回答 2

4

这是一个为JAX-RS创建 Customer Observable 的示例:

public class ApiService {
    Client client;

    public ApiService() {
        client = ClientBuilder.newClient();
    }

    public Observable<Customer> createCustomerObservable(final int customerId) {
        return Observable.create(new Observable.OnSubscribe<Customer>() {
            @Override
            public void call(final Subscriber<? super Customer> subscriber) {
                client
                        .target("http://domain.com/customers/{id}")
                        .resolveTemplate("id", customerId)
                        .request()
                        .async()
                        .get(new InvocationCallback<Customer>() {
                            @Override
                            public void completed(Customer customer) {
                                // Do something
                                if (!subscriber.isUnsubscribed()) {
                                    subscriber.onNext(customer);
                                    subscriber.onCompleted();
                                }
                            }

                            @Override
                            public void failed(Throwable throwable) {
                                // Process error
                                if (!subscriber.isUnsubscribed()) {
                                    subscriber.onError(throwable);
                                }
                            }
                        });
            }
        });
    }
}
于 2014-08-20T17:22:15.467 回答
1

Something like the following should work for Jetty:

public class ApiService {
    HttpClient httpClient;

    public ApiService(HttpClient httpClient,) {
        this.httpClient = httpClient;
    }

    public <RequestType, ResultType> Observable<ResultType> createApiObservable(final RequestType requestContent) {
        return Observable.create(new Observable.OnSubscribe<ResultType>() {
            @Override
            public void call(final Subscriber<? super ResultType> subscriber) {
                // Create the request content for your API. Your logic here...
                ContentProvider contentProvider = serializeRequest(requestContent);

                httpClient
                        .newRequest("http://domain.com/path")
                        .content(contentProvider)
                        .send(new Response.CompleteListener() {
                            @Override
                            void onComplete(Result result) {
                                // Pass along the error if one occurred.
                                if (result.isFailed()) {
                                    subscriber.onError(result.getFailure());
                                    return;
                                }

                                // Convert the response data to the ResultType. Your logic here...
                                ResultType resultContent = parseResponse(result.getResponse());

                                // Send the result to the subscriber.
                                subscriber.onNext(responseBytes);
                                subscriber.onCompleted();
                            }
                        });
            }
        });
    }
}
于 2014-08-20T17:09:39.033 回答