我最近加入了一家新公司,并且不熟悉 python(他们首选的脚本语言),并且一直在使用 cx_oracle 创建一些 ETL 流程。到目前为止,我构建的脚本都是单线程作业,它们从 Oracle 源数据库中选择我需要的列子集,并将输出写入命名管道,其中外部进程正在等待读取该数据并将其插入目标.
这一直很好,直到我得到一些在 5 亿 -20 亿行范围内的表。这项工作仍然有效,但需要很多小时才能完成。这些大型源表是分区的,所以我一直在尝试研究协调不同分区的并行读取的方法,这样我就可以让两个或多个线程同时工作,每个线程都写入一个单独的命名管道。
cx-oracle 中是否有一种优雅的方式来处理从同一个表的不同分区读取的多个线程?
这是我当前的(简单)代码:
import cx_Oracle
import csv
# connect via SQL*Net string or by each segment in a separate argument
connection = cx_Oracle.connect("user/password@TNS")
csv.register_dialect('pipe_delimited', escapechar='\\' delimiter='|',quoting=csv.QUOTE_NONE)
cursor = connection.cursor()
f = open("<path_to_named_pipe>", "w")
writer = csv.writer(f, dialect='pipe_delimited', lineterminator="\n")
r = cursor.execute("""SELECT <column_list> from <SOURCE_TABLE>""")
for row in cursor:
writer.writerow(row)
f.close()
我的一些源表有超过 1000 个分区,因此硬编码分区名称不是首选选项。我一直在考虑设置分区名称数组并遍历它们,但如果人们有其他想法,我很乐意听到它们。