1

如何使用 Groovy 实现并发/并行数据库查询?

我想统计数据库中每张表的行数(select count(*) from $TABLE),并将这些计数写入一个单独的文件。

有些表有几百万行,计数需要几分钟,而很多表是空的。我希望让 Groovy 把每个计数请求分派出去并行执行,等待结果,然后继续处理列表中的下一张表,依此类推。我要把这个功能加入到一个现有的、稳定可用的 Groovy 脚本中。

这是我到目前为止所拥有的:

/**
 * Counts the rows of every table in objParams.table_list against the DB2
 * database and returns one "update tmp_TableCount ..." statement per table.
 *
 * Every variable used inside the parallel closure is declared with `def`.
 * An undeclared assignment in a Groovy script writes to the shared script
 * binding, so concurrent iterations would overwrite each other's table name,
 * SQL text and row count.
 *
 * @param objParams map with key table_list: a list of objects exposing table_name
 * @return the concatenated update statements, in table_list order
 */
retrieve_table_count_list = { objParams ->
    def aryTables = objParams.table_list ?: []

    def objDB2DBRS = [:]
    objDB2DBRS["database_jdbc_url"] = get_config_setting(setting: "DB2DatabaseURL").toString()

    // collectParallel already runs each table's count on a pool thread, so no
    // extra async() wrapper is needed. It also returns the results in input
    // order, which replaces the unsynchronized `strReturnSQL +=` from many threads.
    def aryStatements = GParsPool.withPool(10) {
        aryTables.collectParallel { objTable ->
            def strTableName = objTable.table_name
            // Table names cannot be bound as JDBC parameters. They are assumed
            // to come from a trusted catalog listing (TODO confirm).
            def strSQL = "select count(*) as \"row_count\" from ${strTableName}"
            def objResult = db_commands(query: strSQL, params: objDB2DBRS)
            // NOTE(review): assumes db_commands returns a map whose "data" entry
            // holds the result rows. TODO confirm against db_commands.
            def intRowCount = objResult["data"][0]["row_count"]
            "update tmp_TableCount set row_count=${intRowCount} where table_name='${strTableName}';\n".toString()
        }
    }
    return aryStatements.join("")
}


上面的代码已精简到只保留核心部分。原始代码(与上面唯一的区别是包含用户名和密码)运行时返回以下错误:

Retrieving row counts from DB2
TABLE_NAME: select count(*) as "row_count" from TABLE_NAME: Caught: groovy.lang.MissingMethodException: No signature of method: 1500.retrieve-accurate-row-counts-db2-concurrent.async() is applicable for argument types: () values: []
Possible solutions: any(), any(groovy.lang.Closure), asType(java.lang.Class), run(), run(), find()
groovy.lang.MissingMethodException: No signature of method: 1500.retrieve-accurate-row-counts-db2-concurrent.async() is applicable for argument types: () values: []
Possible solutions: any(), any(groovy.lang.Closure), asType(java.lang.Class), run(), run(), find()
        at 1500.retrieve-accurate-row-counts-db2-concurrent$_run_closure2_closure3_closure4.doCall(1500.retrieve-accurate-row-counts-db2-concurrent.groovy:79)
        at groovyx.gpars.pa.GParsPoolUtilHelper$_eachWithIndex_closure9.doCall(GParsPoolUtilHelper.groovy:182)
        at com.sun.proxy.$Proxy6.op(Unknown Source)
        at extra166y.AbstractParallelAnyArray$OOMPap.leafTransfer(AbstractParallelAnyArray.java:2249)
        at extra166y.PAS$FJOMap.atLeaf(PAS.java:228)
        at extra166y.PAS$FJBase.compute(PAS.java:78)
        at jsr166y.RecursiveAction.exec(RecursiveAction.java:148)
        at jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:305)
        at jsr166y.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:575)
        at jsr166y.ForkJoinPool.scan(ForkJoinPool.java:755)
        at jsr166y.ForkJoinPool.work(ForkJoinPool.java:617)
        at jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:369)
4

1 回答 1

0

这里有几个问题。Jeremie B 的回答有帮助,但还存在其他问题,所以我不确定该采纳谁的答案。欢迎提出任何意见!

