0

我需要从多个 Kafka 主题中读取数据(起始偏移量根据时间计算),按时间戳对这些记录进行排序,然后发送到另一个 Kafka 主题。所有 Kafka 主题都只有 1 个分区。

再举一个例子来更好地描述用例。

我们有 inputTopic1、inputTopic2 和 outputTopic。我们需要消费这两个输入主题过去 12 小时的数据,并继续消费实时数据。所有消费到的数据都需要排序后发布到 outputTopic。

我试图创建自定义windowFn来处理这个用例,但是我在output主题中得到了无序的数据。

我有 2 个问题。

我是否选择了正确的方法来解决这个问题?我是否以正确的方式实施它?

管道

// Single reference instant: used both to compute the 12-hour look-back start
// position and as the boundary passed to CustomWindowFn below.
Instant nowAsInstant = Instant.now();
        Pipeline pipeline = Pipeline.create(options);
        // Read both input topics as one unbounded source, starting from the
        // offsets that correspond to "12 hours before nowAsInstant".
        pipeline.apply("Read from Topics", KafkaIO.<String, String>read()
                .withTopics(List.of("topic1", "topic2"))
                .withBootstrapServers("localhost:9092")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // NOTE(review): presumably selects the Kafka record create-time as the
                // element timestamp with zero allowed out-of-orderness — confirm against
                // the KafkaIO documentation.
                .withCreateTime(Duration.ZERO)
                .withStartReadTime(nowAsInstant.minus(Duration.standardHours(12)))
                .withConsumerConfigUpdates(consumerConfig)
                .commitOffsetsInFinalize())
                // Window the records with CustomWindowFn, configured with nowAsInstant
                // and a 500 ms window size.
                .apply(Window.into(new CustomWindowFn(nowAsInstant, Duration.millis(500))))
                // Gather the records into a list with ListCombiner; withoutDefaults()
                // is required by Beam for non-global windowing.
                .apply(Combine.globally(new ListCombiner()).withoutDefaults())
                // Sort each combined list with KAFKA_RECORD_COMPARATOR (defined elsewhere).
                .apply("Sort", MapElements.via(
                        new SimpleFunction<Iterable<KafkaRecord<String, String>>, List<KafkaRecord<String, String>>>() {
                            @Override
                            public List<KafkaRecord<String, String>> apply(Iterable<KafkaRecord<String, String>> input) {
                                return StreamSupport.stream(input.spliterator(), false)
                                        .sorted(KAFKA_RECORD_COMPARATOR)
                                        .collect(Collectors.toUnmodifiableList());
                            }
                        }
                ))
                // Explode each sorted list back into individual records.
                // NOTE(review): a PCollection carries no ordering guarantee, so the sorted
                // order is probably not preserved past this point (nor across windows or
                // by the Kafka sink). This looks like a likely source of the unordered
                // output — confirm against the Beam programming guide.
                .apply(Flatten.iterables())
                // Convert each consumed KafkaRecord into a ProducerRecord for "outputTopic",
                // keeping the original timestamp, key, value and headers; the partition
                // argument is null.
                .apply("mapToProducerRecord", MapElements.<ProducerRecord<String, String>>into(new ProducerRecordCoder<>(
                        StringUtf8Coder.of(), StringUtf8Coder.of()).getEncodedTypeDescriptor())
                        // NOTE(review): the explicit cast presumably selects the
                        // SerializableFunction overload of via() — verify before removing it.
                        .via((SerializableFunction<KafkaRecord<String, String>, ProducerRecord<String, String>>)
                                new SimpleFunction<KafkaRecord<String, String>, ProducerRecord<String, String>>() {
                                    @Override
                                    public ProducerRecord<String, String> apply(KafkaRecord<String, String> input) {
                                        return new ProducerRecord<>("outputTopic", null, input.getTimestamp(),
                                                input.getKV().getKey(), input.getKV().getValue(), input.getHeaders());
                                    }
                                }))
                .setCoder(ProducerRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
                // Publish the ProducerRecords; the destination topic is taken from each record.
                .apply("Write to Kafka", KafkaIO.<String, String>writeRecords()
                        .withBootstrapServers("localhost:9092")
                        .withKeySerializer(StringSerializer.class)
                        .withValueSerializer(StringSerializer.class));

        LOG.info("Starting pipeline...");
        // Submits the pipeline to the runner; the returned PipelineResult is not used here.
        pipeline.run();

自定义 WindowFn(CustomWindowFn)

/**
 * Window function that splits the timeline into one "historical" window followed by
 * fixed-size "live" windows.
 *
 * <ul>
 *   <li>Every timestamp strictly before {@code startingTime} is assigned to the single window
 *       {@code [HISTORICAL_WINDOW_START, startingTime)}.
 *   <li>Every timestamp at or after {@code startingTime} is assigned to a window of length
 *       {@code size} whose boundaries are aligned to {@code startingTime}, so the first live
 *       window starts exactly where the historical window ends.
 * </ul>
 *
 * <p>The assignment is a pure function of the timestamp and the two constructor arguments.
 * Earlier versions cached the first timestamp seen in a mutable field, which made the
 * historical window differ between serialized copies of this function and could place a
 * record before the start of its own window. They also aligned live windows to the epoch,
 * so the first live window could overlap the historical one.
 */
public class CustomWindowFn extends PartitioningWindowFn<KafkaRecord<String, String>, IntervalWindow> {
    /**
     * Fixed lower bound of the historical window. The value is {@code Long.MIN_VALUE}
     * microseconds expressed in milliseconds, which is intended to equal Beam's
     * {@code BoundedWindow.TIMESTAMP_MIN_VALUE}; it is spelled out here to avoid depending
     * on an import that may not be present in this file.
     */
    private static final Instant HISTORICAL_WINDOW_START = new Instant(Long.MIN_VALUE / 1000);

    private final Instant startingTime;
    private final Duration size;

    /**
     * Creates the window function.
     *
     * @param startingTime boundary between the historical window and the live windows
     * @param size length of each live window; must be strictly positive
     * @throws NullPointerException if either argument is {@code null}
     * @throws IllegalArgumentException if {@code size} is zero or negative
     */
    public CustomWindowFn(Instant startingTime, Duration size) {
        this.startingTime = Objects.requireNonNull(startingTime, "startingTime");
        this.size = Objects.requireNonNull(size, "size");
        // A non-positive size would make the modulo in assignWindow throw or misbehave.
        if (size.getMillis() <= 0) {
            throw new IllegalArgumentException("size must be positive, got " + size);
        }
    }

    /**
     * Returns the window containing {@code timestamp}.
     *
     * @param timestamp event timestamp of the element being windowed
     * @return the historical window for timestamps before {@code startingTime}, otherwise the
     *     {@code size}-long window aligned to {@code startingTime} that contains the timestamp
     */
    @Override
    public IntervalWindow assignWindow(Instant timestamp) {
        if (timestamp.isBefore(startingTime)) {
            return new IntervalWindow(HISTORICAL_WINDOW_START, startingTime);
        }

        // timestamp >= startingTime here, so the offset and its remainder are non-negative.
        long millisSinceStart = timestamp.getMillis() - startingTime.getMillis();
        Instant start = new Instant(timestamp.getMillis() - millisSinceStart % size.getMillis());

        return new IntervalWindow(start, start.plus(size));
    }

    /** Two window functions are compatible only when they are equal. */
    @Override
    public boolean isCompatible(WindowFn<?, ?> other) {
        return this.equals(other);
    }

    /** Windows produced by this function are encoded with the standard IntervalWindow coder. */
    @Override
    public Coder<IntervalWindow> windowCoder() {
        return IntervalWindow.getCoder();
    }

    /**
     * Verifies that {@code other} is compatible with this window function.
     *
     * @throws IncompatibleWindowException if {@link #isCompatible(WindowFn)} returns false
     */
    @Override
    public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
        if (!this.isCompatible(other)) {
            throw new IncompatibleWindowException(
                    other,
                    String.format(
                            "Only %s objects with the same size and startingTime are compatible.",
                            CustomWindowFn.class.getSimpleName()));
        }
    }

    /** Returns the boundary between the historical window and the live windows. */
    public Instant getStartingTime() {
        return startingTime;
    }

    /** Returns the length of each live window. */
    public Duration getSize() {
        return size;
    }

    /** Equality is based on {@code startingTime} and {@code size}. */
    @Override
    public boolean equals(@Nullable Object object) {
        if (!(object instanceof CustomWindowFn)) {
            return false;
        }

        CustomWindowFn other = (CustomWindowFn) object;
        return getStartingTime().equals(other.getStartingTime())
                && getSize().equals(other.getSize());
    }

    @Override
    public int hashCode() {
        return Objects.hash(size, startingTime);
    }
}

