我已经配置了气流并创建了一些调用多个运算符的 Dags 和 subDags。
我的麻烦是,当操作员运行并完成工作时,我想以某种 python 结构接收结果。例如:
文件1.py
...
...
sub_dag_one=SubDagOperator(subdag=subdag_accessHive(
PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, STEP, macros,path,
),
task_id=DELP_DAG_NAME,
dag=dag,
)
文件2.py
from airflow import DAG
from airflow.operators import HiveOperator
def subdag_callHive(parent, child, args, step,
user_defined_macros, path
):
dag_subdag = DAG(
dag_id='%s.%s' % (parent, child),
default_args=args,
schedule_interval="@daily",
template_searchpath=path,
user_defined_macros=user_defined_macros,
)
# some work...
HiveOperator(
task_id='some_id',
hiveconf_jinja_translate=True,
hql='select field1 from public.mytable limit 4;',
trigger_rule='all_done',
dag=dag_subdag,
)
return dag_subdag
函数subdag_callHive从另一个 python 脚本调用,其中定义了主 Dag 和所有其他所需的参数。
我只需要能够从 HiveOperator (*select * from public.mytable limit 4;*) 获得结果,在这种情况下为 4 个值。
返回的 dag_subdag 是一个对象< class 'airflow.models.DAG' >并包含传递给调用的所有属性/数据,但没有关于 HiveOperator 做了什么的信息。
这可能吗?如果是这样,如何实现。