// PROBLEM #1:
// db_commands was calling another closure to decrypt a password,
// but the decryption library was not thread safe.
// Solved by serializing decryption through a single lock object. A
// sleep-and-poll flag is not enough: two threads can both see the flag as
// true before either one sets it to false. An exception during decryption
// would also leave the flag false forever and hang every later caller.
DECRYPT_LOCK = new Object()
// Kept for any existing code that inspects it. It is true whenever no thread
// is inside the decryption section.
DECRYPT_AVAILABLE = true
// decrypt string
decrypt = { def objParams ->
    // Local, so a second thread cannot overwrite the result between the lock
    // being released and this thread's return.
    def decryptedText
    synchronized (DECRYPT_LOCK) {
        DECRYPT_AVAILABLE = false
        try {
            // ... decrypt text (assign the plain text to decryptedText) ...
        } finally {
            // Always reset, even if the decryption library throws.
            DECRYPT_AVAILABLE = true
        }
    }
    return decryptedText
}

/**
 * Counts the rows of one table and returns the matching
 * "update tmp_TableCounts ..." statement.
 *
 * Runs on GPars worker threads, so every variable is declared locally.
 *
 * @param objParams map with keys:
 *        table_name - table to count
 *        params     - connection settings passed through to db_commands
 *        total      - number of tables in the run (accepted but not used here)
 * @return the update statement, terminated by ";\n"
 */
def count_table_rows = { def objParams ->
    def strTableName = objParams.table_name
    def objMSSQLDBRS = objParams.params

    def String strSQL = "select count(*) as \"row_count\" from ${strTableName}"
    def objRowCount = db_commands(query: strSQL, params: objMSSQLDBRS)

    if (objRowCount.containsKey("error") && objRowCount["error"] == true) {
        w(string: "ERROR on SQL: " + strSQL, style: "error");
        w(string: objRowCount["error_message"].toString(), style: "error");
        // Non-zero status so the calling shell or scheduler sees the failure.
        // exit(0) would report success.
        System.exit(1)
    }

    def aryRowCount = objRowCount["data"]
    // long rather than int: Groovy narrows silently when assigning to int.
    def long lngRowCount = aryRowCount[0]["row_count"]

    return "update tmp_TableCounts set row_count=${lngRowCount} where table_name='${strTableName}';\n"
}

/**
 * Lists the tables of the configured PostgreSQL schema and counts each
 * table's rows in MSSQL in parallel via count_table_rows. The resulting
 * update statements are appended to the script-level strCommands.
 */
def count_all_table_rows = {
    def objPGDBRS = [:]
    def objMSSQLDBRS = [:]
    objPGDBRS["database_jdbc_url"] = get_config_setting(setting: "PGDatabaseURL").toString()
    def strPGSchema = get_config_setting(setting: "PGDatabaseSchema").toString()

    // pg_tables names the column "tablename". Alias it so each row carries
    // the table_name key that is read below.
    def strSQL="select schemaname, tablename as table_name from pg_tables where schemaname='${strPGSchema}' order by tablename"

    // PROBLEM #2:
    // db_commands can't have any variables defined within the closure that are accessible outside the closure
    // (iow, all variables must be locally scoped)

    def objResults = db_commands(query: strSQL, params: objPGDBRS);

    if (objResults["error"] == true) {
        w(string: "ERROR on SQL: " + strSQL, style: "error");
        w(string: objResults["error_message"].toString(), style: "error");
        // Non-zero status so the failure is visible to the caller.
        System.exit(1)
    }

    // Treat a missing result set as "no tables" instead of throwing an NPE.
    def aryTables = (objResults["data"] ?: [])*.table_name
    def intTotal = aryTables.size()

    objMSSQLDBRS["database_jdbc_url"] = get_config_setting(setting: "MSSQLDatabaseURL").toString()

    // Each worker returns its statement instead of writing to shared state.
    // Previously one strReturn variable and an unsynchronized `strCommands +=`
    // were shared by all threads, so results could be overwritten or lost.
    // collectParallel also keeps the statements in table order.
    def aryStatements = GParsPool.withPool() {
        aryTables.collectParallel { def strTableName ->
            count_table_rows(table_name: strTableName, params: objMSSQLDBRS, total: intTotal)
        }
    }

    // Single append from the calling thread only.
    strCommands += aryStatements.join("")
}
于 2016-03-03T16:45:45.983 回答