我正在尝试计算输入文件中的行数,并且我正在使用 Cloud dataflow Runner 来创建模板。在下面的代码中,我从 GCS 存储桶中读取文件,对其进行处理,然后将输出存储在 Redis 实例中。
但我无法计算输入文件的行数。
主班
public static void main(String[] args) {
/**
* Constructed StorageToRedisOptions object using the method PipelineOptionsFactory.fromArgs to read options from command-line
*/
StorageToRedisOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(StorageToRedisOptions.class);
Pipeline p = Pipeline.create(options);
p.apply("Reading Lines...", TextIO.read().from(options.getInputFile()))
.apply("Transforming data...",
ParDo.of(new DoFn<String, String[]>() {
@ProcessElement
public void TransformData(@Element String line, OutputReceiver<String[]> out) {
String[] fields = line.split("\\|");
out.output(fields);
}
}))
.apply("Processing data...",
ParDo.of(new DoFn<String[], KV<String, String>>() {
@ProcessElement
public void ProcessData(@Element String[] fields, OutputReceiver<KV<String, String>> out) {
if (fields[RedisIndex.GUID.getValue()] != null) {
out.output(KV.of("firstname:"
.concat(fields[RedisIndex.FIRSTNAME.getValue()]), fields[RedisIndex.GUID.getValue()]));
out.output(KV.of("lastname:"
.concat(fields[RedisIndex.LASTNAME.getValue()]), fields[RedisIndex.GUID.getValue()]));
out.output(KV.of("dob:"
.concat(fields[RedisIndex.DOB.getValue()]), fields[RedisIndex.GUID.getValue()]));
out.output(KV.of("postalcode:"
.concat(fields[RedisIndex.POSTAL_CODE.getValue()]), fields[RedisIndex.GUID.getValue()]));
}
}
}))
.apply("Writing field indexes into redis",
RedisIO.write().withMethod(RedisIO.Write.Method.SADD)
.withEndpoint(options.getRedisHost(), options.getRedisPort()));
p.run();
}
示例输入文件
xxxxxxxxxxxxxxxx|bruce|wayne|31051989|444444444444
yyyyyyyyyyyyyyyy|selina|thomas|01051989|222222222222
aaaaaaaaaaaaaaaa|clark|kent|31051990|666666666666
执行管道的命令
mvn compile exec:java \
-Dexec.mainClass=com.viveknaskar.DataFlowPipelineForMemStore \
-Dexec.args="--project=my-project-id \
--jobName=dataflow-job \
--inputFile=gs://my-input-bucket/*.txt \
--redisHost=127.0.0.1 \
--stagingLocation=gs://pipeline-bucket/stage/ \
--dataflowJobFile=gs://pipeline-bucket/templates/dataflow-template \
--runner=DataflowRunner"
我尝试使用StackOverflow 解决方案中的以下代码,但它对我不起作用。
PipelineOptions options = ...;
DirectPipelineRunner runner = DirectPipelineRunner.fromOptions(options);
Pipeline p = Pipeline.create(options);
PCollection<Long> countPC =
p.apply(TextIO.Read.from("gs://..."))
.apply(Count.<String>globally());
DirectPipelineRunner.EvaluationResults results = runner.run(p);
long count = results.getPCollection(countPC).get(0);
我也浏览了 Apache Beam 文档,但没有发现任何帮助。对此的任何帮助将不胜感激。