0

我们正在尝试在 Airflow 中使用 GCSFS 库创建文件、检查文件是否存在,然后删除该文件。创建和检查都能正常工作,但我们无法使用此库删除文件。下面是我们的代码:

from airflow import DAG
from airflow import models
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from datetime import datetime, timedelta
import logging
from airflow.models import taskinstance
import gcsfs
import ast
import time
from google.cloud import storage 

# GCP project passed to the GCS filesystem client.
PROJECT_ID = 'vf-uk-ngbi-dev-gen-01'
# Prefix under which the scratch object is created.
BUCKET = 'gs://vf-uk-ngbi-dev-gen-01/recon/temp/'
# Object name derived from the current timestamp, with the decimal point
# swapped for the marker 'DM'. Evaluated when this module is imported.
ids = 'DM'.join(str(time.time()).split('.'))
# Full gs:// path of the scratch object.
path = ''.join((BUCKET, ids))

def _report_presence(fs, target):
    """Print whether ``target`` exists on ``fs``; log instead of raising."""
    try:
        if fs.exists(target):
            print("File is present")
        else:
            print("File is missing")
    except Exception:
        print("Wrror in exist loop")
        # Keep the traceback so the underlying failure is visible in the log.
        logging.exception("exists() failed for %s", target)


def accept_ip_cli(ds, **kwargs):
    """Create a small object at ``path``, inspect it, delete it, re-check it.

    Each step after the write is best-effort: a failure prints the same
    short message as before, and additionally logs the full traceback so
    the real cause of the failure is not hidden.

    Args:
        ds: Positional argument supplied by the caller; not used here.
        **kwargs: Extra keyword arguments supplied by the caller; not used.
    """
    number = "nik"
    fs = gcsfs.GCSFileSystem(project=PROJECT_ID)
    print("Path is")
    print(path)
    with fs.open(path, 'w') as file1:
        file1.write(number)

    _report_presence(fs, path)

    try:
        info = fs.info(path)
    except Exception:
        print("issue in info")
        logging.exception("info() failed for %s", path)
    else:
        print("Info is")
        print(info)

    try:
        # rm() is the general fsspec delete call.
        # NOTE(review): rm_file() support appears to vary between
        # gcsfs/fsspec releases - confirm against the installed version.
        fs.rm(path)
    except Exception:
        print("Opps file is still there")
        # Previously the exception was discarded, which hid why the
        # delete failed; the traceback now lands in the task log.
        logging.exception("Could not delete %s", path)
    else:
        print("You did it")

    _report_presence(fs, path)

# Default arguments handed to the DAG below.
default_args = dict(
    owner='Airflow',
    depends_on_past=False,
    start_date=days_ago(0),
    retries=1,
    retry_delay=timedelta(minutes=2),
)

# DAG definition; schedule_interval is None, so no schedule is attached.
dag = models.DAG(
    dag_id='lets_do_it_play_with_gcsfs',
    schedule_interval=None,
    start_date=days_ago(0),
    default_args=default_args,
)

# The only task: runs accept_ip_cli through a PythonOperator.
user_ip = PythonOperator(
    task_id="read_ip",
    provide_context=True,
    python_callable=accept_ip_cli,
    dag=dag,
)

# Bare reference to the single task; there are no dependencies to declare.
user_ip

当我们执行此代码时,删除部分的输出为 "Opps file is still there"。我在这里遗漏了什么?

4

0 回答 0