我已经实现了一个具有多个无界源和侧输入的数据管道,将数据与滑动窗口(30 秒和每 10 秒)连接起来,并将转换后的输出发送到 Kafka 主题中。我遇到的问题是,在窗口的前 10 秒内收到的数据会发出 3 次(即)每当新窗口启动时触发,直到第一个窗口完成。如何只发出一次转换后的数据或避免重复?
我使用了丢弃触发的窗格,它没有任何区别。每当我尝试将窗口关闭行为设置为 FIRE_ALWAYS/FIRE_IF_NON_EMPTY 时,它都会引发以下错误。
线程“主”org.apache.beam.sdk.Pipeline$PipelineExecutionException 中的异常:java.lang.IllegalArgumentException:作为单例视图访问的空 PCollection。考虑设置 withDefault 以在 org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner. java:302) 在 org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197) 在 org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64) 在 org.apache .beam.sdk.Pipeline.run(Pipeline.java:313) at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299) at y.yyy.main(yyy.java:86) 原因: java.lang.IllegalArgumentException:作为单例视图访问的空 PCollection。
data.apply("Transform", ParDo.of(
new DoFn<String, Row>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(
ProcessContext processContext,
final OutputReceiver<Row> emitter) {
String record = processContext.element();
final String[] parts = record.split(",");
emitter.output(Row.withSchema(sch).addValues(parts).build());
}
})).apply(
"window1",
Window
.<Row>into(
SlidingWindows
.of(Duration.standardSeconds(30))
.every(Duration.standardSeconds(10)))
.withAllowedLateness(
Duration.ZERO,
Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
.discardingFiredPanes());
请指导我只触发一次窗口(即)我不想发送已经处理的记录
更新:侧面输入的上述错误经常发生并且不是因为 Windows,这似乎是 Apache Beam 中的一个问题(https://issues.apache.org/jira/browse/BEAM-6086)
我尝试使用 State 来识别一行是否已经被处理,但状态没有被保留或被设置。(即)我在阅读状态时总是得到空值。
public class CheckState extends DoFn<KV<String,String>,KV<Integer,String>> {
private static final long serialVersionUID = 1L;
@StateId("count")
private final StateSpec<ValueState<String>> countState =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
public void processElement(
ProcessContext processContext,
@StateId("count") ValueState<String> countState) {
KV<String,String> record = processContext.element();
String row = record.getValue();
System.out.println("State: " + countState.read());
System.out.println("Setting state as "+ record.getKey() + " for value"+ row.split(",")[0]);
processContext.output(KV.of(current, row));
countState.write(record.getKey());
}