我们目前正在探索如何使用 Apache Beam/Google Dataflow 处理 Google Cloud SQL 数据库 (MySQL) 中的大量数据存储。
该数据库在一个表中存储了大约 200GB 的数据。
我们使用 成功地从数据库中读取了行JdbcIO
,但到目前为止,这只有在我们LIMIT
查询的行数时才有可能。否则我们会遇到内存问题。我假设默认情况下,SELECT
查询会尝试将所有结果行加载到内存中。
什么是惯用的方法?批处理 SQL 查询?流式传输结果?
我们尝试调整fetch size
执行的语句,但没有多大成功。
这是我们的 JDBC 读取设置的样子:
JdbcReadOptions(
connectionOptions = connOpts,
query = "SELECT data FROM raw_data",
statementPreparator = statement => statement.setFetchSize(100),
rowMapper = result => result.getString(1)
)
到目前为止,我还没有找到任何关于来自 sql 的流的资源。
编辑
我将列出我采用的视图方法,以便其他人可以学到一些东西(例如如何不这样做)。为了了解更多上下文,所讨论的数据库表的结构非常糟糕:它有一个包含 JSON 字符串的列,以及一个id
列(主键)加上一个added
和modified
列(两种TIMESTAMP
类型)。在第一种方法时,它没有进一步的索引。该表包含 25 个 mio 行。所以这可能更像是一个数据库问题,而不是 Apache Beam/JDBC 问题。但尽管如此:
方法 1(上) - 查询所有内容
基本上它看起来像这样:
val readOptions = JdbcReadOptions(
connectionOptions = connOpts,
query = "SELECT data FROM raw_data",
rowMapper = result => result.getString(1)
)
context
.jdbcSelect(readOptions)
.map(/*...*/)
如果我将 a 添加LIMIT
到查询中,这将起作用。但显然很慢。
方法 2 - 键集分页
val queries = List(
"SELECT data from raw_data LIMIT 5000 OFFSET 0",
"SELECT data from raw_data LIMIT 5000 OFFSET 5000",
"SELECT data from raw_data LIMIT 5000 OFFSET 10000"
// ...
)
context
.parallelize(queries)
.map(query => {
val connection = DriverManager.getConnection(/* */)
val statement = connection.prepareStatement(query)
val result = statement.executeQuery()
makeIterable(result) // <-- creates a Iterator[String]
})
.flatten
.map(/* processing */)
虽然我很快了解到LIMIT _ OFFSET _
组合也从第一行开始扫描,但这效果更好一些。因此,每个后续查询都花费了更长的时间,收敛到很长时间。
方法 2.5 - 带排序的键集分页
与上述方法类似,但我们在added
列上创建了一个索引并将查询更新为
SELECT data FROM raw_data ORDER BY added LIMIT 5000 OFFSET x
这加快了速度,但最终查询时间变长了。
方法 3 - 无波束/数据流
val connection = DriverManager.getConnection(/* */)
val statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
statement.setFetchSize(Integer.MIN_VALUE)
val rs = statement.executeQuery("SELECT data FROM raw_data")
while(rs.next()) {
writer writeLine rs.getString(1)
}
这会将结果集逐行返回并将行写入文件。所有 25 条 mio 记录运行了大约 2 小时。最后。如果有人能指出如何使用 Beam 实现此解决方案,那就太好了。
顺便说一句:现在我有了原始数据作为用 Beam 处理的 CSV 文件是轻而易举的事。大约 80GB 的原始数据可以在大约 5 分钟内通过自动缩放等转换为另一种 CSV 格式。