1

我正在使用默认数据流模板 GCS 来发布/订阅。在云存储中输入文件,大小为 300MB,每个文件有 2-3 百万行。

启动数据流批处理作业时出现以下错误

来自工作人员的错误消息:javax.naming.SizeLimitExceededException:Pub/Sub 消息大小 (1089680070) 超过了最大批处理大小 (7500000) org.apache.beam.sdk.io.gcp.pubsub.PubsubIO$Write$PubsubBoundedWriter.processElement(PubsubIO.爪哇:1160)

来自文档:Pub/Sub 一个批次最多接受 1,000 条消息,并且一个批次的大小不能超过 10 兆字节。

这是否意味着我必须将输入文件拆分为 10MB 块或 1000 条消息才能发布?

将如此大的文件(每个 300MB)加载到 pubsub 的推荐方法是什么?

在此先感谢您的帮助。

4

1 回答 1

1

这是 Dataflow 端的一个已知限制,此时存在增加批量大小的功能请求。使用 +1 按钮并为问题加注星标以跟踪它的进展。

我建议您查看建议解决方法的帖子。重要的是要考虑到此解决方法意味着将Cloud Storage Text 修改为 Pub/Sub模板以实现此处提到的自定义转换。

另一方面,您可以尝试创建云功能来拆分您的文件,然后由 Dataflow 处理,我想是这样的:

  1. 创建一个“暂存”存储桶来上传您的大文件。
  2. 编写一个云函数来拆分您的文件并将小块写入另一个存储桶中。您可以尝试使用filesplit Python 包来执行此操作。
  3. 每次使用Google Cloud Storage Triggers将新文件上传到“暂存”存储桶中时,触发 Cloud Function 运行。
  4. 将文件拆分成小块后,使用相同的 Cloud Function 从“暂存”存储桶中删除大文件以避免额外费用。
  5. 使用 Dataflow 模板 Cloud Storage Text to Pub/Sub 处理第二个存储桶的小块。
于 2020-08-27T00:10:49.183 回答