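I am very new to Apache Beam and my Java skills are quite low, but I would like to understand why my simple entry manipulations run so slowly with Apache Beam.
What I am trying to do is the following: I have a CSV file with 1 million records (the Alexa top 1 million sites) in the scheme NUMBER,DOMAIN (e.g. 1,google.com). I want to "strip" the first (number) field and keep only the domain part. My code for this pipeline is the following: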
package misc.examples;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class Example {

    // Emits the second comma-separated field (the domain) of each input line.
    static class ExtractDomainsFn extends DoFn<String, String> {
        // Counts the lines that contained a comma and were processed.
        private final Counter domains = Metrics.counter(ExtractDomainsFn.class, "domains");

        @ProcessElement
        public void processElement(ProcessContext c) {
            // Lines without a comma are silently dropped.
            if (c.element().contains(",")) {
                domains.inc();
                String domain = c.element().split(",")[1];
                c.output(domain);
            }
        }
    }

    public static void main(String[] args) {
        // No options are passed, so the default runner is used.
        Pipeline p = Pipeline.create();

        p.apply("ReadLines", TextIO.read().from("./top-1m.csv"))
         .apply("ExtractDomains", ParDo.of(new ExtractDomainsFn()))
         .apply("WriteDomains", TextIO.write().to("domains"));

        p.run().waitUntilFinish();
    }
}
When I execute this code with Maven, it takes more than four minutes to complete on my laptop:
$ mvn compile exec:java -Dexec.mainClass=misc.examples.Example
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building my-example 1.0.0
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ my-example ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /…/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.5.1:compile (default-compile) @ my-example ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ my-example ---
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 04:36 min
[INFO] Finished at: 2017-06-24T15:20:33+03:00
[INFO] Final Memory: 31M/1685M
[INFO] ------------------------------------------------------------------------
Whereas a simple cut(1) does the job before you can even blink:
$ time cut -d, -f2 top-1m.csv > domains
real 0m0.171s
user 0m0.140s
sys 0m0.028s
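So, is this Apache Beam behavior considered acceptable (perhaps it performs comparatively better when processing larger amounts of data), or is my code inefficient?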
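For a JVM-side point of comparison, the same field extraction can be sketched in plain Java without Beam. This is only a rough baseline under stated assumptions: the class name PlainJavaBaseline, the helper extractDomain, and the default output file domains-plain.txt are hypothetical and not part of Beam or of the pipeline above. Unlike ExtractDomainsFn, the sketch skips a line whose second field is empty (such as "1,") instead of throwing.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;

// Hypothetical plain-Java baseline (no Beam involved).
final class PlainJavaBaseline {

    // Returns the second comma-separated field, or empty if the line has none.
    static Optional<String> extractDomain(String line) {
        String[] fields = line.split(",");
        return fields.length > 1 ? Optional.of(fields[1]) : Optional.empty();
    }

    public static void main(String[] args) throws IOException {
        // Input and output paths are assumptions; override them via arguments.
        Path in = Paths.get(args.length > 0 ? args[0] : "./top-1m.csv");
        Path out = Paths.get(args.length > 1 ? args[1] : "domains-plain.txt");

        long start = System.nanoTime();
        try (BufferedReader reader = Files.newBufferedReader(in, StandardCharsets.UTF_8);
             BufferedWriter writer = Files.newBufferedWriter(out, StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                Optional<String> domain = extractDomain(line);
                if (domain.isPresent()) {
                    writer.write(domain.get());
                    writer.newLine();
                }
            }
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("Elapsed: " + elapsedMs + " ms");
    }
}
```

Timing this alongside cut(1) and the Beam pipeline would help separate the cost of the per-line work from JVM startup and from whatever overhead the runner adds.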
Update (January 7, 2014):
As Kenn Knowles suggested, I tried running the pipeline on a runner other than DirectRunner, namely DataflowRunner. The updated code looks as follows:
package misc.examples;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class Example {

    // Emits the second comma-separated field (the domain) of each input line.
    static class ExtractDomainsFn extends DoFn<String, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // Lines without a comma are silently dropped.
            if (c.element().contains(",")) {
                String domain = c.element().split(",")[1];
                c.output(domain);
            }
        }
    }

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        // View the same options as Dataflow options to select the runner and project.
        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
        dataflowOptions.setRunner(DataflowRunner.class);
        dataflowOptions.setProject("my-gcp-project-id");

        Pipeline p = Pipeline.create(options);

        p.apply("ReadLines", TextIO.read().from("gs://my-gcs-bucket/top-1m.csv"))
         .apply("ExtractDomains", ParDo.of(new ExtractDomainsFn()))
         .apply("WriteDomains", TextIO.write().to("gs://my-gcs-bucket/output/"));

        p.run().waitUntilFinish();
    }
}
The elapsed time on Google Dataflow is shorter than with the direct runner, but it is still slow, at more than 3 minutes: