场景: - 我正在尝试将一个大型数据集从 Oracle DB(几百万条记录)导入 ElasticSearch。- 我可以使用 logstash jdbc 输入插件导入数据。
问题: - 需要很长时间(5 小时)才能完成。
- 有没有更好的解决方案来减少这个时间?在这种情况下的良好做法?
场景: - 我正在尝试将一个大型数据集从 Oracle DB(几百万条记录)导入 ElasticSearch。- 我可以使用 logstash jdbc 输入插件导入数据。
问题: - 需要很长时间(5 小时)才能完成。
您可以先玩jdbc_fetch_size
,下一步将在多个节点上运行导入,对于每个节点,将 sql 查询拆分为不重叠的集合,例如select * from XX where id between 0 and 1000000
.
您还可以在索引大批量之前将 ES 索引副本数设置为 0,然后再将其更改回来。当然,使用具有良好批量大小和并行性的批量索引 API。
您可以使用
:sql_last_value
使用调度程序以增量方式从数据库获取数据的选项。像下面这样的东西会有所帮助。(假设您有一个 ID 字段)
input {
jdbc {
jdbc_connection_string => "jdbc:oracle:thin:IP:PORT:SID"
jdbc_user => "user"
jdbc_password => "password"
jdbc_driver_library => ".......\ojdbc8.jar"
jdbc_driver_class => "Java::oracle.jdbc.OracleDriver"
statement => "select * from (select * from TABLE where id >= :sql_last_value ORDER BY id ASC) rownum < 20"
use_column_value => true
tracking_column => id
tracking_column_type => "numeric"
schedule => "* * * * *"
}
}
更新:重构 SQL 以使用rownum
并ORDER BY
以预期顺序限制结果(限制前排序)。请参阅:关于 ROWNUM 和限制结果
使用分页可以实现相同的结果,但它存在性能问题。如果我们使用分页 ex:
input {
jdbc {
jdbc_connection_string => "jdbc:oracle:thin:IP:PORT:SID"
jdbc_user => "user"
jdbc_password => "password"
jdbc_driver_library => ".......\ojdbc8.jar"
jdbc_driver_class => "Java::oracle.jdbc.OracleDriver"
jdbc_page_size => 20
jdbc_paging_enabled => true
statement => "select * from TABLE"
}
}
但这会包装
“从表中选择 *”
喜欢
SELECT * FROM (SELECT "T1".*, ROWNUM "X_SEQUEL_ROW_NUMBER_X" FROM (SELECT * FROM (select * from TABLE) "T1") "T1") "T1" WHERE (("X_SEQUEL_ROW_NUMBER_X" > 0) AND ("X_SEQUEL_ROW_NUMBER_X " <= (0 + 20)));
并且将通过根据指定的 jdbc_page_size (本例中为 20)划分查询来在没有调度程序的情况下运行。但是这种方法显然存在性能问题。
您可以在同一个配置文件中放置多个 jdbc 输入插件。它对我来说很好用。
IE:
输入 { jdbc { ... }
jdbc {
...
}
}