I'm trying to build a Java application that streams very large result sets of arbitrary SQL SELECT queries into JSONL files. It targets SQL Server specifically, but I would like it to run with any JDBC DataSource. In Python this would be easy: treat the SQL client result as a generator and call json.dumps() on each row. In this code, however, it seems like it's putting everything in memory before writing out, which typically causes heap and garbage collection exceptions. The queries I need this to run for are very large, bringing back up to 10GB of raw data. Execution time is not the primary concern, as long as it works every time.

I've tried calling flush after every row (which is ridiculous), and that seems to help with small datasets but not with large ones. Can anyone suggest a strategy I can use to pull this off easily?

In my SQL client class I use Apache DbUtils QueryRunner and MapListHandler to create a list of Maps, which gives me the flexibility I need (versus more traditional approaches in Java, which require specifying schema and types):

public List<Map<String, Object>> query(String queryText) {
    try {
        DbUtils.loadDriver("com.microsoft.sqlserver.jdbc.Driver");

        // This function just sets up all the connection properties. Omitted for clarity.
        DataSource ds = this.initDataSource();

        StatementConfiguration sc = new StatementConfiguration.Builder().fetchSize(10000).build();
        QueryRunner queryRunner = new QueryRunner(ds, sc);
        MapListHandler handler = new MapListHandler();
        return queryRunner.query(queryText, handler);
    } catch (Exception e) {
        logger.error(e.getMessage());
        e.printStackTrace();
        return null;
    }
}

JsonLOutputWriter class:

JsonLOutputWriter(String filename) {
    GsonBuilder gsonBuilder = new GsonBuilder();
    gsonBuilder.serializeNulls();
    this.gson = gsonBuilder.create();
    try {
        this.writer = new PrintWriter(new File(filename), ENCODING);
    } catch (FileNotFoundException | UnsupportedEncodingException e) {
        e.printStackTrace();
    }
}

void writeRow(Map row) {
    this.writer.println(this.gson.toJson(row));
}

void flush() {
    this.writer.flush();
}

Main method:

JsonLOutputWriter writer = new JsonLOutputWriter(outputFile);
for (Map row : client.query(inputSql)) {
    writer.writeRow(row);
}
writer.flush();
1 Answer

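Basically, this can't be done with DbUtils out of the box. I got rid of QueryRunner and MapListHandler, because the handler creates an ArrayList. Instead of being pull-based, I made it push-based: I created a very similar MyQueryRunner that takes a MyRowHandler and, instead of returning a collection, iterates over the ResultSet and calls my output function. (In the code below these classes are named CustomQueryRunner and RowHandler.)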

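I'm sure there are more elegant ways to do this and return some kind of row buffer, but this is the 80/20 solution I needed, and it works for large datasets.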
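The push-based idea described above can also be sketched with plain JDBC, without DbUtils. This is only a minimal illustration, not the code used in this answer: the names RowStreamer and stream are hypothetical, and the sketch assumes that column labels are unique within the query. Each row is turned into a Map and handed to a callback immediately, so only one row map is alive at a time. Whether the driver itself streams rows or buffers them still depends on the driver and its settings, such as the fetch size used in the question.

```java
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical helper (not part of DbUtils): pushes each row to a callback as soon as it is read. */
final class RowStreamer {

    private RowStreamer() {}

    /**
     * Iterates the ResultSet once and hands every row to the sink.
     * No collection of rows is built, so memory use does not grow with the result size.
     * Returns the number of rows pushed.
     */
    static long stream(ResultSet rs, Consumer<Map<String, Object>> sink) throws SQLException {
        ResultSetMetaData meta = rs.getMetaData();
        int columns = meta.getColumnCount();
        long count = 0;
        while (rs.next()) {
            // LinkedHashMap keeps the columns in SELECT order.
            Map<String, Object> row = new LinkedHashMap<>();
            for (int i = 1; i <= columns; i++) { // JDBC columns are 1-based
                row.put(meta.getColumnLabel(i), rs.getObject(i));
            }
            sink.accept(row);
            count++;
        }
        return count;
    }
}
```

With the JsonLOutputWriter from the question, the call could look like RowStreamer.stream(rs, writer::writeRow), followed by a single flush once the loop has finished.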

Row handler

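import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;

import org.apache.commons.dbutils.BasicRowProcessor;
import org.apache.commons.dbutils.RowProcessor;

public class RowHandler {
    // Shared converter that turns the current ResultSet row into a Map.
    private static final RowProcessor ROW_PROCESSOR = new BasicRowProcessor();
    private final JsonLOutputWriter writer;

    public RowHandler(JsonLOutputWriter writer) {
        this.writer = writer;
    }

    // Writes each row as soon as it is read and returns the number of rows written.
    int handle(ResultSet rs) throws SQLException {
        int count = 0;
        while (rs.next()) {
            writer.writeRow(this.handleRow(rs));
            count++;
        }
        return count;
    }

    protected Map<String, Object> handleRow(ResultSet rs) throws SQLException {
        return ROW_PROCESSOR.toMap(rs);
    }

}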

Query handler

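import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import javax.sql.DataSource;

import org.apache.commons.dbutils.AbstractQueryRunner;
import org.apache.commons.dbutils.StatementConfiguration;

class CustomQueryRunner extends AbstractQueryRunner {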

    private final RowHandler rh;

    CustomQueryRunner(DataSource ds, StatementConfiguration stmtConfig, RowHandler rh) {
        super(ds, stmtConfig);
        this.rh = rh;
    }

    int query(String sql) throws SQLException {
        Connection conn = this.prepareConnection();
        return this.query(conn, true, sql);
    }

    private int query(Connection conn, boolean closeConn, String sql, Object... params)
            throws SQLException {
        if (conn == null) {
            throw new SQLException("Null connection");
        }
        PreparedStatement stmt = null;
        ResultSet rs = null;
        int count = 0;
        try {
            stmt = this.prepareStatement(conn, sql);
            this.fillStatement(stmt, params);
            rs = this.wrap(stmt.executeQuery());
            count = rh.handle(rs);
        } catch (SQLException e) {
            this.rethrow(e, sql, params);
        } finally {
            try {
                close(rs);
            } finally {
                close(stmt);
                if (closeConn) {
                    close(conn);
                }
            }
        }
        return count;
    }
}
answered 2019-01-19T17:08:20.980