我有一个基于 apache-beam 的数据流作业,可以使用vcf 源从单个文本文件(存储在谷歌云存储中)读取,将文本行转换为数据存储Entities
并将它们写入数据存储接收器。工作流程运行良好,但我注意到的缺点是:
- 写入数据存储的速度最多约为每秒 25-30 个实体。
- 我尝试使用
--autoscalingAlgorithm=THROUGHPUT_BASED --numWorkers=10 --maxNumWorkers=100
,但执行似乎更喜欢一名工作人员(见下图:目标工作人员曾经增加到 2,但“基于在当前运行步骤中并行化工作的能力”减少到 1)。
我没有使用祖先路径作为键;所有的实体都是一样的kind
。
管道代码如下所示:
def write_to_datastore(project, user_options, pipeline_options):
"""Creates a pipeline that writes entities to Cloud Datastore."""
with beam.Pipeline(options=pipeline_options) as p:
(p
| 'Read vcf files' >> vcfio.ReadFromVcf(user_options.input)
| 'Create my entity' >> beam.ParDo(
ToEntityFn(), user_options.kind)
| 'Write to datastore' >> WriteToDatastore(project))
因为我有数百万行要写入数据存储区,所以以 30 个实体/秒的速度写入会花费很长时间。
问题:输入只是一个巨大的 gzip 文件。我是否需要将其拆分为多个小文件以触发多个工作人员?有没有其他方法可以加快导入速度?我错过了num_workers
设置中的某些内容吗?谢谢!