6

我对气流很陌生,并尝试使用 apache 气流与谷歌 pubsub 的集成,我猜它是在“Airflow-300”JIRA 下添加的。如果我在这里阅读不正确,请纠正我。

另外,能否请您告知这是否已发布或何时发布?我们正在考虑在 Google Cloud Storage 上添加通知,在发生任何文件事件时,我们希望在 Airflow 中触发一些工作流。

我似乎找不到任何关于如何使用它的文档。

任何建议将不胜感激。

4

2 回答 2

5

已经引入了 Airflow 中的集成。

发布消息

from base64 import b64encode as b64e

m1 = {'data': b64e('Hello, World!'),
       'attributes': {'type': 'greeting'}
      }
m2 = {'data': b64e('Knock, knock')}
m3 = {'attributes': {'foo': ''}}

t1 = PubSubPublishOperator(
    topic='my_topic',
    messages=[m1, m2, m3],
    create_topic=True,
    dag=dag)

接收消息

PubSubPullSensor(
    task_id='pub_sub_wait', 
    project='my_project',
    subscription='my-subscription',
    ack_messages=True)

参考:

https://github.com/apache/incubator-airflow/commit/d231dce37d753ed196a26d9b244ddf376385de38 https://github.com/apache/incubator-airflow/commit/6645218092096e4b10fc737a62bacc2670e1d6dc

于 2018-12-26T03:44:45.323 回答
1

添加到@user1849502 的答案,您还可以使用 PubSubHook:

PubSubHook().publish(project, topic, message)

PubSubHook().pull(project, subscription, max_messages, return_immediately)

参考https://airflow.readthedocs.io/en/stable/_modules/airflow/contrib/hooks/gcp_pubsub_hook.html

于 2019-01-07T10:45:09.310 回答