I need to read from multiple Kafka topics starting at offsets computed from a point in time, sort the records by timestamp, and publish them to another Kafka topic. All of the Kafka topics have 1 partition.
To describe the use case more concretely: we have inputTopic1, inputTopic2, and outputTopic. We need to consume the past 12 hours of data from the input topics and then continue consuming live data. All consumed data has to be sorted by timestamp and published to outputTopic.
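In raw Kafka consumer terms, the time-based starting offset is an offsetsForTimes lookup; KafkaIO's withStartReadTime, used in the pipeline below, resolves the start offset from a timestamp in the same spirit. A minimal sketch just to pin down the semantics; the helper name is mine and not part of the pipeline:

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper: first offset at or after "now - 12h" for a single-partition topic.
static long startOffsetTwelveHoursBack(Consumer<String, String> consumer, String topic) {
    TopicPartition tp = new TopicPartition(topic, 0); // every topic has 1 partition
    long twelveHoursAgo = System.currentTimeMillis() - 12L * 60 * 60 * 1000;
    OffsetAndTimestamp oat = consumer.offsetsForTimes(Map.of(tp, twelveHoursAgo)).get(tp);
    // null means there is no record with timestamp >= twelveHoursAgo; fall back to the end offset
    return oat != null ? oat.offset() : consumer.endOffsets(List.of(tp)).get(tp);
}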
I tried to create a custom windowFn to handle this use case, but I am getting out-of-order data in the output topic.

I have two questions. Did I choose the right approach to solve this problem? And am I implementing it the right way?
Pipeline
Instant nowAsInstant = Instant.now();

Pipeline pipeline = Pipeline.create(options);
pipeline.apply("Read from Topics", KafkaIO.<String, String>read()
        .withTopics(List.of("topic1", "topic2"))
        .withBootstrapServers("localhost:9092")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // element timestamps = Kafka create time, no extra watermark delay
        .withCreateTime(Duration.ZERO)
        // start 12 hours in the past, then keep reading live data
        .withStartReadTime(nowAsInstant.minus(Duration.standardHours(12)))
        .withConsumerConfigUpdates(consumerConfig)
        .commitOffsetsInFinalize())
    .apply(Window.into(new CustomWindowFn(nowAsInstant, Duration.millis(500))))
    // gather all records of a window into one list
    .apply(Combine.globally(new ListCombiner()).withoutDefaults())
    .apply("Sort", MapElements.via(
        new SimpleFunction<Iterable<KafkaRecord<String, String>>, List<KafkaRecord<String, String>>>() {
          @Override
          public List<KafkaRecord<String, String>> apply(Iterable<KafkaRecord<String, String>> input) {
            return StreamSupport.stream(input.spliterator(), false)
                .sorted(KAFKA_RECORD_COMPARATOR)
                .collect(Collectors.toUnmodifiableList());
          }
        }))
    .apply(Flatten.iterables())
    .apply("mapToProducerRecord", MapElements
        .<ProducerRecord<String, String>>into(
            new ProducerRecordCoder<>(StringUtf8Coder.of(), StringUtf8Coder.of()).getEncodedTypeDescriptor())
        .via((SerializableFunction<KafkaRecord<String, String>, ProducerRecord<String, String>>)
            new SimpleFunction<KafkaRecord<String, String>, ProducerRecord<String, String>>() {
              @Override
              public ProducerRecord<String, String> apply(KafkaRecord<String, String> input) {
                return new ProducerRecord<>("outputTopic", null, input.getTimestamp(),
                    input.getKV().getKey(), input.getKV().getValue(), input.getHeaders());
              }
            }))
    .setCoder(ProducerRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
    .apply("Write to Kafka", KafkaIO.<String, String>writeRecords()
        .withBootstrapServers("localhost:9092")
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class));

LOG.info("Starting pipeline...");
pipeline.run();
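consumerConfig and KAFKA_RECORD_COMPARATOR are referenced above but not shown: consumerConfig is an ordinary map of consumer properties, and the comparator orders by record timestamp. Roughly along these lines (simplified; the topic tie-breaker is just an example):

import java.util.Comparator;
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;

private static final Map<String, Object> consumerConfig =
    Map.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// Order by record timestamp; break ties by topic so the order is deterministic.
private static final Comparator<KafkaRecord<String, String>> KAFKA_RECORD_COMPARATOR =
    Comparator.<KafkaRecord<String, String>>comparingLong(KafkaRecord::getTimestamp)
        .thenComparing(KafkaRecord::getTopic);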
CustomWindowFn
import static java.util.Objects.isNull;

import java.util.Objects;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class CustomWindowFn extends PartitioningWindowFn<KafkaRecord<String, String>, IntervalWindow> {

  private final Instant startingTime;
  private final Duration size;
  private Instant intervalStartTime;

  public CustomWindowFn(Instant startingTime, Duration size) {
    this.startingTime = startingTime;
    this.size = size;
  }

  @Override
  public IntervalWindow assignWindow(Instant timestamp) {
    // Backlog: every record older than startingTime goes into a single window
    // that starts at the first pre-start timestamp seen and ends at startingTime.
    if (timestamp.isBefore(startingTime)) {
      Instant firstRecordTimestamp = getIntervalStartTime(timestamp);
      return new IntervalWindow(firstRecordTimestamp, startingTime);
    }
    // Live data: fixed windows of `size`, same formula FixedWindows uses.
    Instant start =
        new Instant(timestamp.getMillis()
            - timestamp.plus(size).getMillis() % size.getMillis());
    return new IntervalWindow(start, start.plus(size));
  }

  private Instant getIntervalStartTime(Instant timestamp) {
    if (isNull(intervalStartTime)) {
      intervalStartTime = timestamp;
    }
    return intervalStartTime;
  }

  @Override
  public boolean isCompatible(WindowFn<?, ?> other) {
    return this.equals(other);
  }

  @Override
  public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
  }

  @Override
  public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
    if (!this.isCompatible(other)) {
      throw new IncompatibleWindowException(
          other,
          String.format(
              "Only %s objects with the same size and startingTime are compatible.",
              CustomWindowFn.class.getSimpleName()));
    }
  }

  public Instant getStartingTime() {
    return startingTime;
  }

  public Duration getSize() {
    return size;
  }

  @Override
  public boolean equals(@Nullable Object object) {
    if (!(object instanceof CustomWindowFn)) {
      return false;
    }
    CustomWindowFn other = (CustomWindowFn) object;
    return getStartingTime().equals(other.getStartingTime())
        && getSize().equals(other.getSize());
  }

  @Override
  public int hashCode() {
    return Objects.hash(size, startingTime);
  }
}
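To spell out what assignWindow is meant to do (hypothetical timestamps):

CustomWindowFn fn =
    new CustomWindowFn(Instant.parse("2023-06-01T12:00:00Z"), Duration.millis(500));

// Backlog record from 2 hours before startingTime: the first pre-start
// timestamp seen becomes the start of one large window [10:00:00, 12:00:00)
// that should hold the entire 12-hour backlog.
IntervalWindow backlog = fn.assignWindow(Instant.parse("2023-06-01T10:00:00.000Z"));

// Live record after startingTime: lands in the 500 ms fixed window
// [12:00:00.000, 12:00:00.500), the same assignment FixedWindows would make.
IntervalWindow live = fn.assignWindow(Instant.parse("2023-06-01T12:00:00.250Z"));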
ListCombiner
public class ListCombiner extends Combine.CombineFn<KafkaRecord<String, String>,
    List<KafkaRecord<String, String>>, List<KafkaRecord<String, String>>> {

  @Override
  public List<KafkaRecord<String, String>> createAccumulator() {
    return new ArrayList<>();
  }

  @Override
  public List<KafkaRecord<String, String>> addInput(
      List<KafkaRecord<String, String>> mutableAccumulator, KafkaRecord<String, String> input) {
    if (input != null) {
      mutableAccumulator.add(input);
    }
    return mutableAccumulator;
  }

  @Override
  public List<KafkaRecord<String, String>> mergeAccumulators(
      Iterable<List<KafkaRecord<String, String>>> accumulators) {
    return StreamSupport.stream(accumulators.spliterator(), false)
        .flatMap(Collection::stream)
        .collect(Collectors.toList());
  }

  @Override
  public List<KafkaRecord<String, String>> extractOutput(List<KafkaRecord<String, String>> accumulator) {
    return accumulator;
  }
}
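A quick sanity check of the combiner in isolation (record(...) is a hypothetical test factory that builds a KafkaRecord with the given topic and timestamp):

ListCombiner combiner = new ListCombiner();

List<KafkaRecord<String, String>> acc = combiner.createAccumulator();
combiner.addInput(acc, record("topic1", 100L)); // appended
combiner.addInput(acc, null);                   // ignored: nulls are skipped

List<KafkaRecord<String, String>> other = combiner.createAccumulator();
combiner.addInput(other, record("topic2", 50L));

// mergeAccumulators concatenates in accumulator order, not timestamp order;
// the "Sort" step after Combine.globally is what actually orders the records.
List<KafkaRecord<String, String>> merged = combiner.mergeAccumulators(List.of(acc, other));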