0

我的任务代码如下。

from airflow.models import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
rootdir = "/tmp/airflow"
default_args = {
    'owner': 'max',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email': ['max@test.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG('test3', default_args=default_args,
                    schedule_interval='*/2 * * * *')
t1 = BashOperator(
    task_id='test3-task1',
    bash_command='date  >> {rootdir}/test3-task1.out'.format(rootdir=rootdir),
    owner='max',
    dag=dag)
t2 = BashOperator(
    task_id='test3-task2',
    bash_command='whoami',
    retries=3,
    owner='max',
    dag=dag)

然后我用linux的'airflow'用户运行命令“airflow test test3 test3-task2 2016-07-25”。输出“whoami”的结果是“气流”。但我希望输出结果是任务的“所有者”。

我怎么了?

谢谢

以下是输出结果。

[2016-07-25 11:22:37,716] {bash_operator.py:64} INFO - 临时脚本位置:/tmp/airflowtmpoYNJE8//tmp/airflowtmpoYNJE8/test3-task2U1lpom

[2016-07-25 11:22:37,716] {bash_operator.py:65} INFO - 运行命令:whoami

[2016-07-25 11:22:37,722] {bash_operator.py:73} 信息 - 输出:

[2016-07-25 11:22:37,725] {bash_operator.py:77} 信息 -气流

[2016-07-25 11:22:37,725] {bash_operator.py:80} INFO - 命令以返回码 0 退出

4

2 回答 2

1

您可以使用 default_args 下的“run_as_user”参数

default_args = {
    'owner': 'max',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email': ['max@test.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'run_as_user': 'max'
}
于 2018-02-09T23:43:06.173 回答
0

看起来您尝试执行的操作不受支持。查看bash_operatorBaseOperator的源代码,不幸的是,在执行任务之前都没有尝试更改用户。

于 2016-10-19T20:33:02.277 回答