我在 Google Cloud Composer(Airflow)中有一个 DAG,其中包含以下导入语句:
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStorageObjectSensor
当我运行 DAG 时,出现此错误:
“ImportError: No module named sensors.base_sensor_operator”
基本上我想在做其他事情之前检查一个文件是否存在于存储桶中。
这是完整的python代码:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.gcs_to_bq import (
    GoogleCloudStorageToBigQueryOperator,
)
from airflow.contrib.sensors.gcs_sensor import (
    GoogleCloudStorageObjectSensor,
    GoogleCloudStoragePrefixSensor,
)
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
# Airflow connection id passed to the GCS sensor below.
CONNECTION_ID = 'something'

# Arguments shared by every task created in this DAG.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2018, 8, 21),  # midnight, naive datetime
)
def print_hello():
    """Return the confirmation message used as the hello task's result."""
    return 'youtube folder exists!!'
# Daily DAG: wait until something exists under the dated folder of the
# bucket, then run the hello task.
with DAG(
    'DATA_TRANSFER_GCP_BUCKET_TO_BQ2',
    schedule_interval=timedelta(days=1),
    default_args=default_args,
) as dag:
    # `bucket` takes the bare bucket name only; the folder path goes into the
    # object name / prefix. The object sensor looks up one exact object name
    # and does not expand wildcards such as '*.csv', so the prefix sensor is
    # used to detect any object under the folder.
    # NOTE(review): this matches every object below the prefix, not only
    # .csv files -- confirm that is acceptable.
    # The bucket name could instead come from an Airflow Variable, e.g.
    # bucket='{{ var.value.gcp_youtube_video_bucket }}' (name only, no path).
    gcp_sensorBucket = GoogleCloudStoragePrefixSensor(
        task_id='gcp_sensorbucket',
        bucket='aa_youtube_new',
        prefix='2018/06/04/',
        google_cloud_conn_id=CONNECTION_ID,
    )

    hello_operator = PythonOperator(
        task_id='hello_task',
        python_callable=print_hello,
    )

    # Run the hello task only after the sensor has succeeded.
    hello_operator.set_upstream(gcp_sensorBucket)