
I'm trying to use Beam to aggregate over a set of data using the event time from the data, with Kafka as the data source. This works if all my Kafka partitions are populated with data. However, as soon as a partition has not yet been written to, the watermark can't be estimated and doesn't advance. My TimestampPolicy is the following:

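import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

public class CustomTimeStampPolicy
    extends TimestampPolicy<String, titan.ccp.model.records.ActivePowerRecord> {
  // Watermark reported for this partition; overwritten on every record.
  protected Instant currentWatermark;

  public CustomTimeStampPolicy(final Optional<Instant> previousWatermark) {
    // Without a checkpointed watermark, start at the smallest possible timestamp.
    this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
  }

  @Override
  public Instant getTimestampForRecord(final PartitionContext ctx,
      final KafkaRecord<String, titan.ccp.model.records.ActivePowerRecord> record) {
    // Use the event time stored in the record as both its timestamp and the new watermark.
    this.currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
    return this.currentWatermark;
  }

  @Override
  public Instant getWatermark(final PartitionContext ctx) {
    System.out.println("Current Watermark: " + this.currentWatermark);
    return this.currentWatermark;
  }
}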

With 3 Kafka partitions, only one of which is populated with data, my logs show these watermarks:

Current Watermark: -290308-12-21T19:59:05.225Z
Current Watermark: 2020-12-09T10:42:29.909Z
Current Watermark: -290308-12-21T19:59:05.225Z

With default triggering, my windows won't fire. My guess is that the output watermark is the minimum over the watermarks of the partitions, and therefore it won't advance as long as some of my partitions are empty. How can I handle empty partitions with event-time processing?
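The question's guess can be illustrated with a small, framework-free Java model. It is not Beam code: it simply assumes, as the question does, that the source reports the minimum of the per-partition watermarks, and it uses java.time.Instant.MIN as a hypothetical stand-in for BoundedWindow.TIMESTAMP_MIN_VALUE. With the timestamps from the logs above, the combined value stays at the minimum no matter how far the populated partition advances.

```java
import java.time.Instant;
import java.util.List;

// Framework-free illustration (not Beam code) of the assumption that the
// source's output watermark is the minimum of the per-partition watermarks.
final class MinWatermarkDemo {
  // Hypothetical stand-in for BoundedWindow.TIMESTAMP_MIN_VALUE.
  static final Instant MIN_WATERMARK = Instant.MIN;

  // The combined watermark can only be as far along as the slowest partition.
  // An empty list yields Instant.MAX.
  static Instant combined(List<Instant> perPartition) {
    Instant result = Instant.MAX;
    for (Instant w : perPartition) {
      if (w.isBefore(result)) {
        result = w;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Mirrors the logs: two partitions without data, one populated partition.
    List<Instant> watermarks = List.of(
        MIN_WATERMARK,
        Instant.parse("2020-12-09T10:42:29.909Z"),
        MIN_WATERMARK);
    System.out.println("Combined watermark: " + combined(watermarks));
  }
}
```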


1 Answer


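If no data has been written to a Kafka partition, Beam cannot know that an element written to it later won't carry an arbitrarily old timestamp, hence the very old watermark.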

You could try updating your timestamp policy's constructor to use previousWatermark.orElse(wallTime - someMaximumSkew).
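A minimal sketch of the suggested constructor change, written as plain Java with java.time types so it runs without Beam on the classpath. The class name SkewInitializedPolicy, the maxSkew parameter (standing for someMaximumSkew) and the injected Clock are illustrative assumptions; the rest mirrors the question's policy.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Framework-free model of the question's policy with only the constructor
// changed. maxSkew stands for someMaximumSkew; the Clock parameter is an
// assumption added so that wall time can be fixed in tests.
final class SkewInitializedPolicy {
  private Instant currentWatermark;

  SkewInitializedPolicy(Optional<Instant> previousWatermark, Duration maxSkew, Clock clock) {
    // Instead of the minimum timestamp, assume that nothing older than
    // (wall time - maxSkew) will ever be written to this partition.
    this.currentWatermark = previousWatermark.orElse(clock.instant().minus(maxSkew));
  }

  // Same behaviour as the question's getTimestampForRecord.
  Instant onRecord(Instant eventTime) {
    this.currentWatermark = eventTime;
    return eventTime;
  }

  // Same behaviour as the question's getWatermark (without the logging).
  Instant getWatermark() {
    return currentWatermark;
  }
}
```

In the real TimestampPolicy subclass, the equivalent line would be something like previousWatermark.orElse(Instant.now().minus(someMaximumSkew)) with org.joda.time.Instant and org.joda.time.Duration. An empty partition then starts near wall time instead of at the minimum timestamp, but its watermark still stays fixed until a record arrives.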

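someMaximumSkew is the maximum delay you can expect to see for data written to Kafka. You could also consider taking the minimum of the previous value (if any) and wallTime - someMaximumSkew, and advancing the watermark when no data has been written for a while.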
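Advancing the watermark when a partition has been idle can be sketched the same way. This is a plain-Java sketch, not a TimestampPolicy: IdleAwareWatermark, idleTimeout and the Supplier<Instant> wall clock are hypothetical names introduced here, onRecord and current() play the roles of getTimestampForRecord and getWatermark, and the initial value follows the answer's wording by taking the minimum of the previous watermark and wall time minus maxSkew.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.function.Supplier;

// Framework-free sketch of a per-partition watermark that follows event time
// while records arrive and moves to (now - maxSkew) once the partition has
// been idle for idleTimeout. All names here are hypothetical.
final class IdleAwareWatermark {
  private final Duration maxSkew;      // stands for someMaximumSkew
  private final Duration idleTimeout;  // how long without records counts as idle
  private final Supplier<Instant> wallClock;
  private Instant watermark;
  private Instant lastRecordWallTime;

  IdleAwareWatermark(Optional<Instant> previousWatermark, Duration maxSkew,
      Duration idleTimeout, Supplier<Instant> wallClock) {
    this.maxSkew = maxSkew;
    this.idleTimeout = idleTimeout;
    this.wallClock = wallClock;
    Instant now = wallClock.get();
    Instant floor = now.minus(maxSkew);
    // Start from the earlier of the previous watermark (if any) and now - maxSkew.
    this.watermark = previousWatermark
        .map(prev -> prev.isBefore(floor) ? prev : floor)
        .orElse(floor);
    this.lastRecordWallTime = now;
  }

  // Called for every record read from the partition.
  void onRecord(Instant eventTime) {
    lastRecordWallTime = wallClock.get();
    // Keep the watermark monotonic: only move forward to newer event times.
    if (eventTime.isAfter(watermark)) {
      watermark = eventTime;
    }
  }

  // If no record has arrived for idleTimeout, advance to now - maxSkew,
  // but never move the watermark backwards.
  Instant current() {
    Instant now = wallClock.get();
    boolean idle = Duration.between(lastRecordWallTime, now).compareTo(idleTimeout) >= 0;
    Instant floor = now.minus(maxSkew);
    if (idle && floor.isAfter(watermark)) {
      watermark = floor;
    }
    return watermark;
  }
}
```

Unlike the question's policy, this keeps the largest event time seen, so the watermark never moves backwards. That treats any out-of-order record as late; subtracting an allowed delay from the largest event time, as Beam's CustomTimestampPolicyWithLimitedDelay does, is one way to tolerate reordering.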

Answered 2020-12-09T18:18:45.653