
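I have a DAG in Google Composer (Airflow) that imports:

from airflow.contrib.sensors.gcs_sensor import GoogleCloudStorageObjectSensor

When I run the DAG, I get this error:

"ImportError: No module named sensors.base_sensor_operator"

Basically, I want to check whether a file exists in a bucket before doing anything else.

Here is the complete Python code: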

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStorageObjectSensor


CONNECTION_ID = 'something'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 8, 21, 0, 0),
}


def print_hello():
    return 'youtube folder exists!!'


with DAG('DATA_TRANSFER_GCP_BUCKET_TO_BQ2',
         schedule_interval=timedelta(days=1),
         default_args=default_args) as dag:

    gcp_sensorBucket = GoogleCloudStorageObjectSensor(
        task_id='gcp_sensorbucket',
        bucket='/aa_youtube_new/2018/06/04/',
        # bucket='{{var.value.gcp_youtube_video_bucket}}/2018/06/04/',
        object='*.csv',
        google_cloud_conn_id=CONNECTION_ID
    )

    hello_operator = PythonOperator(task_id='hello_task',
                                    python_callable=print_hello)
    hello_operator.set_upstream(gcp_sensorBucket)

2 Answers


The base_sensor_operator module was added in Airflow 1.10; it is not present in versions 1.8 and 1.9. In those versions, the import was done as follows:

from airflow.operators.sensors import BaseSensorOperator

In other words, in Airflow 1.10 the base sensor class lives under sensors, whereas in Airflow 1.8 and 1.9 it lives under operators.

So this issue appears to come down to the Composer and Airflow versions: Airflow 1.10 was not yet available for Composer in August. In fact, the issue was reported on Google's Issue Tracker, and the suggested fix is to use Apache Airflow 1.10, which is available with Composer version 1.3.
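If a DAG or custom sensor has to run on both sides of that change, one possible workaround is to try the newer module path first and fall back to the older one. The sketch below is only an illustration: import_first and SENSOR_BASE_CANDIDATES are hypothetical names I made up, and the two module paths are the ones mentioned in this answer.

```python
import importlib


def import_first(candidates):
    """Return the attribute from the first (module, attribute) pair that imports.

    Hypothetical helper, not part of Airflow.
    """
    errors = []
    for module_name, attr in candidates:
        try:
            module = importlib.import_module(module_name)
            return getattr(module, attr)
        except (ImportError, AttributeError) as exc:
            # Remember why this candidate failed and try the next one.
            errors.append("%s.%s: %s" % (module_name, attr, exc))
    raise ImportError("no candidate could be imported:\n" + "\n".join(errors))


# Module paths as described in this answer (newest first).
SENSOR_BASE_CANDIDATES = [
    ("airflow.sensors.base_sensor_operator", "BaseSensorOperator"),  # Airflow 1.10
    ("airflow.operators.sensors", "BaseSensorOperator"),  # Airflow 1.8 / 1.9
]

# In a DAG file (requires Airflow to be installed):
# BaseSensorOperator = import_first(SENSOR_BASE_CANDIDATES)
```

Note that this only helps for code you write yourself. It does not fix a contrib sensor whose own import does not match the installed Airflow version; for that, the environment still needs a matching Airflow release.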

Answered 2018-11-02T00:07:28.563

Which Composer version are you using? I copied and pasted your DAG code into a newly created Composer environment, composer-1.1.0-airflow-1.9.0, and it worked out of the box for me.
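When comparing environments, it can help to split the image version string into its Composer and Airflow parts. Here is a rough sketch, assuming the string always follows the composer-X.Y.Z-airflow-A.B.C pattern of the name above; parse_image_version and has_sensors_package are hypothetical helpers, and the 1.10 cutoff is taken from the other answer rather than from anything I verified myself.

```python
import re

# Pattern assumed from the image name "composer-1.1.0-airflow-1.9.0".
_IMAGE_VERSION = re.compile(r"^composer-(\d+(?:\.\d+)*)-airflow-(\d+(?:\.\d+)*)$")


def parse_image_version(image_version):
    """Split an image version string into (composer, airflow) integer tuples."""
    match = _IMAGE_VERSION.match(image_version)
    if match is None:
        raise ValueError("unrecognized image version: %r" % (image_version,))
    composer, airflow = (
        tuple(int(part) for part in group.split("."))
        for group in match.groups()
    )
    return composer, airflow


def has_sensors_package(airflow_version):
    """True if this Airflow version is 1.10 or later (cutoff is an assumption)."""
    return tuple(airflow_version) >= (1, 10)


composer_version, airflow_version = parse_image_version("composer-1.1.0-airflow-1.9.0")
print(composer_version, airflow_version, has_sensors_package(airflow_version))
```

Comparing integer tuples instead of raw strings avoids the trap where "1.10" sorts before "1.9" alphabetically.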


Answered 2018-08-25T18:31:06.080