我希望将来自 KStream 的窗口批次输出组合在一起并将它们写入辅助存储。
我期待看到.punctuate()
大约每 30 秒被调用一次。我得到的反而被保存在这里。
(原始文件长达数千行)
摘要 -.punctuate()
看似随机然后反复调用。它似乎不遵守通过ProcessorContext.schedule()设置的值。
编辑:
.punctuate()
相同代码的另一次运行大约每四分钟产生一次调用。这次我没有看到疯狂的重复值。来源没有变化 - 只是结果不同。
使用以下代码:
主要的
StreamsConfig streamsConfig = new StreamsConfig(config);
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
lines.process(new BPS2());
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
处理器
public class BP2 extends AbstractProcessor<String, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class);
private ProcessorContext context;
private final long delay;
private final ArrayList<String> values;
public BP2(long delay) {
LOGGER.debug("BatchProcessor() constructor");
this.delay = delay;
values = new ArrayList<>();
}
@Override
public void process(String s, String s2) {
LOGGER.debug("batched processor s:{} s2:{}", s, s2);
values.add(s2);
}
@Override
public void init(ProcessorContext context) {
LOGGER.info("init");
super.init(context);
values.clear();
this.context = context;
context.schedule(delay);
}
@Override
public void punctuate(long timestamp) {
super.punctuate(timestamp);
LOGGER.info("punctuate ts: {} count: {}", timestamp, values.size());
context().commit();
}
}
处理器供应商
public class BPS2 implements ProcessorSupplier<String, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(BPS2.class);
@Override
public Processor<String, String> get() {
try {
return new BP2(30000);
} catch(Exception exception) {
LOGGER.error("Unable to instantiate BatchProcessor()", exception);
throw new RuntimeException();
}
}
}
编辑:
为了确保我的调试器不会减慢它的速度,我构建了它并在与我的 kafka 进程相同的盒子上运行它。这一次,它甚至没有尝试延迟 4 分钟或更长时间 - 在几秒钟内,它就向.punctuate()
. 其中许多(大多数)没有对.process()
.