0

我有一段运行时间很长的代码:它把一个 GCS 存储桶中多个日期前缀下的文件复制到另一个存储桶,并在日期目录名前加上 dt= 前缀。这段代码在 Composer/Airflow 中要运行约 4 个小时,而且有时会失败。我想用 Python 对它进行并行化/优化,让 copy_blob 操作更快,效果类似于 gsutil -m cp -r。

用于列出 GCS 中"目录"的函数 list_directories 取自 https://stackoverflow.com/a/59008580/8518878 。

下面是我的实际代码片段:我在一个 Airflow 任务中调用上述函数。该任务运行了四个小时,仍然无法完成 160 个日期目录(每个目录平均约 50 个文件)的复制。

# Arguments handed to the DAG as ``default_args``; ``local_tz`` and
# ``failure_callback`` are defined elsewhere in the file.
default_args = dict(
    owner='airflow',
    start_date=datetime(
        year=2021, month=11, day=3, hour=1, minute=1, tzinfo=local_tz,
    ),
    email_on_failure=True,
    email_on_retry=True,
    depends_on_past=False,
    retries=1,
    retry_delay=timedelta(minutes=2),
    on_failure_callback=failure_callback,
)


# Cron expression '0 11 * * *' = minute 0 of hour 11, every day.
dag = DAG(
    'test_DAG',
    schedule_interval='0 11 * * *',
    concurrency=5,
    max_active_runs=1,
    default_args=default_args,
)

from google.api_core import page_iterator
from google.cloud import storage

def _item_to_value(iterator, item):
    """Return ``item`` unchanged.

    Used as the ``item_to_value`` callback of the ``HTTPIterator`` built in
    ``list_directories``; the ``iterator`` argument is ignored.
    """
    return item

def list_directories(bucket_name, prefix):
    """List the "directory" prefixes found directly under ``prefix``.

    Sends an object-listing request for ``bucket_name`` with ``'/'`` as the
    delimiter and collects the entries found under the ``prefixes`` key of
    each response page.

    Args:
        bucket_name: Name of the bucket to query.
        prefix: Prefix to list under. A trailing ``'/'`` is appended when the
            prefix is non-empty and does not already end with one.

    Returns:
        A list with every item yielded by the page iterator.
    """
    if prefix and not prefix.endswith('/'):
        prefix = prefix + '/'

    client = storage.Client()

    # NOTE(review): relies on the client's private ``_connection`` attribute.
    prefix_iterator = page_iterator.HTTPIterator(
        client=client,
        api_request=client._connection.api_request,
        path="/b/" + bucket_name + "/o",
        items_key='prefixes',
        item_to_value=_item_to_value,
        extra_params={
            "projection": "noAcl",
            "prefix": prefix,
            "delimiter": '/',
        },
    )

    return list(prefix_iterator)

def move_files(**kwargs):
    """Copy every blob under each dated directory into the target bucket.

    For each directory prefix returned by ``list_directories`` the third
    path segment (index 2 after splitting on ``'/'``) is renamed to
    ``dt=<segment>``, and every blob listed under that directory is copied
    to the destination bucket under ``staging_prefix_ext`` + the renamed
    path.

    Only the leading directory prefix of each blob name is rewritten, so a
    file name (or a parent segment) that happens to contain the same date
    string is left untouched.

    ``bucket_name``, ``target_bucket``, ``source_prefix`` and
    ``staging_prefix_ext`` are read from module scope; they are defined
    elsewhere in the file.

    Args:
        **kwargs: Accepted and ignored (the operator is configured with
            ``provide_context=True``).

    Returns:
        The string ``"Done!"`` once all copies have been issued.

    Raises:
        IndexError: If a directory prefix has fewer than three
            ``'/'``-separated segments.
    """
    storage_client = storage.Client()
    origin = storage_client.bucket(bucket_name)
    destination = storage_client.bucket(target_bucket)

    # NOTE(review): copies are issued one at a time; this loop is the
    # candidate for fan-out if the run time needs to come down.
    for dir_prefix in list_directories(bucket_name, source_prefix):
        # Rename only the date segment of the directory prefix itself.
        segments = dir_prefix.split("/")
        segments[2] = 'dt=' + segments[2]
        new_dir_prefix = "/".join(segments)

        for blob in storage_client.list_blobs(bucket_name, prefix=dir_prefix):
            # Blobs were listed with ``prefix=dir_prefix``, so each name
            # starts with it; swap that leading part and keep the rest.
            relative_name = blob.name[len(dir_prefix):]
            new_name = staging_prefix_ext + new_dir_prefix + relative_name
            origin.copy_blob(blob, destination, new_name=new_name)

    return "Done!"
        

# Task that invokes ``move_files`` as part of ``dag``.
move_files_func = python_operator.PythonOperator(
    dag=dag,
    task_id='move_files',
    provide_context=True,
    python_callable=move_files,
)

# Bare reference to the only task; it declares no dependencies.
move_files_func
4

0 回答 0