4

我希望在 Spark 1.6 上使用 DataFrames API 构建一个 Spark Streaming 应用程序。在我深入兔子洞之前,我希望有人可以帮助我了解 DataFrames 如何处理具有不同模式的数据。

这个想法是消息将通过 Avro 模式流入 Kafka。我们应该能够以向后兼容的方式发展模式,而无需重新启动流应用程序(应用程序逻辑仍然有效)。

使用模式注册表和嵌入在消息中的模式 id 使用 KafkaUtils 创建直接流和 AvroKafkaDecoder(来自 Confluent)反序列化新版本的消息似乎很简单。这让我有一个 DStream。

问题 #1:在该 DStream 中会有不同版本的模式的对象。因此,当我将每个对象转换为 Row 对象时,我应该传入一个读取器模式,该模式是正确迁移数据的最新模式,并且我需要将最新模式传递给 sqlContext.createDataFrame(rowRdd, schema) 调用。DStream 中的对象是 GenericData.Record 类型,据我所知,没有简单的方法可以判断哪个是最新版本。我看到了 2 种可能的解决方案,一种是调用模式注册表以在每个微批次上获取最新版本的模式。另一种是修改解码器以附加模式ID。然后我可以遍历 rdd 以找到最高 id 并从本地缓存中获取模式。

我希望有人已经以可重用的方式很好地解决了这个问题。

问题/问题 #2:Spark 将为每个分区从 Kafka 拉取不同的执行程序。当一个执行者收到与其他执行者不同的“最新”模式时,我的应用程序会发生什么。由一个执行程序创建的 DataFrame 在同一时间窗口中将具有与另一个执行程序不同的架构。我实际上不知道这是否是一个真正的问题。我无法可视化数据流,以及什么样的操作会出现问题。如果这是一个问题,则意味着执行者之间需要共享一些数据,这听起来既复杂又低效。

我需要担心这个吗?如果我这样做,如何解决架构差异?

谢谢,--本

4

2 回答 2

3

我相信我已经解决了这个问题。我正在使用 Confluent 的模式注册表和 KafkaAvroDecoder。简化的代码如下所示:

// Get the latest schema here. This schema will be used inside the
// closure below to ensure that all executors are using the same 
// version for this time slice.
val sr : CachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000)
val m = sr.getLatestSchemaMetadata(subject)
val schemaId = m.getId
val schemaString = m.getSchema

val outRdd = rdd.mapPartitions(partitions => {
  // Note: we cannot use the schema registry from above because this code
  // will execute on remote machines, requiring the schema registry to be
  // serialized. We could use a pool of these.
  val schemaRegistry : CachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000)
  val decoder: KafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry)
  val parser = new Schema.Parser()
  val avroSchema = parser.parse(schemaString)
  val avroRecordConverter = AvroSchemaConverter.createConverterToSQL(avroSchema)

  partitions.map(input => {
    // Decode the message using the latest version of the schema.
    // This will apply Avro's standard schema evolution rules 
    // (for compatible schemas) to migrate the message to the 
    // latest version of the schema.
    val record = decoder.fromBytes(messageBytes, avroSchema).asInstanceOf[GenericData.Record]
    // Convert record into a DataFrame with columns according to the schema
    avroRecordConverter(record).asInstanceOf[Row]
  })
})

// Get a Spark StructType representation of the schema to apply 
// to the DataFrame.
val sparkSchema = AvroSchemaConverter.toSqlType(
      new Schema.Parser().parse(schemaString)
    ).dataType.asInstanceOf[StructType]
sqlContext.createDataFrame(outRdd, sparkSchema)
于 2016-12-27T20:29:51.223 回答
0

我仅使用结构化流来完成此操作。

case class DeserializedFromKafkaRecord(value: String)
 
val brokers = "...:9092"
val schemaRegistryURL = "...:8081"
var topicRead = "mytopic"
 
 
val kafkaParams = Map[String, String](
  "kafka.bootstrap.servers" -> brokers,
  "group.id" -> "structured-kafka",
  "failOnDataLoss"-> "false",
  "schema.registry.url" -> schemaRegistryURL
)
    
object topicDeserializerWrapper {
  val props = new Properties()
  props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryURL)
  props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
  val vProps = new kafka.utils.VerifiableProperties(props)
  val deser = new KafkaAvroDecoder(vProps)
  val avro_schema = new RestService(schemaRegistryURL).getLatestVersion(topicRead + "-value")
  val messageSchema = new Schema.Parser().parse(avro_schema.getSchema)
}
 
val df = {spark
  .readStream
  .format("kafka")
  .option("subscribe", topicRead)
  .option("kafka.bootstrap.servers", brokers)
  .option("auto.offset.reset", "latest")
  .option("failOnDataLoss", false)
  .option("startingOffsets", "latest")
  .load()
  .map(x => {
     DeserializedFromKafkaRecord(DeserializerWrapper.deser.fromBytes(x.getAs[Array[Byte]]("value"), DeserializerWrapper.messageSchema).asInstanceOf[GenericData.Record].toString)
  })}
于 2018-06-14T18:49:51.067 回答