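I am trying to extend FlinkKafkaConsumer to throttle the Kafka consumer in my Flink job, using Flink version 1.12. As part of this, I tried to follow the thread below to achieve the same thing. However, I am hitting a compilation problem in the createFetcher method when creating the AbstractFetcher.
Is there no longer a method named emitRecord? I cannot find it in either the KafkaFetcher or the AbstractFetcher class.
Below is the code snippet: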
    protected AbstractFetcher<T, ?> createFetcher(
            SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> partitionsWithOffsets,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            StreamingRuntimeContext runtimeContext,
            OffsetCommitMode offsetCommitMode,
            MetricGroup consumerMetricGroup,
            boolean useMetrics)
            throws Exception {
        return new KafkaFetcher<T>(
                sourceContext,
                partitionsWithOffsets,
                watermarksPeriodic,
                watermarksPunctuated,
                runtimeContext.getProcessingTimeService(),
                runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
                runtimeContext.getUserCodeClassLoader(),
                runtimeContext.getTaskNameWithSubtasks(),
                deserializer,
                properties,
                pollTimeout,
                runtimeContext.getMetricGroup(),
                consumerMetricGroup,
                useMetrics) {
            protected void emitRecord(T record,
                    KafkaTopicPartitionState<TopicPartition> partitionState,
                    long offset) throws Exception {
                subtaskRateLimiter.acquire();
                if (record == null) {
                    consumerMetricGroup.counter("invalidRecord").inc();
                }
                super.emitRecord(record, partitionState, offset);
            }
            @Override
            protected void emitRecordWithTimestamp(T record,
                    KafkaTopicPartitionState<TopicPartition> partitionState,
                    long offset, long timestamp) throws Exception {
                subtaskRateLimiter.acquire();
                if (record == null) {
                    consumerMetricGroup.counter("invalidRecord").inc();
                }
                super.emitRecordWithTimestamp(record, partitionState, offset, timestamp);
            }
        };
    }
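For context, the subtaskRateLimiter field used in the snippet above is not shown. The intent is that acquire() blocks the fetcher thread until the next record is allowed through. A minimal, self-contained sketch of that behavior follows; the class name SimpleRateLimiter and its constructor parameter are hypothetical and only illustrate the blocking semantics I am assuming, not the limiter actually used in the job or anything provided by Flink.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical stand-in for the subtaskRateLimiter in the snippet above.
 * It spaces permits evenly: at most one acquire() returns per interval.
 */
final class SimpleRateLimiter {
    private final long intervalNanos; // minimum spacing between two permits
    private long nextFreeNanos;       // earliest time the next permit may be granted

    SimpleRateLimiter(double permitsPerSecond) {
        if (permitsPerSecond <= 0) {
            throw new IllegalArgumentException("permitsPerSecond must be positive");
        }
        this.intervalNanos = (long) (TimeUnit.SECONDS.toNanos(1) / permitsPerSecond);
        this.nextFreeNanos = System.nanoTime();
    }

    /** Blocks the calling thread until one permit is available. */
    synchronized void acquire() {
        long now = System.nanoTime();
        long waitNanos = nextFreeNanos - now;
        if (waitNanos > 0) {
            try {
                TimeUnit.NANOSECONDS.sleep(waitNanos);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the caller can react to cancellation.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for a permit", e);
            }
        }
        // If we had to wait, the permit was granted at nextFreeNanos; otherwise at 'now'.
        long grantedAt = waitNanos > 0 ? nextFreeNanos : now;
        nextFreeNanos = grantedAt + intervalNanos;
    }
}
```

With a limiter like this, each call to acquire() before emitting a record caps the subtask at roughly permitsPerSecond records per second, which is the effect I want from whichever fetcher method turns out to be the right one to override in 1.12.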
Any suggestions on how to resolve this and get custom rate limiting for the Flink Kafka consumer?