我最终的解决办法是:保持 spark.read 原样不变(忽略 group.id 等配置),
再在其外围加上我自己的 KafkaConsumer 逻辑。
/**
 * Factory producing a raw byte-array Kafka consumer for a given consumer group.
 *
 * The consumer connects to `config.bootstrapServers`, deserializes keys and values as
 * byte arrays, sets `auto.offset.reset` to "earliest" and sets `enable.auto.commit` to
 * "false", so offsets are only committed when the caller does so explicitly.
 */
protected val kafkaConsumer: String => KafkaConsumer[Array[Byte], Array[Byte]] = { consumerGroup =>
  val byteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  val settings = new Properties()

  // Connection and identity.
  settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers)
  settings.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup)

  // Keys and values are handed over untouched as byte arrays.
  settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, byteArrayDeserializer)
  settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, byteArrayDeserializer)

  // Offset handling: reset to "earliest", auto-commit switched off.
  settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

  new KafkaConsumer[Array[Byte], Array[Byte]](settings)
}
/**
 * Lists every partition of `topic` as reported by the consumer's `partitionsFor` call.
 *
 * @param kafkaConsumer consumer used to query partition metadata
 * @param topic         topic whose partitions are requested
 * @return one [[TopicPartition]] per partition entry returned for the topic
 */
protected def getPartitions(kafkaConsumer: KafkaConsumer[_, _], topic: String): List[TopicPartition] = {
  import scala.collection.JavaConverters._
  val partitionInfos = kafkaConsumer.partitionsFor(topic).asScala
  (for (info <- partitionInfos) yield new TopicPartition(topic, info.partition())).toList
}
/**
 * Reads the consumer's current position for each given partition and returns them keyed
 * by topic and then by partition number (as a string).
 *
 * A position of exactly 0 is reported as -2 instead.
 * NOTE(review): -2 is presumably the "earliest" sentinel of Spark's Kafka offset JSON —
 * confirm against the Spark Kafka integration guide.
 *
 * @param kafkaConsumer consumer whose positions are read
 * @param topic         key of the single entry in the outer map
 * @param partitions    partitions to query
 * @return `Map(topic -> Map(partitionNumber -> offset))`
 */
protected def getPartitionOffsets(kafkaConsumer: KafkaConsumer[_, _], topic: String, partitions: List[TopicPartition]): Map[String, Map[String, Long]] = {
  val offsetsByPartition: Map[String, Long] = partitions.map { topicPartition =>
    val position: Long = kafkaConsumer.position(topicPartition)
    // Substitute the -2 sentinel for a position of zero; every other position is kept.
    val reported = if (position == 0L) -2L else position
    topicPartition.partition().toString -> reported
  }.toMap

  Map(topic -> offsetsByPartition)
}
/**
 * Builds the value for a `startingOffsets` option from the consumer's current positions.
 *
 * The consumer is assigned to all partitions of `topic`; its per-partition positions are
 * then serialized to JSON. If any step fails with a non-fatal exception, the failure is
 * logged and the literal "earliest" is returned instead.
 *
 * @param kafkaConsumer consumer to assign and query (left assigned to the topic's partitions)
 * @param topic         topic to compute offsets for
 * @param logger        logger for the debug and error output
 * @return JSON offsets string, or "earliest" on failure
 */
def getStartingOffsetsString(kafkaConsumer: KafkaConsumer[_, _], topic: String)(implicit logger: Logger): String = {
  import scala.collection.JavaConverters._

  val serializedOffsets = Try {
    val topicPartitions = getPartitions(kafkaConsumer, topic)
    kafkaConsumer.assign(topicPartitions.asJava)
    val offsets = getPartitionOffsets(kafkaConsumer, topic, topicPartitions)
    // Entries carrying the -2 sentinel are left out of the debug output only.
    logger.debug(s"Starting offsets for $topic: ${offsets(topic).filterNot(_._2 == -2L)}")
    implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
    Serialization.write(offsets)
  }

  serializedOffsets.recover { case e =>
    logger.error(s"Failed to retrieve starting offsets for $topic: ${e.getMessage}")
    "earliest"
  }.get
}
// MAIN CODE
// Flow: create a consumer for this job's group, derive the starting offsets from it,
// batch-read the topic through Spark, then commit the new offsets with the same consumer.
val groupId = consumerGroupId(name)
val currentKafkaConsumer = kafkaConsumer(groupId)
val topic = config.topic.getOrElse(name)
val startingOffsets = getStartingOffsetsString(currentKafkaConsumer, topic)
val dataFrame = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", config.bootstrapServers)
  .option("subscribe", topic)
  .option("includeHeaders", "true")
  .option("startingOffsets", startingOffsets)
  .option("enable.auto.commit", "false")
  .load()
Try {
  import scala.collection.JavaConverters._
  // Use the consumer INSTANCE created above (`currentKafkaConsumer`); `kafkaConsumer` is
  // only the factory function and has no seekToEnd/commitSync/close.
  try {
    val partitions: List[TopicPartition] = getPartitions(currentKafkaConsumer, topic)
    val numRecords = dataFrame.cache().count() // actually read data from kafka
    // Assumes the read above has consumed everything up to the current end of the topic.
    // NOTE(review): records produced between the read and this seek would be committed
    // as consumed without having been read.
    currentKafkaConsumer.seekToEnd(partitions.asJava)
    val endOffsets: Map[String, Map[String, Long]] =
      getPartitionOffsets(currentKafkaConsumer, topic, partitions)
    logger.debug(s"Loaded $numRecords records")
    logger.debug(s"Ending offsets for $topic: ${endOffsets(topic).filterNot(_._2 == -2L)}")
    currentKafkaConsumer.commitSync()
  } finally {
    // Always release the consumer, including when the read or the commit fails.
    currentKafkaConsumer.close()
  }
} match {
  case Success(_) => ()
  case Failure(e) =>
    logger.error(s"Failed to set offsets for $topic: ${e.getMessage}")
}