I'm trying to use Beam to aggregate over a set of data using the event time contained in the data, with Kafka as the data source. This works if all my Kafka partitions are populated with data. However, as soon as a partition has not yet been written to, the watermark can't be estimated and doesn't advance. My TimestampPolicy is the following:
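    import java.util.Optional;
    import org.apache.beam.sdk.io.kafka.KafkaRecord;
    import org.apache.beam.sdk.io.kafka.TimestampPolicy;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.joda.time.Instant;

    public class CustomTimeStampPolicy
        extends TimestampPolicy<String, titan.ccp.model.records.ActivePowerRecord> {

      protected Instant currentWatermark;

      public CustomTimeStampPolicy(final Optional<Instant> previousWatermark) {
        // Start from the previous watermark if resuming, otherwise from TIMESTAMP_MIN_VALUE.
        this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
      }

      @Override
      public Instant getTimestampForRecord(final PartitionContext ctx,
          final KafkaRecord<String, titan.ccp.model.records.ActivePowerRecord> record) {
        // Use the event time carried in the record; the watermark becomes that of the most recently read record.
        this.currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
        return this.currentWatermark;
      }

      @Override
      public Instant getWatermark(final PartitionContext ctx) {
        System.out.println("Current Watermark: " + this.currentWatermark);
        return this.currentWatermark;
      }
    }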
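A partition that never receives a record never calls `getTimestampForRecord`, so its policy instance keeps its initial watermark forever. The plain-Java simulation below (no Beam involved) illustrates this under the assumption that there is one policy instance per partition and that only partition 1 receives records. `PerPartitionPolicySimulation`, `LastSeenPolicy`, `simulate` and `MIN_WATERMARK` are hypothetical names; `MIN_WATERMARK` only stands in for `BoundedWindow.TIMESTAMP_MIN_VALUE`, and timestamps are plain epoch milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation (no Beam) of the per-partition policy above. */
public class PerPartitionPolicySimulation {
    // Hypothetical stand-in for BoundedWindow.TIMESTAMP_MIN_VALUE, in epoch millis.
    static final long MIN_WATERMARK = Long.MIN_VALUE / 1000;

    /** Mirrors CustomTimeStampPolicy: the watermark is the last record's timestamp. */
    static final class LastSeenPolicy {
        long currentWatermark = MIN_WATERMARK;

        long onRecord(long eventTimeMillis) {
            currentWatermark = eventTimeMillis;
            return currentWatermark;
        }

        long getWatermark() {
            return currentWatermark;
        }
    }

    /** Three partitions, one policy each; only partition 1 receives records. */
    static List<Long> simulate(long[] partition1Timestamps) {
        List<LastSeenPolicy> policies = new ArrayList<>();
        for (int p = 0; p < 3; p++) {
            policies.add(new LastSeenPolicy());
        }
        for (long ts : partition1Timestamps) {
            policies.get(1).onRecord(ts);
        }
        List<Long> watermarks = new ArrayList<>();
        for (LastSeenPolicy policy : policies) {
            watermarks.add(policy.getWatermark());
        }
        return watermarks;
    }

    public static void main(String[] args) {
        // Partitions 0 and 2 stay at MIN_WATERMARK; partition 1 reports its last timestamp.
        System.out.println(simulate(new long[] {1607510549000L, 1607510549909L}));
    }
}
```

Because the watermark is simply the last record's timestamp, an out-of-order record would also move it backwards; keeping `Math.max` over the timestamps seen so far would avoid that.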
With 3 Kafka partitions, only one of which contains data, my logs show these watermarks:
Current Watermark: -290308-12-21T19:59:05.225Z
Current Watermark: 2020-12-09T10:42:29.909Z
Current Watermark: -290308-12-21T19:59:05.225Z
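The two -290308-12-21T19:59:05.225Z entries are BoundedWindow.TIMESTAMP_MIN_VALUE, i.e. the initial watermark of the two empty partitions. With default triggering, my windows won't fire. My guess is that the output watermark is the minimum over the watermarks of all partitions, and therefore won't advance as long as some of my partitions are empty. How can I handle empty partitions with event-time processing?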
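The guess about taking the minimum, and one common heuristic for idle partitions, can be sketched in plain Java without Beam. The sketch assumes (a) the source watermark is the minimum over the per-partition watermarks, as guessed above, and (b) a partition whose reader is caught up (backlog of zero) may advance its watermark to `now - maxDelay` even without new data. That is a trade-off: a record arriving more than `maxDelay` late on a previously idle partition would then be treated as late. As far as I know, Beam's `CustomTimestampPolicyWithLimitedDelay` applies a similar idle-partition rule based on `PartitionContext.getMessageBacklog()`, but check the KafkaIO documentation for your Beam version. `IdleAwareWatermarks`, `PartitionState`, `combined`, `MIN_WATERMARK` and the 5-second `maxDelay` are hypothetical.

```java
import java.util.Arrays;

/**
 * Plain-Java sketch (no Beam): combine per-partition watermarks by taking the
 * minimum (assumed source behavior) and optionally advance idle partitions.
 */
public class IdleAwareWatermarks {
    // Hypothetical stand-in for BoundedWindow.TIMESTAMP_MIN_VALUE, in epoch millis.
    static final long MIN_WATERMARK = Long.MIN_VALUE / 1000;

    /** Hypothetical per-partition state; not a Beam class. */
    static final class PartitionState {
        long maxEventTime = MIN_WATERMARK; // largest event time seen so far
        final long backlog;                // records not yet read from this partition

        PartitionState(long backlog) {
            this.backlog = backlog;
        }

        void onRecord(long eventTimeMillis) {
            // Track the maximum, so out-of-order records never move the watermark back.
            maxEventTime = Math.max(maxEventTime, eventTimeMillis);
        }

        /**
         * Watermark = max event time seen minus maxDelay. If the reader is caught up
         * (backlog == 0), assume nothing older than (now - maxDelay) will still arrive
         * and let the watermark advance to that point even without data.
         */
        long watermark(long nowMillis, long maxDelayMillis, boolean advanceIdle) {
            long wm = maxEventTime == MIN_WATERMARK ? MIN_WATERMARK : maxEventTime - maxDelayMillis;
            if (advanceIdle && backlog == 0) {
                wm = Math.max(wm, nowMillis - maxDelayMillis);
            }
            return wm;
        }
    }

    /** Combined watermark: the minimum over all partitions. */
    static long combined(PartitionState[] partitions, long now, long maxDelay, boolean advanceIdle) {
        return Arrays.stream(partitions)
                .mapToLong(p -> p.watermark(now, maxDelay, advanceIdle))
                .min()
                .orElse(MIN_WATERMARK);
    }

    public static void main(String[] args) {
        long now = 1607510560000L;
        PartitionState[] partitions = {
            new PartitionState(0), new PartitionState(0), new PartitionState(0)
        };
        partitions[1].onRecord(1607510549909L);
        // Prints MIN_WATERMARK: the empty partitions pin the minimum.
        System.out.println(combined(partitions, now, 5000L, false));
        // Prints now - maxDelay = 1607510555000: idle partitions no longer hold it back.
        System.out.println(combined(partitions, now, 5000L, true));
    }
}
```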