我们正在尝试在 Airflow 任务中使用 gcsfs 库创建文件、检查文件,然后删除文件。创建和检查文件都正常,但无法用该库删除文件。下面是我们的代码:
from airflow import DAG
from airflow import models
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
from datetime import datetime, timedelta
import logging
from airflow.models import taskinstance
import gcsfs
import ast
import time
from google.cloud import storage
# GCP project that owns the bucket, and the GCS prefix used for scratch objects.
PROJECT_ID = 'vf-uk-ngbi-dev-gen-01'
BUCKET = 'gs://vf-uk-ngbi-dev-gen-01/recon/temp/'
# Object name built from the current epoch time with the decimal point
# replaced by "DM" (e.g. 1700000000.5 -> "1700000000DM5"). These are evaluated
# once per import of this module, not once per task run.
ids = 'DM'.join(str(time.time()).split('.'))
path = ''.join((BUCKET, ids))
def _report_presence(fs, target, refresh=False):
    """Print whether *target* exists; log (never raise) errors from the lookup.

    Args:
        fs: The ``gcsfs.GCSFileSystem`` to query.
        target: Full ``gs://`` path to check.
        refresh: When True, drop any cached listing for *target* first so the
            answer comes from GCS rather than the filesystem's local cache.
    """
    try:
        if refresh:
            fs.invalidate_cache(target)
        if fs.exists(target):
            print("File is present")
        else:
            print("File is missing")
    except Exception:
        # The original bare ``except:`` hid the cause; stay best-effort but
        # put the traceback in the task log.
        logging.exception("Wrror in exist loop")


def accept_ip_cli(ds, **kwargs):
    """Create a small GCS object, inspect it, delete it and confirm it is gone.

    Writes to the module-level ``path`` using a ``GCSFileSystem`` bound to
    ``PROJECT_ID``. The inspect and delete steps are best-effort: a failure is
    logged with its full traceback instead of being silently swallowed, so the
    real reason a delete fails is visible in the Airflow task log.

    Args:
        ds: Airflow execution-date string (supplied because the operator sets
            ``provide_context``); unused.
        **kwargs: Remaining Airflow context; unused.
    """
    number = "nik"
    fs = gcsfs.GCSFileSystem(project=PROJECT_ID)
    print("Path is")
    print(path)

    with fs.open(path, 'w') as file1:
        file1.write(number)

    _report_presence(fs, path)

    try:
        info = fs.info(path)
        print("Info is")
        print(info)
    except Exception:
        logging.exception("issue in info")

    try:
        # ``rm`` is the public delete API of gcsfs/fsspec. NOTE(review):
        # depending on the installed gcsfs/fsspec versions, ``rm_file`` may
        # not be usable; whatever the cause, the bare ``except`` hid it.
        fs.rm(path)
        print("You did it")
    except Exception:
        logging.exception("Opps file is still there")

    # Re-check against GCS itself, not a possibly stale cached listing.
    _report_presence(fs, path, refresh=True)
# DAG definition. schedule_interval=None means this DAG is never scheduled
# automatically; it only runs when triggered.
# NOTE(review): days_ago(0) is a dynamic start date (midnight of the parse
# day); Airflow recommends a fixed start_date — confirm this is intended.
default_args = dict(
    owner='Airflow',
    depends_on_past=False,
    start_date=days_ago(0),
    retries=1,
    retry_delay=timedelta(minutes=2),
)
dag = models.DAG(
    dag_id='lets_do_it_play_with_gcsfs',
    default_args=default_args,
    start_date=days_ago(0),
    schedule_interval=None,
)
# Single task that runs the GCS create/inspect/delete routine.
# provide_context=True makes Airflow pass its context (ds, ...) as keyword
# arguments to accept_ip_cli. The former bare ``( user_ip )`` expression was a
# no-op; with only one task there are no dependencies to declare.
user_ip = PythonOperator(
    task_id="read_ip",
    python_callable=accept_ip_cli,
    provide_context=True,
    dag=dag,
)
执行这段代码时,删除步骤输出的是 “Opps file is still there”(即文件仍然存在)。我遗漏了什么?