3

我有一个 DAG,它从 Elasticsearch 中获取数据并摄取到数据湖中。第一个任务BeginIngestion在多个任务中打开(每个资源一个),这些任务在更多任务中打开(每个分片一个)。获取分片后,将数据上传到 S3,然后关闭到任务EndIngestion,然后是任务AuditIngestion

它执行正确,但现在所有任务都已成功执行,但“关闭任务” EndIngestion仍然没有状态。当我刷新网络服务器的页面时,DAG 被标记为Failed

在此处输入图像描述 此图显示成功的上游任务,任务end_ingestion没有状态,DAG 标记为Failed

我还深入研究了任务实例的详细信息,发现

  • Dagrun Running:任务实例的 dagrun 不是处于“运行”状态,而是处于“失败”状态。
  • 触发规则:任务的触发规则 'all_success' 要求所有上游任务都已成功,但发现 1 个不成功。upstream_tasks_state = {'失败':0,'upstream_failed':0,'跳过':0,'完成':49,'成功':49},upstream_task_ids = ['s3_finish_upload_ingestion_raichucrud_complain','s3_finish_upload_ingestion_raichucrud_interaction','s3_huiccrud_complaining_up s3_finish_upload_ingestion_raichucrud_user', 's3_finish_upload_ingestion_raichucrud_privatecontactinteraction', 's3_finish_upload_ingestion_raichucrud_location', 's3_finish_upload_ingestion_raichucrud_companytoken', 's3_finish_upload_ingestion_raichucrud_indexevolution', 's3_finish_upload_ingestion_raichucrud_companyindex', '

如您所见,“触发规则”字段表示其中一项任务处于“不成功状态”,但同时统计数据显示所有上游都标记为成功。

如果我重置数据库,它不会发生,但我不能为每次执行(每小时)重置它。我也不想重置它。

有人有灯吗?

PS:我正在使用 LocalExecutor 在 EC2 实例(c4.xlarge)中运行

[编辑] 我在调度程序日志中发现 DAG 处于死锁状态:

[2017-08-25 19:25:25,821] {models.py:4076} DagFileProcessor157 信息 - 死锁;标记运行失败

我想这可能是由于一些异常处理。

4

1 回答 1

3

我以前遇到过这个确切的问题,对我来说,我的代码正在生成重复的任务 ID。在您的情况下,看起来还有一个重复的 id: s3_finish_upload_ingestion_raichucrud_privatecontactinteraction

这对你来说可能晚了一年,但希望这会节省其他人,大量的调试时间:)

于 2018-05-15T10:59:44.787 回答