我在 Spark Structured Streaming 中使用 Kafka Source 来接收 Confluent 编码的 Avro 记录。我打算使用 Confluent Schema Registry,但与 spark 结构化流的集成似乎是不可能的。
我已经看到了这个问题,但无法让它与 Confluent Schema Registry 一起使用。使用 Spark 2.0.2(结构化流)从 Kafka 读取 Avro 消息
我花了几个月的时间阅读源代码并进行测试。简而言之,Spark 只能处理 String 和 Binary 序列化。您必须手动反序列化数据。在 spark 中,创建 confluent rest 服务对象以获取 schema。使用 Avro 解析器将响应对象中的模式字符串转换为 Avro 模式。接下来,照常阅读Kafka主题。然后使用 Confluent KafkaAvroDeSerializer 映射二进制类型的“值”列。我强烈建议进入这些类的源代码,因为这里有很多事情要做,所以为简洁起见,我将省略很多细节。
//Used Confluent version 3.2.2 to write this.
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
case class DeserializedFromKafkaRecord(key: String, value: String)
val schemaRegistryURL = ""
val topicName = "Schema-Registry-Example-topic1"
val subjectValueName = topicName + "-value"
//create RestService object
val restService = new RestService(schemaRegistryURL)
//.getLatestVersion returns io.confluent.kafka.schemaregistry.client.rest.entities.Schema object.
val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)
//Use Avro parsing classes to get Avro Schema
val parser = new Schema.Parser
val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)
//key schema is typically just string but you can do the same process for the key as the value
val keySchemaString = "\"string\""
val keySchema = parser.parse(keySchemaString)
//Create a map with the Schema registry url.
//This is the only Required configuration for Confluent's KafkaAvroDeserializer.
val props = Map("schema.registry.url" -> schemaRegistryURL)
//Declare SerDe vars before using Spark structured streaming map. Avoids non serializable class exception.
var keyDeserializer: KafkaAvroDeserializer = null
var valueDeserializer: KafkaAvroDeserializer = null
//Create structured streaming DF to read from the topic.
val rawTopicMessageDF = sql.readStream
.option("kafka.bootstrap.servers", "")
.option("subscribe", topicName)
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 20) //remove for prod
//instantiate the SerDe classes if not already, then deserialize!
val deserializedTopicMessageDS = rawTopicMessageDF.map{
row =>
if (keyDeserializer == null) {
keyDeserializer = new KafkaAvroDeserializer
keyDeserializer.configure(props.asJava, true) //isKey = true
if (valueDeserializer == null) {
valueDeserializer = new KafkaAvroDeserializer
valueDeserializer.configure(props.asJava, false) //isKey = false
//Pass the Avro schema.
val deserializedKeyString = keyDeserializer.deserialize(topicName, row.key, keySchema).toString //topic name is actually unused in the source code, just required by the signature. Weird right?
val deserializedValueString = valueDeserializer.deserialize(topicName, row.value, topicValueAvroSchema).toString
DeserializedFromKafkaRecord(deserializedKeyString, deserializedValueString)
val deserializedDSOutputStream = deserializedTopicMessageDS.writeStream
.option("truncate", false)
此代码仅在本地主机上进行了测试,并且已报告在集群环境中遇到序列化程序问题。有一个替代解决方案(步骤 7-9,在步骤 10 中使用 Scala 代码)将模式 id 提取到列中,查找每个唯一 ID,然后使用模式广播变量,这将在规模上更好地工作。
也解决了将注册表与 Spark 一起使用的问题
这是所需的依赖项。使用 Confluent 5.x 和 Spark 2.4 测试的代码
<!-- Conflicts with Spark's version -->
这是 Scala 实现(仅在本地测试master=local[*]
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.commons.cli.CommandLine
import org.apache.spark.sql._
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.streaming.OutputMode
object App {
private var schemaRegistryClient: SchemaRegistryClient = _
private var kafkaAvroDeserializer: AvroDeserializer = _
def lookupTopicSchema(topic: String, isKey: Boolean = false) = {
schemaRegistryClient.getLatestSchemaMetadata(topic + (if (isKey) "-key" else "-value")).getSchema
def avroSchemaToSparkSchema(avroSchema: String) = {
SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
// ... continues below
然后定义一个简单的 main 方法,解析 CMD args 以获取 Kafka 详细信息
def main(args: Array[String]): Unit = {
val cmd: CommandLine = parseArg(args)
val master = cmd.getOptionValue("master", "local[*]")
val spark = SparkSession.builder()
val bootstrapServers = cmd.getOptionValue("bootstrap-server")
val topic = cmd.getOptionValue("topic")
val schemaRegistryUrl = cmd.getOptionValue("schema-registry")
consumeAvro(spark, bootstrapServers, topic, schemaRegistryUrl)
// ... still continues
private def consumeAvro(spark: SparkSession, bootstrapServers: String, topic: String, schemaRegistryUrl: String): Unit = {
import spark.implicits._
// Setup the Avro deserialization UDF
schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
spark.udf.register("deserialize", (bytes: Array[Byte]) =>
// Load the raw Kafka topic (byte stream)
val rawDf = spark.readStream
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
// Deserialize byte stream into strings (Avro fields become JSON)
import org.apache.spark.sql.functions._
val jsonDf = rawDf.select(
// 'key.cast(DataTypes.StringType), // string keys are simplest to use
callUDF("deserialize", 'key).as("key"), // but sometimes they are avro
callUDF("deserialize", 'value).as("value")
// excluding topic, partition, offset, timestamp, etc
// Get the Avro schema for the topic from the Schema Registry and convert it into a Spark schema type
val dfValueSchema = {
val rawSchema = lookupTopicSchema(topic)
// Apply structured schema to JSON stream
val parsedDf = jsonDf.select(
'key, // keys are usually plain strings
// values are JSONified Avro records
from_json('value, dfValueSchema.dataType).alias("value")
$"value.*" // flatten out the value
// parsedDf.printSchema()
// Sample schema output
// root
// |-- key: string (nullable = true)
// |-- header: struct (nullable = true)
// | |-- time: long (nullable = true)
// | ...
// TODO: Do something interesting with this stream
.option("truncate", false)
// still continues
命令行解析器允许传入引导服务器、模式注册表、主题名称和 Spark 主服务器。
private def parseArg(args: Array[String]): CommandLine = {
import org.apache.commons.cli._
val options = new Options
val masterOption = new Option("m", "master", true, "Spark master")
val bootstrapOption = new Option("b", "bootstrap-server", true, "Bootstrap servers")
val topicOption = new Option("t", "topic", true, "Kafka topic")
val schemaRegOption = new Option("s", "schema-registry", true, "Schema Registry URL")
val parser = new BasicParser
parser.parse(options, args)
// still continues
为了使上面的 UDF 工作,需要有一个反序列化器将字节的 DataFrame 带到包含反序列化 Avro 的字节
// Simple wrapper around Confluent deserializer
class AvroDeserializer extends AbstractKafkaAvroDeserializer {
def this(client: SchemaRegistryClient) {
// TODO: configure the deserializer for authentication
this.schemaRegistry = client
override def deserialize(bytes: Array[Byte]): String = {
val value = super.deserialize(bytes)
value match {
case str: String =>
case _ =>
val genericRecord = value.asInstanceOf[GenericRecord]
} // end 'object App'
将这些块中的每一个放在一起,添加-b localhost:9092 -s http://localhost:8081 -t myTopic
到Run Configurations > Program Arguments后,它可以在 IntelliJ 中工作
这是我将 spark 结构化流与 kafka 和模式注册表集成的代码示例(scala 中的代码)
import org.apache.spark.sql.SparkSession
import io.confluent.kafka.schemaregistry.client.rest.RestService // <artifactId>kafka-schema-registry</artifactId>
import org.apache.spark.sql.avro.from_avro // <artifactId>spark-avro_${scala.compat.version}</artifactId>
import org.apache.spark.sql.functions.col
object KafkaConsumerAvro {
def main(args: Array[String]): Unit = {
val KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
val SCHEMA_REGISTRY_URL = "http://localhost:8081"
val TOPIC = "transactions"
val spark: SparkSession = SparkSession.builder().appName("KafkaConsumerAvro").getOrCreate()
val df = spark.readStream
.option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
.option("subscribe", TOPIC)
.option("startingOffsets", "earliest") // from starting
// Prints Kafka schema with columns (topic, offset, partition e.t.c)
// Create REST service to access schema registry and retrieve topic schema (latest)
val restService = new RestService(SCHEMA_REGISTRY_URL)
val valueRestResponseSchema = restService.getLatestVersion(TOPIC + "-value")
val jsonSchema = valueRestResponseSchema.getSchema
val transactionDF = df.select(
col("key").cast("string"), // cast to string from binary value
from_avro(col("value"), jsonSchema).as("transaction"), // convert from avro value
// Stream data to console for testing
从 kafka 主题中读取时,我们有这种模式:
键:二进制 | 值:二进制 | 主题:字符串 | 分区:整数 | 偏移量:长 | 时间戳:时间戳 | 时间戳类型:整数 |
正如我们所见,key 和 value 是二进制的,所以我们需要将 key 转换为字符串,在这种情况下,value 是 avro 格式的,所以我们可以通过调用from_avro
除了 Spark 和 Kafka 依赖项之外,我们还需要以下依赖项:
这个图书馆将为您完成这项工作。它通过 Spark Structured Stream 连接到 Confluent Schema Registry。
对于 Confluent,它处理与有效负载一起发送的模式 id。
披露:我为 ABSA 工作并开发了这个库。
Databricks 现在提供此功能,但您必须为此付费 :-(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
有关更多信息,请参见: https ://docs.databricks.com/spark/latest/structured-streaming/avro-dataframe.html
一个不错的免费替代品是 ABRIS。请参阅:https ://github.com/AbsaOSS/ABRiS我们可以看到的唯一缺点是您需要在运行时提供您的 avro 架构文件,以便框架可以在将其发布到 Kafka 主题之前在您的数据帧上强制执行此架构.
根据@cricket_007 的回答,我创建了以下可以在我们的集群环境中运行的解决方案,包括以下新功能:
"org.apache.spark:spark-sql-kafka-0-10_${SCALA_VERSION}:${SPARK_VERSION}" ## format("kafka")
"org.apache.spark:spark-avro_${SCALA_VERSION}:${SPARK_VERSION}" ## SchemaConverters
"io.confluent:kafka-schema-registry:${CONFLUENT_VERSION}" ## import io.confluent.kafka.schemaregistry.client.rest.RestService
"io.confluent:kafka-avro-serializer:${CONFLUENT_VERSION}" ## import io.confluent.kafka.serializers.KafkaAvroDeserializer
./bin/spark-shell --packages ${"${jars[*]}"// /,}
以下是我在 spark-shell 中测试的全部代码:
import org.apache.avro.Schema
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.schemaregistry.client.rest.RestService
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.avro.SchemaConverters
import scala.collection.JavaConverters._
import java.time.LocalDateTime
val brokerServers = "xxx.yyy.zzz:9092"
val topicName = "mytopic"
val schemaRegistryURL = "http://xxx.yyy.zzz:8081"
val restService = new RestService(schemaRegistryURL)
val exParser = new Schema.Parser
//-- For both key and value
val schemaNames = Seq("key", "value")
val schemaStrings = schemaNames.map(i => (i -> restService.getLatestVersion(s"$topicName-$i").getSchema)).toMap
val tempStructMap = schemaStrings.transform((k,v) => SchemaConverters.toSqlType(exParser.parse(v)).dataType)
val schemaStruct = new StructType().add("key", tempStructMap("key")).add("value", tempStructMap("value"))
//-- For key only
// val schemaStrings = restService.getLatestVersion(s"$topicName-key").getSchema
// val schemaStruct = SchemaConverters.toSqlType(exParser.parse(schemaStrings)).dataType
//-- For value only
// val schemaStrings = restService.getLatestVersion(s"$topicName-value").getSchema
// val schemaStruct = SchemaConverters.toSqlType(exParser.parse(schemaStrings)).dataType
val query = spark
.option("kafka.bootstrap.servers", brokerServers)
.option("subscribe", topicName)
//.option("checkpointLocation", s"cos://$bucket.service/checkpoints/$tableName")
.foreachBatch((batchDF: DataFrame, batchId: Long) => {
val bcTopicName = sc.broadcast(topicName)
val bcSchemaRegistryURL = sc.broadcast(schemaRegistryURL)
val bcSchemaStrings = sc.broadcast(schemaStrings)
val rstDF = batchDF.map {
row =>
val props = Map("schema.registry.url" -> bcSchemaRegistryURL.value)
//-- For both key and value
val isKeys = Map("key" -> true, "value" -> false)
val deserializers = isKeys.transform{ (k,v) =>
val des = new KafkaAvroDeserializer
des.configure(props.asJava, v)
//-- For key only
// val deserializer = new KafkaAvroDeserializer
// deserializer.configure(props.asJava, true)
//-- For value only
// val deserializer = new KafkaAvroDeserializer
// deserializer.configure(props.asJava, false)
val inParser = new Schema.Parser
//-- For both key and value
val values = bcSchemaStrings.value.transform( (k,v) =>
deserializers(k).deserialize(bcTopicName.value, row.getAs[Array[Byte]](k), inParser.parse(v)).toString)
s"""{"key": ${values("key")}, "value": ${values("value")} }"""
//-- For key only
// deserializer.deserialize(bcTopicName.value, row.getAs[Array[Byte]]("key"), inParser.parse(bcSchemaStrings.value)).toString
//-- For value only
// deserializer.deserialize(bcTopicName.value, row.getAs[Array[Byte]]("value"), inParser.parse(bcSchemaStrings.value)).toString
.select(from_json(col("value"), schemaStruct).as("root"))
println(s"${LocalDateTime.now} --- Batch $batchId: ${rstDF.count} rows")
.trigger(Trigger.ProcessingTime("60 seconds"))
import requests
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.avro.functions import *
# variables
topic = "my-topic"
schemaregistry = "http://localhost:8081"
kafka_brokers = "kafka1:9092,kafka2:9092"
# retrieve the latest schema
response = requests.get('{}/subjects/{}-value/versions/latest/schema'.format(schemaregistry, topic))
# error check
# extract the schema from the response
schema = response.text
# run the query
query = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", kafka_brokers) \
.option("subscribe", topic) \
.load() \
# The magic goes here:
# Skip the first 5 bytes (reserved by schema registry encoding protocol)
.selectExpr("substring(value, 6) as avro_value") \
.select(from_avro(col("avro_value"), schema).alias("data")) \
.select(col("data.my_field")) \
.writeStream \
.format("console") \
.outputMode("complete") \
:felipe 引用的库之前对我来说在 JVM 上运行良好,所以我编写了一个小的包装函数,将它集成到 python 中。这看起来很 hacky,因为 scala 语言中隐含的许多类型必须在 py4j 中显式指定。到目前为止,即使在 spark 2.4.1 中也运行良好。
def expand_avro(spark_context, sql_context, data_frame, schema_registry_url, topic):
j = spark_context._gateway.jvm
dataframe_deserializer = j.za.co.absa.abris.avro.AvroSerDe.DataframeDeserializer(data_frame._jdf)
naming_strategy = getattr(
"SchemaStorageNamingStrategies$"), "MODULE$").TOPIC_NAME()
conf = getattr(getattr(j.scala.collection.immutable.Map, "EmptyMap$"), "MODULE$")
conf = getattr(conf, "$plus")(j.scala.Tuple2("schema.registry.url", schema_registry_url))
conf = getattr(conf, "$plus")(j.scala.Tuple2("schema.registry.topic", topic))
conf = getattr(conf, "$plus")(j.scala.Tuple2("value.schema.id", "latest"))
conf = getattr(conf, "$plus")(j.scala.Tuple2("value.schema.naming.strategy", naming_strategy))
schema_path = j.scala.Option.apply(None)
conf = j.scala.Option.apply(conf)
policy = getattr(j.za.co.absa.abris.avro.schemas.policy.SchemaRetentionPolicies, "RETAIN_SELECTED_COLUMN_ONLY$")()
data_frame = dataframe_deserializer.fromConfluentAvro("value", schema_path, conf, policy)
data_frame = DataFrame(data_frame, sql_context)
return data_frame
为此,您必须将库添加到 spark 包中,例如
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages ' \
'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1,' \
'org.apache.spark:spark-avro_2.11:2.4.1,' \
'za.co.absa:abris_2.11:2.2.2 ' \
'--repositories https://packages.confluent.io/maven/ ' \