
I am trying to extend FlinkKafkaConsumer to throttle the Kafka consumer in my Flink job, using Flink version 1.12. As part of this, I tried to follow the thread below to achieve that. However, I am running into a compilation problem in the createFetcher method when creating the AbstractFetcher.

How to use a RateLimiter on Flink?

Is there a method named emitRecord? I could not find it in either the KafkaFetcher or the AbstractFetcher class.

Below is the code snippet:

protected AbstractFetcher<T, ?> createFetcher(
        SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> partitionsWithOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        StreamingRuntimeContext runtimeContext,
        OffsetCommitMode offsetCommitMode,
        MetricGroup consumerMetricGroup, boolean useMetrics)
        throws Exception {

    return new KafkaFetcher<T>(
            sourceContext,
            partitionsWithOffsets,
            watermarksPeriodic,
            watermarksPunctuated,
            runtimeContext.getProcessingTimeService(),
            runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
            runtimeContext.getUserCodeClassLoader(),
            runtimeContext.getTaskNameWithSubtasks(),
            deserializer,
            properties,
            pollTimeout,
            runtimeContext.getMetricGroup(),
            consumerMetricGroup,
            useMetrics) {
        protected void emitRecord(T record,
                                  KafkaTopicPartitionState<TopicPartition> partitionState,
                                  long offset) throws Exception {
            subtaskRateLimiter.acquire();
            if (record == null) {
                consumerMetricGroup.counter("invalidRecord").inc();
            }
            super.emitRecord(record, partitionState, offset);
        }

        @Override
        protected void emitRecordWithTimestamp(T record,
                                               KafkaTopicPartitionState<TopicPartition> partitionState,
                                               long offset, long timestamp) throws Exception {
            subtaskRateLimiter.acquire();
            if (record == null) {
                consumerMetricGroup.counter("invalidRecord").inc();
            }
            super.emitRecordWithTimestamp(record, partitionState, offset, timestamp);
        }
    };

}

Any suggestions to resolve this issue and get custom rate limiting for the Flink Kafka consumer?


1 Answer


emitRecord is now implemented by org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter#emitRecord.
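With the newer KafkaSource stack there is no fetcher to subclass, so throttling is typically done by blocking for a permit in your own deserialization/emit path instead. The blocking acquire() that subtaskRateLimiter performs per record is independent of Flink; below is a minimal standalone sketch of such a limiter (hand-rolled here to avoid a Guava dependency; the class and method names are illustrative, not Flink or Guava API):

```java
// Minimal per-record throttle: blocks the caller until the next
// permit is due, spacing permits evenly at the configured rate.
// Illustrative sketch only; names are not part of any Flink API.
class ThrottledEmitter {
    private final long nanosPerPermit;
    private long nextFreeTime = System.nanoTime();

    ThrottledEmitter(double permitsPerSecond) {
        this.nanosPerPermit = (long) (1_000_000_000L / permitsPerSecond);
    }

    // Blocks until a permit is available, then reserves the next slot.
    synchronized void acquire() throws InterruptedException {
        long now = System.nanoTime();
        long waitNanos = nextFreeTime - now;
        if (waitNanos > 0) {
            Thread.sleep(waitNanos / 1_000_000, (int) (waitNanos % 1_000_000));
        }
        nextFreeTime = Math.max(nextFreeTime, now) + nanosPerPermit;
    }
}
```

One place such an acquire() could be called in the new API is inside a custom KafkaRecordDeserializationSchema, before collecting each record, which gives roughly the same effect as the emitRecord override in the question.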

Answered 2021-06-17T08:18:50.297