查看 RxJava 为我们的 API 构建异步支持。现在我们使用 jetty + JAX-RS @Path 注释并且不确定将传入的 REST api 调用绑定到 RxJava API 的正确方法是什么。
基本上这是在释放请求线程的上下文中,直到数据库的响应准备好。
查看 Vert.x,但这需要 java 7,我们现在与 java 6 绑定。
寻找有关上述建议。将传入的 http 请求绑定到 RxJava API 的典型方法是什么?
查看 RxJava 为我们的 API 构建异步支持。现在我们使用 jetty + JAX-RS @Path 注释并且不确定将传入的 REST api 调用绑定到 RxJava API 的正确方法是什么。
基本上这是在释放请求线程的上下文中,直到数据库的响应准备好。
查看 Vert.x,但这需要 java 7,我们现在与 java 6 绑定。
寻找有关上述建议。将传入的 http 请求绑定到 RxJava API 的典型方法是什么?
这是一个为JAX-RS创建 Customer Observable 的示例:
public class ApiService {
Client client;
public ApiService() {
client = ClientBuilder.newClient();
}
public Observable<Customer> createCustomerObservable(final int customerId) {
return Observable.create(new Observable.OnSubscribe<Customer>() {
@Override
public void call(final Subscriber<? super Customer> subscriber) {
client
.target("http://domain.com/customers/{id}")
.resolveTemplate("id", customerId)
.request()
.async()
.get(new InvocationCallback<Customer>() {
@Override
public void completed(Customer customer) {
// Do something
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(customer);
subscriber.onCompleted();
}
}
@Override
public void failed(Throwable throwable) {
// Process error
if (!subscriber.isUnsubscribed()) {
subscriber.onError(throwable);
}
}
});
}
});
}
}
Something like the following should work for Jetty:
public class ApiService {
HttpClient httpClient;
public ApiService(HttpClient httpClient,) {
this.httpClient = httpClient;
}
public <RequestType, ResultType> Observable<ResultType> createApiObservable(final RequestType requestContent) {
return Observable.create(new Observable.OnSubscribe<ResultType>() {
@Override
public void call(final Subscriber<? super ResultType> subscriber) {
// Create the request content for your API. Your logic here...
ContentProvider contentProvider = serializeRequest(requestContent);
httpClient
.newRequest("http://domain.com/path")
.content(contentProvider)
.send(new Response.CompleteListener() {
@Override
void onComplete(Result result) {
// Pass along the error if one occurred.
if (result.isFailed()) {
subscriber.onError(result.getFailure());
return;
}
// Convert the response data to the ResultType. Your logic here...
ResultType resultContent = parseResponse(result.getResponse());
// Send the result to the subscriber.
subscriber.onNext(responseBytes);
subscriber.onCompleted();
}
});
}
});
}
}