30

有没有办法通过 ssh 连接到不同的服务器并使用 Airbnb 的 Airflow 运行 BashOperator?我正在尝试使用 Airflow 运行 hive sql 命令,但我需要通过 SSH 连接到另一个盒子才能运行 hive shell。我的任务应该是这样的:

  1. SSH 到服务器 1
  2. 启动 Hive 外壳
  3. 运行 Hive 命令

谢谢!

4

3 回答 3

43

不适用于气流 2.x。

我想我刚刚想通了:

  1. 在 UI 中的 Admin > Connection 下创建 SSH 连接。注意:如果您重置数据库,连接将被删除

  2. 在 Python 文件中添加以下内容

     from airflow.contrib.hooks import SSHHook
     sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)
    
  3. 添加 SSH 操作员任务

     t1 = SSHExecuteOperator(
         task_id="task1",
         bash_command=<YOUR COMMAND>,
         ssh_hook=sshHook,
         dag=dag)
    

谢谢!

于 2016-09-14T15:29:01.963 回答
29

安东的回答要注意的一件事是,论点实际上是,而ssh_conn_id不是对象。至少在 1.10 版本中。conn_idSSHOperator

一个简单的例子看起来像

from datetime import timedelta, datetime
import airflow
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'start_date': datetime.now() - timedelta(minutes=20),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(dag_id='testing_stuff',
          default_args=default_args,
          schedule_interval='0,10,20,30,40,50 * * * *',
          dagrun_timeout=timedelta(seconds=120))
# Step 1 - Dump data from postgres databases
t1_bash = """
echo 'Hello World'
"""
t1 = SSHOperator(
    ssh_conn_id='ssh_default',
    task_id='test_ssh_operator',
    command=t1_bash,
    dag=dag)
于 2018-11-15T21:43:14.350 回答
5

这是 Airflow 2 中 ssh 运算符的工作示例:

[注意:此运算符的输出是 base64 编码的]

from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
sshHook = SSHHook(ssh_conn_id="conn-id", key_file='/opt/airflow/keys/ssh.key')
# a hook can also be defined directly in the code:
# sshHook = SSHHook(remote_host='server.com', username='admin', key_file='/opt/airflow/keys/ssh.key')

ls = SSHOperator(
        task_id="ls",
        command= "ls -l",
        ssh_hook = sshHook,
        dag = dag)

conn-id是 Admin -> Connections 中的一组。是key_file私有 ssh 密钥。

于 2021-06-09T11:15:15.530 回答