0

基于 Javadocs 和https://beam.apache.org/blog/2017/02/13/stateful-processing.html上的博客文章,我尝试使用使用 2.0.0-beta-2 SDK 的简单重复数据删除示例它从 GCS 读取一个文件(包含一个 json 列表,每个都有一个 user_id 字段),然后通过管道运行它,如下所述。

输入数据包含大约 146K 事件,其中只有 50 个事件是唯一的。整个输入大约是 50MB,应该可以在比 2 分钟固定窗口短得多的时间内处理。我只是在那里放置了一个窗口,以确保在不使用 GlobalWindow 的情况下保持 per-key-per-window 语义。我通过 3 个并行阶段运行窗口数据以比较结果,每个阶段都在下面解释。

  1. 只需将内容复制到 GCS 上的新文件中 - 这可确保所有事件都按预期处理,并且我验证内容与输入完全相同
  2. 在 user_id 上组合.PerKey 并仅从 Iterable 中选择第一个元素 - 这基本上应该对数据进行重复数据删除,并且它可以按预期工作。生成的文件具有原始事件列表中唯一项目的确切数量 - 50 个元素
  3. 有状态 ParDo 检查密钥是否已经被看到并仅在没有时才发出输出。理想情况下,由此产生的结果应该与 [2] 中的重复数据匹配,但我所看到的只是 3 个唯一事件。在我做的几次运行中,这 3 个唯一事件总是指向相同的 3 个 user_id。

有趣的是,当我从 DataflowRunner 切换到在本地运行整个过程的 DirectRunner 时,我看到 [3] 的输出与 [2] 匹配,只有 50 个唯一元素,如预期的那样。因此,我怀疑 Stateful ParDo 的 DataflowRunner 是否存在任何问题。

public class StatefulParDoSample {
    private static Logger logger = LoggerFactory.getLogger(StatefulParDoSample.class.getName());

    static class StatefulDoFn extends DoFn<KV<String, String>, String> {
        final Aggregator<Long, Long> processedElements = createAggregator("processed", Sum.ofLongs());
        final Aggregator<Long, Long> skippedElements = createAggregator("skipped", Sum.ofLongs());

        @StateId("keyTracker")
        private final StateSpec<Object, ValueState<Integer>> keyTrackerSpec =
                StateSpecs.value(VarIntCoder.of());

        @ProcessElement
        public void processElement(
                ProcessContext context,
                @StateId("keyTracker") ValueState<Integer> keyTracker) {
            processedElements.addValue(1l);
            final String userId = context.element().getKey();

            int wasSeen = firstNonNull(keyTracker.read(), 0);
            if (wasSeen == 0) {
                keyTracker.write( 1);
                context.output(context.element().getValue());
            } else {
                keyTracker.write(wasSeen + 1);
                skippedElements.addValue(1l);
            }
        }
    }

    public static void main(String[] args) {
        DataflowPipelineOptions pipelineOptions = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
        pipelineOptions.setRunner(DataflowRunner.class);
        pipelineOptions.setProject("project-name");
        pipelineOptions.setStagingLocation(GCS_STAGING_LOCATION);
        pipelineOptions.setStreaming(false);
        pipelineOptions.setAppName("deduper");
        Pipeline p = Pipeline.create(pipelineOptions);

        final ObjectMapper mapper = new ObjectMapper();
        PCollection<KV<String, String>> keyedEvents =
        p
            .apply(TextIO.Read.from(GCS_SAMPLE_INPUT_FILE_PATH))
            .apply(WithKeys.of(new SerializableFunction<String, String>() {
                @Override
                public String apply(String input) {
                    try {
                        Map<String, Object> eventJson =
                                mapper.readValue(input, Map.class);
                        return (String) eventJson.get("user_id");
                    } catch (Exception e) {

                    }

                    return "";
                }
            }))
            .apply(
                Window.into(
                    FixedWindows.of(Duration.standardMinutes(2))
                )
            );

        keyedEvents
            .apply(ParDo.of(new StatefulDoFn()))
            .apply(TextIO.Write.to(GCS_SAMPLE_OUTPUT_FILE_PATH).withNumShards(1));

        keyedEvents
            .apply(Values.create())
            .apply(TextIO.Write.to(GCS_SAMPLE_COPY_FILE_PATH).withNumShards(1));

        keyedEvents
            .apply(Combine.perKey(new SerializableFunction<Iterable<String>, String>() {
                @Override
                public String apply(Iterable<String> input) {
                    return !input.iterator().hasNext() ? "empty" : input.iterator().next();
                }
            }))
            .apply(Values.create())
            .apply(TextIO.Write.to(GCS_SAMPLE_COMBINE_FILE_PATH).withNumShards(1));

        PipelineResult result = p.run();
        result.waitUntilFinish();
    }
}
4

1 回答 1

2

这是批处理模式下 Dataflow 服务中的一个错误,已在即将发布的 0.6.0 Beam 版本(或 HEAD,如果您跟踪前沿)中修复。

感谢您引起我的注意!作为参考,或者如果出现其他任何问题,这由BEAM-1611跟踪。

于 2017-03-02T22:08:43.133 回答