I have this long-running piece of code that copies files from several date prefixes in one GCS bucket to another bucket, adding a dt= prefix to each date directory. It runs in Composer/Airflow for about 4 hours and sometimes fails. I am looking for a way to parallelize/optimize it in Python so the copy_blob operations go faster, similar to gsutil -m cp -r.
I took the list_directories function for GCS from https://stackoverflow.com/a/59008580/8518878.
Here is my actual code snippet, where I call the above function wrapped in an Airflow task. It ran for four hours and still failed to copy 160 date directories with an average of 50 files each (roughly 8,000 objects copied one at a time):
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators import python_operator

# local_tz and failure_callback are defined elsewhere in the DAG file.
default_args = {
    'owner': 'airflow',
    'start_date': datetime(year=2021, month=11, day=3, hour=1, minute=1, tzinfo=local_tz),
    'email_on_failure': True,
    'email_on_retry': True,
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
    'on_failure_callback': failure_callback,
}

dag = DAG('test_DAG',
          schedule_interval='0 11 * * *',
          concurrency=5,
          max_active_runs=1,
          default_args=default_args)
from google.api_core import page_iterator
from google.cloud import storage


def _item_to_value(iterator, item):
    return item


def list_directories(bucket_name, prefix):
    """List the immediate 'directory' prefixes under prefix/ via the objects.list API with a delimiter."""
    if prefix and not prefix.endswith('/'):
        prefix += '/'

    extra_params = {
        "projection": "noAcl",
        "prefix": prefix,
        "delimiter": '/'
    }

    gcs = storage.Client()
    path = "/b/" + bucket_name + "/o"

    iterator = page_iterator.HTTPIterator(
        client=gcs,
        api_request=gcs._connection.api_request,
        path=path,
        items_key='prefixes',
        item_to_value=_item_to_value,
        extra_params=extra_params,
    )

    return [x for x in iterator]
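
# For context, list_directories returns the immediate date "directories" under the
# source prefix. The bucket/prefix names below are made up; only the layout matches my data:
#   list_directories('my-source-bucket', 'landing/events')
#   -> ['landing/events/2021-11-01/', 'landing/events/2021-11-02/', ...]
# which is why dir.split("/")[2] in move_files picks out the date component.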
def move_files(**kwargs):
    # bucket_name, target_bucket, source_prefix and staging_prefix_ext are
    # module-level variables defined elsewhere in the DAG file.
    storage_client = storage.Client()
    origin = storage_client.bucket(bucket_name)
    destination = storage_client.bucket(target_bucket)

    list_dir = list_directories(bucket_name, source_prefix)  # call to the function above

    for dir in list_dir:
        blobs = storage_client.list_blobs(bucket_name, prefix=dir)
        date_prefix_old = dir.split("/")[2]
        date_prefix_new = 'dt=' + date_prefix_old
        for blob in blobs:
            # One synchronous copy per object; this loop is what I want to parallelize.
            origin.copy_blob(blob, destination,
                             new_name=staging_prefix_ext + blob.name.replace(date_prefix_old, date_prefix_new))
    return "Done!"
move_files_func = python_operator.PythonOperator(
    task_id='move_files',
    python_callable=move_files,
    # op_kwargs=,
    provide_context=True,
    dag=dag
)

move_files_func
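
For reference, this is roughly the kind of parallelism I have in mind, similar to what gsutil -m does: a minimal sketch that reuses list_directories and the same copy_blob call, but fans the copies out over a thread pool with concurrent.futures. The move_files_parallel / _copy_one names and the max_workers=16 value are placeholders I made up, not something I have tested in Composer:

from concurrent.futures import ThreadPoolExecutor, as_completed

from google.cloud import storage


def _copy_one(origin, blob, destination, new_name):
    # Same copy as in move_files, just issued from a worker thread.
    origin.copy_blob(blob, destination, new_name=new_name)


def move_files_parallel(**kwargs):
    storage_client = storage.Client()
    origin = storage_client.bucket(bucket_name)
    destination = storage_client.bucket(target_bucket)

    # Collect all (blob, new_name) pairs up front, then copy them concurrently.
    copies = []
    for dir in list_directories(bucket_name, source_prefix):
        date_prefix_old = dir.split("/")[2]
        date_prefix_new = 'dt=' + date_prefix_old
        for blob in storage_client.list_blobs(bucket_name, prefix=dir):
            new_name = staging_prefix_ext + blob.name.replace(date_prefix_old, date_prefix_new)
            copies.append((blob, new_name))

    # One shared client across threads; 16 workers is an arbitrary starting point.
    with ThreadPoolExecutor(max_workers=16) as executor:
        futures = [executor.submit(_copy_one, origin, blob, destination, new_name)
                   for blob, new_name in copies]
        for future in as_completed(futures):
            future.result()  # re-raise any failed copy so the Airflow task fails loudly

    return "Done!"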