我正在同时试验 Vert.x 和 Reactive Extensions (RXJava),发现很难编写异步事件。
问题陈述
- 在关闭自动提交的情况下建立数据库连接。
- 将数据批量插入表1
- 更新表2中的状态
- 犯罪
由于 Vert.x 中的大多数操作都是异步的,因此我尝试使用 RxJava 组合/链接它们。到目前为止,这是我想出的。
public Observable<Boolean> insertBulkDataAndUpdateStatus (final List<JsonArray> inputs,final MyObject obj){
System.out.println("Total Rows to be inserted : "+inputs.size());
return getModifiedConnObservable()
.flatMap(conn ->{
System.out.println("Obtained connection .. ");
inputs.forEach(json->{
System.out.println("Inserting a row");
conn.updateWithParamsObservable(sql, json).subscribe();
});
//Checkpoint 1
System.out.println("updating completion status");
obj.complete();
updateStatus(conn,obj).subscribe(result->{
System.out.println("Committing");
commit(conn).subscribe();
});
return Observable.just(true);
});
}
Observable<SQLConnection> getModifiedConnObservable(){
return _jdbc.getConnectionObservable().flatMap(conn->{
return Observable.just(conn).doOnNext(con->con.setAutoCommitObservable(false).subscribe()).doOnUnsubscribe(conn::close);
});
}
虽然这种方法有效,但我确信这不是理想的方法。
- 第 2 点(每条记录的批量插入)和第 3 点最终状态更新没有链接在一起。而且我不确定在批量插入期间是否有任何记录失败会阻止 #3 发生。
来自命令式背景,我发现很难同时掌握 Rx 概念和异步事件。任何帮助深表感谢。