1

我正在同时试验 Vert.x 和 Reactive Extensions (RXJava),发现很难编写异步事件。

问题陈述

  1. 在关闭自动提交的情况下建立数据库连接。
  2. 将数据批量插入表1
  3. 更新表2中的状态
  4. 犯罪

由于 Vert.x 中的大多数操作都是异步的,因此我尝试使用 RxJava 组合/链接它们。到目前为止,这是我想出的。

public Observable<Boolean> insertBulkDataAndUpdateStatus (final List<JsonArray> inputs,final MyObject obj){

    System.out.println("Total Rows to be inserted : "+inputs.size());
    return getModifiedConnObservable()
            .flatMap(conn ->{

                System.out.println("Obtained connection .. ");

                inputs.forEach(json->{
                    System.out.println("Inserting a row");

                    conn.updateWithParamsObservable(sql, json).subscribe();
                });
                //Checkpoint 1
                System.out.println("updating completion status");
                obj.complete();
                updateStatus(conn,obj).subscribe(result->{
                    System.out.println("Committing");
                    commit(conn).subscribe();
                });

                return Observable.just(true);
            });
}

Observable<SQLConnection> getModifiedConnObservable(){
    return _jdbc.getConnectionObservable().flatMap(conn->{
        return  Observable.just(conn).doOnNext(con->con.setAutoCommitObservable(false).subscribe()).doOnUnsubscribe(conn::close);
    });
}

虽然这种方法有效,但我确信这不是理想的方法。

  • 第 2 点(每条记录的批量插入)和第 3 点最终状态更新没有链接在一起。而且我不确定在批量插入期间是否有任何记录失败会阻止 #3 发生。

来自命令式背景,我发现很难同时掌握 Rx 概念和异步事件。任何帮助深表感谢。

4

0 回答 0