我对气流很陌生,并尝试使用 apache 气流与谷歌 pubsub 的集成,我猜它是在“Airflow-300”JIRA 下添加的。如果我在这里阅读不正确,请纠正我。
另外,能否请您告知这是否已发布或何时发布?我们正在考虑在 Google Cloud Storage 上添加通知,在发生任何文件事件时,我们希望在 Airflow 中触发一些工作流。
我似乎找不到任何关于如何使用它的文档。
任何建议将不胜感激。
我对气流很陌生,并尝试使用 apache 气流与谷歌 pubsub 的集成,我猜它是在“Airflow-300”JIRA 下添加的。如果我在这里阅读不正确,请纠正我。
另外,能否请您告知这是否已发布或何时发布?我们正在考虑在 Google Cloud Storage 上添加通知,在发生任何文件事件时,我们希望在 Airflow 中触发一些工作流。
我似乎找不到任何关于如何使用它的文档。
任何建议将不胜感激。
已经引入了 Airflow 中的集成。
from base64 import b64encode as b64e
m1 = {'data': b64e('Hello, World!'),
'attributes': {'type': 'greeting'}
}
m2 = {'data': b64e('Knock, knock')}
m3 = {'attributes': {'foo': ''}}
t1 = PubSubPublishOperator(
topic='my_topic',
messages=[m1, m2, m3],
create_topic=True,
dag=dag)
PubSubPullSensor(
task_id='pub_sub_wait',
project='my_project',
subscription='my-subscription',
ack_messages=True)
参考:
https://github.com/apache/incubator-airflow/commit/d231dce37d753ed196a26d9b244ddf376385de38 https://github.com/apache/incubator-airflow/commit/6645218092096e4b10fc737a62bacc2670e1d6dc
添加到@user1849502 的答案,您还可以使用 PubSubHook:
PubSubHook().publish(project, topic, message)
PubSubHook().pull(project, subscription, max_messages, return_immediately)
参考https://airflow.readthedocs.io/en/stable/_modules/airflow/contrib/hooks/gcp_pubsub_hook.html