2

我对 Apache Beam 非常陌生,而且我的 Java 技能相当低,但我想了解为什么我的简单条目操作在 Apache Beam 上运行如此缓慢。

我正在尝试执行以下操作:我有一个 CSV 文件,其中包含以下方案的 100 万条记录(Alexa 排名前 100 万的站点):(NUMBER,DOMAIN例如1,google.com),我想“剥离”第一个(数字)字段和仅获取域部分。我的这个管道的代码如下:

package misc.examples;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class Example {

  static class ExtractDomainsFn extends DoFn<String, String> {
    private final Counter domains = Metrics.counter(ExtractDomainsFn.class, "domains");

    @ProcessElement
    public void processElement(ProcessContext c) {
      if (c.element().contains(",")) {
        domains.inc();

        String domain = c.element().split(",")[1];
        c.output(domain);
      }
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    p.apply("ReadLines", TextIO.read().from("./top-1m.csv"))
     .apply("ExtractDomains", ParDo.of(new ExtractDomainsFn()))
     .apply("WriteDomains", TextIO.write().to("domains"));

    p.run().waitUntilFinish();
  }
}

当我用 Maven 执行这段代码时,在我的笔记本电脑上需要超过四分钟才能成功:

$ mvn compile exec:java -Dexec.mainClass=misc.examples.Example
[INFO] Scanning for projects...
[INFO]                                                                         
[INFO] ------------------------------------------------------------------------
[INFO] Building my-example 1.0.0
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ my-example ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /…/src/main/resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.5.1:compile (default-compile) @ my-example ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ my-example ---
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 04:36 min
[INFO] Finished at: 2017-06-24T15:20:33+03:00
[INFO] Final Memory: 31M/1685M
[INFO] ------------------------------------------------------------------------

cut(1)虽然在你眨眼之前简单的工作:

$time cut -d, -f2 top-1m.csv > domains

real    0m0.171s
user    0m0.140s
sys     0m0.028s

那么,这种 Apache Beam 行为是否被认为是可以接受的(可能它在处理大量数据时会更好地工作)还是我的代码效率低下?

2014 年 1 月 7 日更新:

正如 Kenn Knowles建议的那样,我尝试在其他运行器上运行管道,而不是DirectRunnerDataflowRunner. 所以更新后的代码如下所示:

package misc.examples;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class Example {

  static class ExtractDomainsFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      if (c.element().contains(",")) {
        String domain = c.element().split(",")[1];
        c.output(domain);
      }
    }
  }

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
    dataflowOptions.setRunner(DataflowRunner.class);
    dataflowOptions.setProject("my-gcp-project-id");
    Pipeline p = Pipeline.create(options);
    p.apply("ReadLines", TextIO.read().from("gs://my-gcs-bucket/top-1m.csv"))
     .apply("ExtractDomains", ParDo.of(new ExtractDomainsFn()))
     .apply("WriteDomains", TextIO.write().to("gs://my-gcs-bucket/output/"));

    p.run().waitUntilFinish();
  }
}

与直接运行器相比,在 Google Dataflow 上运行的运行时间更短,但仍然足够慢 - 多于3 分钟

谷歌数据流作业

Google 数据流作业日志

4

1 回答 1

4

Apache Beam 在 Apache Flink、Apache Spark、Apache Apex 和 Google Cloud Dataflow 等大规模数据处理引擎上提供正确的事件时间处理和可移植性。

在这里,您似乎正在默认运行您的管道,DirectRunner这是一种小规模测试管道正确性的方法(其中“小”表示不使用多台机器的任何东西)。为了测试正确性,运行程序还执行额外的任务以帮助确保正确性,例如检查您的序列化 ( Coder) 并将元素随机排列以确保您的管道不依赖于顺序。

DirectRunner不一定一次将所有值带入内存,但具有流式执行模型,因此它也适用于无界数据集和触发。与简单循环相比,这也增加了开销。

也就是说,四分钟非常慢,我提交了BEAM-2516进行跟进。

您也可以尝试在其他后端运行它,特别是在您的笔记本电脑上运行SparkRunner,FlinkRunnerApexRunner支持嵌入式执行。

回复 2017-07-01 更新:

尽管您在 Cloud Dataflow 上体验的总运行时间约为 3 分钟,但处理数据的实际时间约为 1 分钟。您可以在日志中看到这一点。剩下的就是启动和关闭工作虚拟机。我们一直在努力减少这种开销。为什么大约需要 1 分钟?您必须进行剖析才能找到答案(我很想听听结果!)但当然 Dataflow 所做的不仅仅是cut:从 GCS 读取和写入,提供持久性和容错性,并且在TextIO写入步骤中它是执行数据的网络洗牌,以便并行写入分片文件。如果 Dataflow 注意到您的计算没有并行性并且足够小以至于不需要它,那么显然可以优化一些东西。

但请记住,Beam 和 Cloud Dataflow 的存在可以帮助您对无法在单台机器上及时处理的大量数据进行并行处理。因此,处理没有可用并行性的微小示例不是目标。

较小的顺序计算经常作为大型管道的一小部分发生,但在实际物理计划的背景下,小型辅助计算通常不会影响端到端时间。VM 管理的开销也是一次性成本,因此更有可能根据数十到数百台机器上的几分钟到几小时的计算来衡量它们。

于 2017-06-26T17:49:15.177 回答