我们正在使用Cloud Composer(GCP 中的托管气流)来编排我们的任务。我们正在将所有日志移至相扑逻辑(我们组织中的标准流程)。我们的要求是跟踪 DAG 单次执行的整个日志,目前似乎没有办法跟踪。
目前,DAG 中的第一个任务会生成一个唯一的 ID,并通过xcom将其传递给其他任务。这里的问题是我们无法在Airflow 操作员日志(如 BigQueryOperator)中注入唯一 ID。
有没有其他方法可以在 Airflow 操作员日志中注入自定义唯一 ID?
我们正在使用Cloud Composer(GCP 中的托管气流)来编排我们的任务。我们正在将所有日志移至相扑逻辑(我们组织中的标准流程)。我们的要求是跟踪 DAG 单次执行的整个日志,目前似乎没有办法跟踪。
目前,DAG 中的第一个任务会生成一个唯一的 ID,并通过xcom将其传递给其他任务。这里的问题是我们无法在Airflow 操作员日志(如 BigQueryOperator)中注入唯一 ID。
有没有其他方法可以在 Airflow 操作员日志中注入自定义唯一 ID?
Composer与 stackdriver 日志集成,您可以通过“workflow:{your-dag-name}”和“execution-date:{your-dag-run-date}”过滤每个 DAG 日志,例如,
您可以使用以下过滤器读取日志条目:
resource.type="cloud_composer_environment"
resource.labels.location="your-location"
resource.labels.environment_name="your-environment-name"
logName="projects/cloud-airflow-dev/logs/airflow-worker"
labels."execution-date"="your-dag-run-date"
labels.workflow="your-dag-id"