I'm using the following code to transfer a small table from database A to database B via Airflow (MWAA):
import os

import pandas as pd
from airflow.providers.postgres.hooks.postgres import PostgresHook

def move_data(sql_file_name, target_tbl_name, target_schema_name):
    # Read the SELECT statement from a file next to this script;
    # double any '%' so it is not treated as a parameter placeholder.
    dir_path = os.path.dirname(os.path.realpath(__file__))
    with open(dir_path + '/' + sql_file_name, 'r') as file:
        select_stmt = file.read().replace('%', '%%')

    # Source and destination connections via Airflow hooks
    src = PostgresHook(postgres_conn_id="A")
    src_engine = src.get_sqlalchemy_engine().connect()
    dest = PostgresHook(postgres_conn_id="B")
    dest_engine = dest.get_sqlalchemy_engine().connect()

    # Stream the source table in chunks and write each chunk to the target
    for chunk in pd.read_sql(select_stmt, src_engine, chunksize=30000):
        print('rows = {0}, columns = {1}'.format(chunk.shape[0], chunk.shape[1]))
        try:
            chunk.to_sql(name=target_tbl_name, con=dest_engine,
                         schema=target_schema_name, chunksize=30000,
                         if_exists='replace', index=False, method='multi')
        except Exception as e:
            print(e)
        dest_engine.execute('commit;')
    dest_engine.close()
However, the loop runs only once and no records are transferred; only the table's schema is created in the target database. The table has roughly 50,000 records, and adjusting the chunk size doesn't help. There are no errors in the logs.
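To narrow this down, I'm considering a sanity check along these lines (just a sketch; count_source_rows and my_source_table are placeholder names I made up, not part of the DAG) to confirm the hook's connection can see the rows at all:

from airflow.providers.postgres.hooks.postgres import PostgresHook

def count_source_rows():
    # Count rows through the same Airflow connection the DAG uses,
    # independently of pandas and its chunking.
    src = PostgresHook(postgres_conn_id="A")
    rows = src.get_records("SELECT count(*) FROM my_source_table")  # placeholder table
    print("rows visible through conn A:", rows[0][0])

If that count came back as 0, the problem would be on the connection/query side rather than in to_sql.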
The same code runs fine when executed in a Jupyter notebook, without using the Airflow hooks.
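For comparison, the notebook version builds its engines directly with SQLAlchemy, roughly like this (the connection URLs, query, and table names below are placeholders, not my real ones):

import pandas as pd
from sqlalchemy import create_engine

# Plain SQLAlchemy engines instead of Airflow hooks; URLs are placeholders.
src_engine = create_engine("postgresql+psycopg2://user:pass@host-a:5432/db_a")
dest_engine = create_engine("postgresql+psycopg2://user:pass@host-b:5432/db_b")

select_stmt = "SELECT * FROM my_source_table"  # placeholder query

for chunk in pd.read_sql(select_stmt, src_engine, chunksize=30000):
    chunk.to_sql(name="target_tbl", con=dest_engine, schema="target_schema",
                 chunksize=30000, if_exists='replace', index=False, method='multi')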
Any suggestions as to what the problem might be?