0

目前我们有一个数据流作业,它从 pubsub 读取并使用 FileIO.writeDynamic 将 avro 文件写入 GCS,当我们使用 10000 events/sec 进行测试时,由于 WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards 非常慢,因此无法更快地处理。下面是我们用来编写的代码片段。我们如何改进

PCollection<Event> windowedWrites = input.apply("Global Window", Window.<Event>into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterFirst.of(AfterPane.elementCountAtLeast(50000),
                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(DurationUtils
                    .parseDuration(windowDuration))))).discardingFiredPanes());

        return windowedWrites
                        .apply("WriteToAvroGCS", FileIO.<EventDestination, Five9Event>writeDynamic()
                                        .by(groupFn)
                                        .via(outputFn, Contextful.fn(
                                                        new SinkFn()))
                                        .withTempDirectory(avroTempDirectory)
                                        .withDestinationCoder(destinationCoder)
                                        .withNumShards(1).withNaming(namingFn));

我们使用自定义文件命名,格式为 gs://tenantID.<>/eventname/dddd-mm-dd/<uniq_id-shardInder-of-numOfShards-pane-paneIndex.avro>

4

2 回答 2

0

正如罗伯特所说,当使用withNumShards(1)Dataflow/Beam 时,无法并行化写入,使其发生在同一个工作人员身上。当捆绑包相对较高时,这对管道的性能有很大影响。我举了一个例子来证明这一点:

我运行了 3 个生成大量元素(~2gb)的管道,其中三个有 10 个n1-standard-1工作人员,但有 1 个分片、10 个分片和 0 个分片(Dataflow 会选择分片的数量)。这就是他们的行为方式:

工作

我们看到 0 或 10 个 Shard 与 1 个 Shard 的总时间之间存在很大差异。如果我们使用 1 个分片进行工作,我们会看到只有一个工作人员在做某事(我禁用了自动缩放):

中央处理器

正如 Reza 所提到的,发生这种情况是因为所有元素都需要洗牌到同一个工作人员中,所以它写入 1 分片。

请注意,我的示例是 Batch,它在线程方面的行为与 Streaming 不同,但对管道性能的影响足够相似(实际上,在 Streaming 中它可能更糟糕)。

这里有一个 Python 代码,因此您可以自己测试:

    p = beam.Pipeline(options=pipeline_options)

    def long_string_generator():
        string = "Apache Beam is an open source, unified model for defining " \
                 "both batch and streaming data-parallel processing " \
                 "pipelines. Using one of the open source Beam SDKs, " \
                 "you build a program that defines the pipeline. The pipeline " \
                 "is then executed by one of Beam’s supported distributed " \
                 "processing back-ends, which include Apache Flink, Apache " \
                 "Spark, and Google Cloud Dataflow. "

        word_choice = random.sample(string.split(" "), 20)

        return " ".join(word_choice)

    def generate_elements(element, amount=1):
        return [(element, long_string_generator()) for _ in range(amount)]

    (p | Create(range(1500))
       | beam.FlatMap(generate_elements, amount=10000)
       | WriteToText(known_args.output, num_shards=known_args.shards))

    p.run()
于 2020-09-16T15:14:16.557 回答
0

正如评论中提到的,这个问题很可能withNumShards(1)迫使所有事情都发生在一名工人身上。

于 2020-09-15T15:51:31.430 回答