基于 Javadocs 和https://beam.apache.org/blog/2017/02/13/stateful-processing.html上的博客文章,我尝试使用使用 2.0.0-beta-2 SDK 的简单重复数据删除示例它从 GCS 读取一个文件(包含一个 json 列表,每个都有一个 user_id 字段),然后通过管道运行它,如下所述。
输入数据包含大约 146K 事件,其中只有 50 个事件是唯一的。整个输入大约是 50MB,应该可以在比 2 分钟固定窗口短得多的时间内处理。我只是在那里放置了一个窗口,以确保在不使用 GlobalWindow 的情况下保持 per-key-per-window 语义。我通过 3 个并行阶段运行窗口数据以比较结果,每个阶段都在下面解释。
- 只需将内容复制到 GCS 上的新文件中 - 这可确保所有事件都按预期处理,并且我验证内容与输入完全相同
- 在 user_id 上组合.PerKey 并仅从 Iterable 中选择第一个元素 - 这基本上应该对数据进行重复数据删除,并且它可以按预期工作。生成的文件具有原始事件列表中唯一项目的确切数量 - 50 个元素
- 有状态 ParDo 检查密钥是否已经被看到并仅在没有时才发出输出。理想情况下,由此产生的结果应该与 [2] 中的重复数据匹配,但我所看到的只是 3 个唯一事件。在我做的几次运行中,这 3 个唯一事件总是指向相同的 3 个 user_id。
有趣的是,当我从 DataflowRunner 切换到在本地运行整个过程的 DirectRunner 时,我看到 [3] 的输出与 [2] 匹配,只有 50 个唯一元素,如预期的那样。因此,我怀疑 Stateful ParDo 的 DataflowRunner 是否存在任何问题。
public class StatefulParDoSample {
private static Logger logger = LoggerFactory.getLogger(StatefulParDoSample.class.getName());
static class StatefulDoFn extends DoFn<KV<String, String>, String> {
final Aggregator<Long, Long> processedElements = createAggregator("processed", Sum.ofLongs());
final Aggregator<Long, Long> skippedElements = createAggregator("skipped", Sum.ofLongs());
@StateId("keyTracker")
private final StateSpec<Object, ValueState<Integer>> keyTrackerSpec =
StateSpecs.value(VarIntCoder.of());
@ProcessElement
public void processElement(
ProcessContext context,
@StateId("keyTracker") ValueState<Integer> keyTracker) {
processedElements.addValue(1l);
final String userId = context.element().getKey();
int wasSeen = firstNonNull(keyTracker.read(), 0);
if (wasSeen == 0) {
keyTracker.write( 1);
context.output(context.element().getValue());
} else {
keyTracker.write(wasSeen + 1);
skippedElements.addValue(1l);
}
}
}
public static void main(String[] args) {
DataflowPipelineOptions pipelineOptions = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
pipelineOptions.setRunner(DataflowRunner.class);
pipelineOptions.setProject("project-name");
pipelineOptions.setStagingLocation(GCS_STAGING_LOCATION);
pipelineOptions.setStreaming(false);
pipelineOptions.setAppName("deduper");
Pipeline p = Pipeline.create(pipelineOptions);
final ObjectMapper mapper = new ObjectMapper();
PCollection<KV<String, String>> keyedEvents =
p
.apply(TextIO.Read.from(GCS_SAMPLE_INPUT_FILE_PATH))
.apply(WithKeys.of(new SerializableFunction<String, String>() {
@Override
public String apply(String input) {
try {
Map<String, Object> eventJson =
mapper.readValue(input, Map.class);
return (String) eventJson.get("user_id");
} catch (Exception e) {
}
return "";
}
}))
.apply(
Window.into(
FixedWindows.of(Duration.standardMinutes(2))
)
);
keyedEvents
.apply(ParDo.of(new StatefulDoFn()))
.apply(TextIO.Write.to(GCS_SAMPLE_OUTPUT_FILE_PATH).withNumShards(1));
keyedEvents
.apply(Values.create())
.apply(TextIO.Write.to(GCS_SAMPLE_COPY_FILE_PATH).withNumShards(1));
keyedEvents
.apply(Combine.perKey(new SerializableFunction<Iterable<String>, String>() {
@Override
public String apply(Iterable<String> input) {
return !input.iterator().hasNext() ? "empty" : input.iterator().next();
}
}))
.apply(Values.create())
.apply(TextIO.Write.to(GCS_SAMPLE_COMBINE_FILE_PATH).withNumShards(1));
PipelineResult result = p.run();
result.waitUntilFinish();
}
}