59

我一直在尝试使用 Airflow 来安排 DAG。其中一个 DAG 包括从 s3 存储桶加载数据的任务。

出于上述目的,我需要设置 s3 连接。但是气流提供的用户界面并不是那么直观(http://pythonhosted.org/airflow/configuration.html?highlight=connection#connections)。如果是这样的话,任何人都成功地建立了 s3 连接,你们有没有遵循的最佳实践?

谢谢。

4

8 回答 8

99

编辑:此答案以纯文本形式存储您的密钥,这可能存在安全风险,不推荐使用。最好的方法是将访问密钥和密钥放在登录/密码字段中,如下面的其他答案所述。结束编辑

很难找到参考资料,但经过一番挖掘后,我能够让它发挥作用。

TLDR

创建具有以下属性的新连接:

连接 ID: my_conn_S3

连接类型: S3

额外的:

{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}

长版,设置 UI 连接:

  • 在 Airflow UI 上,转到管理 > 连接
  • 创建具有以下属性的新连接:
  • 康恩编号:my_conn_S3
  • 连接类型:S3
  • 额外的:{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
  • 将所有其他字段(主机、架构、登录)留空。

要使用此连接,您可以在下面找到一个简单的 S3 传感器测试。这个测试的想法是设置一个传感器来监视 S3 中的文件(T1 任务),一旦满足以下条件,它就会触发 bash 命令(T2 任务)。

测试

  • 在运行 DAG 之前,请确保您有一个名为“S3-Bucket-To-Watch”的 S3 存储桶。
  • 将 s3_dag_test.py 下面添加到气流 dags 文件夹 (~/airflow/dags)
  • 开始airflow webserver
  • 转到气流用户界面 (http://localhost:8383/)
  • 开始airflow scheduler
  • 在 DAG 主视图上打开“s3_dag_test”DAG。
  • 选择“s3_dag_test”以显示 dag 详细信息。
  • 在图表视图上,您​​应该能够看到它的当前状态。
  • 'check_s3_for_file_in_s3' 任务应该处于活动状态并正在运行。
  • 现在,将名为“file-to-watch-1”的文件添加到“S3-Bucket-To-Watch”。
  • 第一个任务应该已经完成​​,第二个应该开始并完成。

dag 定义中的 schedule_interval 设置为 '@once',以便于调试。

要再次运行它,请保持原样,删除存储桶中的文件,然后通过选择第一个任务(在图表视图中)并选择“清除”所有“过去”、“未来”、“上游”、“下游”重试.... 活动。这应该会再次启动 DAG。

让我知道进展如何。

s3_dag_test.py ;

"""
S3 Sensor Connection Test
"""

from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor,   BashOperator, EmailOperator, S3KeySensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 11, 1),
    'email': ['something@here.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('s3_dag_test', default_args=default_args, schedule_interval= '@once')

t1 = BashOperator(
    task_id='bash_test',
    bash_command='echo "hello, it should work" > s3_conn_test.txt',
    dag=dag)

sensor = S3KeySensor(
    task_id='check_s3_for_file_in_s3',
    bucket_key='file-to-watch-*',
    wildcard_match=True,
    bucket_name='S3-Bucket-To-Watch',
    s3_conn_id='my_conn_S3',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag)

t1.set_upstream(sensor)

主要参考资料:
于 2016-11-23T21:22:56.513 回答
21

假设气流托管在 EC2 服务器上。

只需根据其他答案创建连接,但将配置中的所有内容都留空,除了应保留为 S3 的连接类型

S3hook 将默认为 boto,这将默认为您正在运行气流的 EC2 服务器的角色。假设此角色拥有 S3 的权限,您的任务将能够访问存储桶。

这比使用和存储凭据更安全。

于 2017-09-05T05:16:20.117 回答
17

如果您担心在 UI 中暴露凭据,另一种方法是在 UI 的 Extra 参数中传递凭据文件位置。只有功能用户具有文件的读取权限。它看起来像下面

Extra:  {
    "profile": "<profile_name>", 
    "s3_config_file": "/home/<functional_user>/creds/s3_credentials", 
    "s3_config_format": "aws" }

文件“ /home/<functional_user>/creds/s3_credentials”有以下条目

[<profile_name>]
aws_access_key_id = <access_key_id>
aws_secret_access_key = <secret_key>
于 2017-02-01T16:43:01.557 回答
11

另一个对我有用的选择是将访问密钥作为“登录名”,将密钥作为“密码”:

Conn Id: <arbitrary_conn_id>
Conn Type: S3
Login: <aws_access_key>
Password: <aws_secret_key>

将所有其他字段留空。

于 2018-09-22T14:53:23.940 回答
7

我们在几个版本前已将此添加到我们的文档中:

http://airflow.apache.org/docs/stable/howto/connection/aws.html

AWS 连接和 S3 连接之间没有区别。

此处接受的答案在 extra/JSON 中有密钥和秘密,虽然它仍然有效(从 1.10.10 开始),但不再推荐它,因为它在 UI 中以纯文本形式显示秘密。

于 2020-05-24T17:29:39.003 回答
5

对于新版本,更改上述示例中的 python 代码。

s3_conn_id='my_conn_S3'

aws_conn_id='my_conn_s3'
于 2018-02-15T06:05:41.433 回答
1
Conn Id: example_s3_connnection
Conn Type: S3
Extra:{"aws_access_key_id":"xxxxxxxxxx", "aws_secret_access_key": "yyyyyyyyyyy"}

注意:登录和密码字段留空。

于 2018-11-12T15:13:15.933 回答
0

对于中国的aws,它不适用于airflow==1.8.0 需要更新到1.9.0 但airflow 1.9.0 将名称更改为apache-airflow==1.9.0

于 2018-03-06T06:07:28.970 回答