
I have an apache-beam based Dataflow job that reads from a single text file (stored in Google Cloud Storage) using the vcf source, transforms the text lines into Datastore Entities and writes them to the Datastore sink. The workflow works fine, but the downsides I noticed are:

  • The write speed to Datastore is at most around 25-30 entities per second.
  • I tried --autoscalingAlgorithm=THROUGHPUT_BASED --numWorkers=10 --maxNumWorkers=100, but the execution seems to prefer one worker (see the graph below: the target workers once increased to 2, but was reduced to 1 "based on the ability to parallelize the work in the currently running step"). A sketch of how these options can be passed is shown right after this list.
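For reference, a minimal sketch of setting these worker options through the Python SDK's PipelineOptions (the Python SDK spells them in snake_case; the project id and bucket below are hypothetical):

from apache_beam.options.pipeline_options import PipelineOptions

# Equivalent worker options, passed programmatically instead of on the command line.
pipeline_options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',                  # hypothetical project id
    '--temp_location=gs://my-bucket/tmp',    # hypothetical bucket
    '--autoscaling_algorithm=THROUGHPUT_BASED',
    '--num_workers=10',
    '--max_num_workers=100',
])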

I am not using an ancestor path in the keys; all the entities have the same kind.

The pipeline code looks like the following:

def write_to_datastore(project, user_options, pipeline_options):
  """Creates a pipeline that writes entities to Cloud Datastore."""
  with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'Read vcf files' >> vcfio.ReadFromVcf(user_options.input)
     | 'Create my entity' >> beam.ParDo(
         ToEntityFn(), user_options.kind)
     | 'Write to datastore' >> WriteToDatastore(project))
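ToEntityFn is not shown in the question; for illustration only, a hypothetical sketch of such a DoFn using Beam's v1new Datastore types (a newer API than what was available at the time; the field names and key-naming scheme are assumptions):

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1new.types import Entity, Key

class ToEntityFn(beam.DoFn):
  """Hypothetical sketch: turns one parsed VCF record into a Datastore entity."""

  def __init__(self, project):
    # Unlike the pipeline above, this sketch takes the project so it can build complete keys.
    self._project = project

  def process(self, record, kind):
    # No ancestor path and a single kind, as in the question; the key name
    # here is an assumed scheme just to make the key complete.
    name = '%s:%s' % (record.reference_name, record.start)
    key = Key([kind, name], project=self._project)
    entity = Entity(key)
    entity.set_properties({
        'reference_name': record.reference_name,  # assumed Variant fields
        'start': record.start,
        'end': record.end,
    })
    yield entity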

Since I have millions of rows to write to Datastore, writing at 30 entities/sec would take far too long.

Question: the input is just one huge gzipped file. Do I need to split it into multiple small files to trigger multiple workers? Is there any other way to make the import faster? Am I missing something in the num_workers setup? Thanks!


2 Answers


I'm not familiar with apache beam, so this answer is from a general flow perspective.

Assuming there are no dependencies to be considered between the entity data in the various input file sections, then yes, using multiple input files should definitely help, since all of them could be processed virtually in parallel (depending, of course, on the maximum number of workers available).

It may not be necessary to split the huge gzipped file beforehand: it might be possible to simply hand segments of the single input data stream over to separate data segment workers for writing, provided the overhead of that handoff itself is negligible compared to the actual data segment processing.

The overall performance limitation would then be the speed at which the input data can be read, split into segments and handed over to the segment data workers.

A data segment worker would further split the data segment it receives into smaller chunks, each up to the maximum of 500 entities that can be written to the datastore in a single batch operation. Depending on the datastore client library used, it may be possible to perform this write asynchronously, allowing the splitting into chunks and the conversion to entities to continue without waiting for the previous datastore writes to complete.
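To illustrate the 500-entity batch limit concretely, here is a sketch outside of Beam, assuming the google-cloud-datastore client library (the project id is hypothetical):

from google.cloud import datastore

client = datastore.Client(project='my-project')  # hypothetical project id

def write_in_batches(entities, batch_size=500):
  """Writes entities in chunks of at most 500, the per-commit Datastore limit."""
  for i in range(0, len(entities), batch_size):
    client.put_multi(entities[i:i + batch_size])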

The performance limitation at the data segment worker would then be the speed at which the data segment can be split into chunks and the chunks converted to entities.

If async ops aren't available, or for even higher throughput, yet another handoff could be performed: each chunk goes to a chunk worker, which performs the conversion to entities and the datastore batch write.

The performance limitation at the data segment worker level would then be just the speed at which the data segment can be split into chunks and handed over to the chunk workers.

With such an approach, the actual conversion to entities and the batch writes to the datastore (async or not) would no longer sit on the critical path of splitting the input data stream, which is, I believe, the performance limitation in your current approach.
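A rough sketch of that layered handoff in plain Python (not Beam; make_entity stands in for whatever converts a row to an entity, and write_in_batches is the helper sketched above):

from concurrent.futures import ThreadPoolExecutor

def split_into_chunks(rows, chunk_size=500):
  """Cuts an iterable of rows into lists of at most chunk_size rows."""
  chunk = []
  for row in rows:
    chunk.append(row)
    if len(chunk) == chunk_size:
      yield chunk
      chunk = []
  if chunk:
    yield chunk

def process_chunk(chunk):
  """Chunk worker: converts one chunk to entities and batch-writes it."""
  entities = [make_entity(row) for row in chunk]  # make_entity is a hypothetical converter
  write_in_batches(entities)

def load(rows, num_workers=8):
  # The caller only splits and hands off; conversion and datastore writes run
  # concurrently in the pool and stay off the splitting critical path.
  with ThreadPoolExecutor(max_workers=num_workers) as pool:
    for chunk in split_into_chunks(rows):
      pool.submit(process_chunk, chunk)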

Answered 2018-05-07T13:06:13.263

I looked into the design of vcfio. I suspect (if I understand correctly) that the reason I always get one worker when the input is a single file is the limit of _VcfSource combined with a constraint of the VCF format: the format has a header part that defines how to translate the non-header lines, so each worker that reads the source has to process an entire file. When I split the single file into 5 separate files that share the same header, I successfully get up to 5 workers (but not more, probably for the same reason).
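For what it's worth, once the files are pre-split, the only change to the pipeline is to point the source at a file pattern (the bucket path below is hypothetical); the rest stays the same:

# Only the read step of the pipeline from the question changes;
# each shard must carry the same VCF header.
(p
 | 'Read vcf files' >> vcfio.ReadFromVcf('gs://my-bucket/vcf-shards/*.vcf')
 | 'Create my entity' >> beam.ParDo(ToEntityFn(), user_options.kind)
 | 'Write to datastore' >> WriteToDatastore(project))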

One thing I don't understand is that the number of workers that read can be limited to 5 (in this case), but why are we then limited to only 5 workers for writing? Anyway, I think I have found an alternative way to trigger multiple workers with the beam Dataflow runner (use pre-split VCF files). There is also a related approach in the gcp-variant-transforms project, in which vcfio has been significantly extended; it seems to support multiple workers with a single input vcf file. I wish the changes in that project could be merged into the beam project too.

Answered 2018-05-08T00:42:13.443