
I'm trying to retrieve the Kafka offsets for my Spark batch job. After retrieving the offsets, I'd like to stop the streaming context.

I tried adding a streaming listener to the streaming context and implementing the onBatchCompleted method to stop the stream once the batch completes, but I get the exception "Cannot stop StreamingContext within listener bus thread".

Is there a workaround for this? I'm trying to retrieve the offsets so that I can call KafkaUtils.createRDD(sparkContext, kafkaProperties, OffsetRange[], LocationStrategy).
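
For reference, this is roughly how the retrieved offsets would be consumed afterwards (a sketch only; `sparkContext` is the JavaSparkContext and `kafkaProperties` the consumer config map mentioned above):

OffsetRange[] offsetRanges = getOffsets(sparkConf);
JavaRDD<ConsumerRecord<String, String>> batchRdd = KafkaUtils.createRDD(
        sparkContext,                         // JavaSparkContext
        kafkaProperties,                      // Map<String, Object> consumer config
        offsetRanges,                         // offsets returned by getOffsets(...)
        LocationStrategies.PreferConsistent()
);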

private OffsetRange[] getOffsets(SparkConf sparkConf) throws InterruptedException {
    final AtomicReference<OffsetRange[]> atomicReference = new AtomicReference<>();

    JavaStreamingContext sc = new JavaStreamingContext(sparkConf, Duration.apply(50));
    JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(sc, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(Arrays.asList("test"), getKafkaParam()));
    stream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, String>>>) rdd -> {
                atomicReference.set(((HasOffsetRanges) rdd.rdd()).offsetRanges());
                // sc.stop(false); //this would throw exception saying consumer is already closed
            }
    );
    sc.addStreamingListener(new TopicListener(sc)); //Throws exception saying "Cannot stop StreamingContext within listener bus thread."
    sc.start();
    sc.awaitTermination();
    return atomicReference.get();
}



public class TopicListener implements StreamingListener {
    private JavaStreamingContext sc;

    public TopicListener(JavaStreamingContext sc) {
        this.sc = sc;
    }

    @Override
    public void onBatchCompleted(StreamingListenerBatchCompleted streamingListenerBatchCompleted) {
        sc.stop(false);
    }
}
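
One variant I have not tried yet: since the exception only complains about calling stop() on the listener bus thread itself, handing the stop off to a separate thread from inside the listener might avoid the check. A rough, unverified sketch of that listener method:

@Override
public void onBatchCompleted(StreamingListenerBatchCompleted streamingListenerBatchCompleted) {
    // Hand stop() off to another thread so it does not run on the listener bus thread itself.
    new Thread(() -> sc.stop(false)).start();
}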

Many thanks, stackoverflow-ers :) I've tried searching for possible solutions but have had no luck so far.

Edit: I used a KafkaConsumer to get the partition information. Once I have the partition info, I create a list of TopicPartition POJOs and call the position and endOffsets methods to get the current position and the end position for my groupId, respectively.

final List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor("theTopicName");
final List<TopicPartition> topicPartitions = new ArrayList<>();
partitionInfos.forEach(partitionInfo -> topicPartitions.add(new TopicPartition("theTopicName", partitionInfo.partition())));
final List<OffsetRange> offsetRanges = new ArrayList<>();
kafkaConsumer.assign(topicPartitions);
topicPartitions.forEach(topicPartition -> {
    long fromOffset = kafkaConsumer.position(topicPartition);
    kafkaConsumer.seekToEnd(Collections.singleton(topicPartition));
    long untilOffset = kafkaConsumer.position(topicPartition);
    offsetRanges.add(OffsetRange.create(topicPartition.topic(), topicPartition.partition(), fromOffset, untilOffset));
});
return offsetRanges.toArray(new OffsetRange[offsetRanges.size()]);
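
The until-offsets can also be fetched in a single call with endOffsets(), the method mentioned above, instead of seekToEnd() plus position() (untested sketch):

final Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions);
topicPartitions.forEach(topicPartition ->
        offsetRanges.add(OffsetRange.create(topicPartition.topic(), topicPartition.partition(),
                kafkaConsumer.position(topicPartition), endOffsets.get(topicPartition))));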

1 Answer


If you want to control the flow, you could consider using polling rather than the streaming API. That way you can cleanly stop polling once you have reached your target.
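
A rough sketch of that idea (the topic name, the consumer config map `kafkaParams`, and the stop condition are placeholders; adapt them to your target offsets):

// Hypothetical sketch: a plain KafkaConsumer poll loop, no StreamingContext to shut down.
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaParams)) {
    consumer.subscribe(Collections.singletonList("test"));
    boolean reachedTarget = false;
    while (!reachedTarget) {
        ConsumerRecords<String, String> records = consumer.poll(500);
        // ... process records and compare consumer.position(...) against your target offsets ...
        reachedTarget = records.isEmpty(); // placeholder stop condition
    }
} // closing the consumer is the only cleanup needed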

Also check this out...

https://github.com/dibbhatt/kafka-spark-consumer

Answered 2018-12-11T01:50:31.627