我有一个流式作业,初始运行必须处理大量数据。DoFn 调用支持批处理请求的远程服务之一,因此在使用有界集合时,我使用以下方法:
private static final class Function extends DoFn<String, Void> implements Serializable {
private static final long serialVersionUID = 2417984990958377700L;
private static final int LIMIT = 500;
private transient Queue<String> buffered;
@StartBundle
public void startBundle(Context context) throws Exception {
buffered = new LinkedList<>();
}
@ProcessElement
public void processElement(ProcessContext context) throws Exception {
buffered.add(context.element());
if (buffered.size() > LIMIT) {
flush();
}
}
@FinishBundle
public void finishBundle(Context c) throws Exception {
// process remaining
flush();
}
private void flush() {
// build batch request
while (!buffered.isEmpty()) {
buffered.poll();
// do something
}
}
}
有没有办法窗口数据,以便可以在无界集合上使用相同的方法?
我试过以下:
pipeline
.apply("Read", Read.from(source))
.apply(WithTimestamps.of(input -> Instant.now()))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2L))))
.apply("Process", ParDo.of(new Function()));
但是每个元素都调用startBundle
and 。finishBundle
是否有机会使用 RxJava(2 分钟窗口或 100 个元素包):
source
.toFlowable(BackpressureStrategy.LATEST)
.buffer(2, TimeUnit.MINUTES, 100)