列表组合器

/**
 * Combine function that gathers every non-null {@code KafkaRecord} it receives into a
 * single list. The accumulator and the output are the same list type; no ordering is
 * applied here.
 */
public class ListCombiner extends Combine.CombineFn<KafkaRecord<String, String>,
        List<KafkaRecord<String, String>>, List<KafkaRecord<String, String>>> {

    /** Starts every accumulation with an empty, mutable list. */
    @Override
    public List<KafkaRecord<String, String>> createAccumulator() {
        return new ArrayList<>();
    }

    /**
     * Appends {@code input} to the accumulator in place; null inputs are ignored.
     *
     * @return the same accumulator instance that was passed in
     */
    @Override
    public List<KafkaRecord<String, String>> addInput(List<KafkaRecord<String, String>> mutableAccumulator,
                                                      KafkaRecord<String, String> input) {
        if (input == null) {
            return mutableAccumulator;
        }

        mutableAccumulator.add(input);
        return mutableAccumulator;
    }

    /**
     * Concatenates all partial accumulators, in iteration order, into a new list.
     */
    @Override
    public List<KafkaRecord<String, String>> mergeAccumulators(Iterable<List<KafkaRecord<String, String>>> accumulators) {
        List<KafkaRecord<String, String>> merged = new ArrayList<>();
        for (List<KafkaRecord<String, String>> partial : accumulators) {
            merged.addAll(partial);
        }
        return merged;
    }

    /** The final output is the accumulated list itself. */
    @Override
    public List<KafkaRecord<String, String>> extractOutput(List<KafkaRecord<String, String>> accumulator) {
        return accumulator;
    }
}

我也阅读了 Beam 的《Timely (and) Stateful Processing》(及时且有状态的处理)一文,但我不确定如何将它用于上述任务。

4

0 回答 0