0

我们创建了一个管道,它正在对位于 GCS 中的 3 个流(“Clicks”、“Impressions”、“ActiveViews”)执行转换。我们需要将各个流写回 GCS,但要分开文件(稍后加载到 BigQuery),因为它们都有稍微不同的架构。

其中一个写入连续两次失败,每次都有不同的错误,这反过来导致管道失败。

这些是 GDC 以可视方式表示的最后 2 个工作流/管道,它们显示了失败:

写入失败

写入失败

第一个错误:

Feb 21, 2015, 12:55:14 PM (b0cbc05dfc56dbd9): Workflow failed. Causes: (f98c177c56055863): Map task completion for Step "ActiveViews-GSC-write" failed. Causes: (2d838e694976dc6): Expansion failed for filepattern: gs://cdf/binaries/tmp-38156614004ed90e-[0-9][0-9][0-9][0-9][0-9]-of-[0-9][0-9][0-9][0-9][0-9].avro.

第二个错误:

Feb 21, 2015, 1:20:15 PM (19dcdcf1fe125eeb): Workflow failed. Causes: (2a27345ef73673d3): Map task completion for Step "ActiveViews-GSC-write" failed. Causes: (8f79a20dfa5c4d2b): Unable to view metadata for file: gs://cdf/binaries/tmp-2a27345ef7367fe6-00001-of-00015.avro.

它只发生在“ActiveViews-GCS-Write”步骤中。

知道我们做错了什么吗?

4

2 回答 2

1

我们找到了解决方法。问题似乎是当指定多个输入源并使用扁平化来合并它们时。

对 2 个输入源(例如 2 月 1 日至 2 日的所有文件)使用 flatten 不起作用(或者我们做错了):

PCollection<String> pc1 = pipeline.apply(TextIO.Read.from("gs://<bucket_name>/NetworkImpressions_20150201*"); //1st Feb
PCollection<String> pc2 = pipeline.apply(TextIO.Read.from("gs://<bucket_name>/NetworkImpressions_20150202*"); //2nd Feb
PCollectionList<String> all = PCollectionList.of(pc1).and(pc2);
PCollection<String> flattened = all.apply(Flatten.<String>pCollections());

相反,我们只使用 GLOB(没有展平),它每次都有效:

pipeline.apply(TextIO.Read.from("gs://<bucket_name>/Files_2015020[12]*");
于 2015-02-24T00:27:06.663 回答
1

原始代码很可能遇到两个不同的问题,其中一个已经修复。这两个问题分别与

  1. 通过将集合拼合在一起来组合集合。
  2. 我们如何处理全局模式。

带有扁平化的问题编号 1 是已修复的问题。解决了该问题后,您很可能会遇到第二个问题,即如何处理 glob 模式。

如果您使用 flatten 但与您在非 flatten 情况下使用的 glob 类似,会发生什么情况,例如

PCollection<String> pc1 = pipeline.apply(TextIO.Read.from("gs://<bucket_name>/NetworkImpressions_2015020[1]*");
PCollection<String> pc2 = pipeline.apply(TextIO.Read.from("gs://<bucket_name>/NetworkImpressions_2015020[2]*")

在 GCS 中匹配 glob 有点棘手,因为 GCS 列表操作最终是一致的。

于 2015-03-26T18:55:14.257 回答