我们从应用程序中的 avro 文件中读取时间戳信息。我正在测试从 Spark 2.3.1 到 Spark 2.4 的升级,其中包括新内置的 spark-avro 集成。但是,我无法弄清楚如何告诉 avro 模式我希望时间戳具有“timestamp-millis”的逻辑类型,而不是默认的“timestamp-micros”。
仅通过使用 Databricks spark-avro 4.0.0 包查看 Spark 2.3.1 下的测试 avro 文件,我们就有以下字段/模式:
{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":["long","null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}
自 epoch 存储为 long 以来,其中的 searchTime 为毫秒。一切都很好。
当我升级到 Spark 2.4 和内置 spark-avro 2.4.0 包时,我有了这些更新的字段/模式:
{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":[{"type":"long","logicalType":"timestamp-micros"},"null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}
可以看到,底层类型仍然是 long,但现在增加了“timestamp-micros”的logicalType。这正是发行说明所说的那样,但是,我找不到指定架构以使用“timestamp-millis”选项的方法。
这成为一个问题,当我向 avro 文件写入一个 Timestamp 对象时,该对象被初始化为 10,000 秒后的 epoch,它将被读取为 10,000,000 秒。在 2.3.1/databricks-avro 下,它只是一个 long 没有与之相关的信息,所以它是刚进去就出来的。
我们目前通过反映感兴趣的对象来构建模式,如下所示:
val searchSchema: StructType = ScalaReflection.schemaFor[searchEntry].dataType.asInstanceOf[StructType]
我尝试通过创建一个修改后的模式来扩充这一点,该模式试图替换与 searchTime 条目对应的 StructField,如下所示:
val modSearchSchema = StructType(searchSchema.fields.map {
case StructField(name, _, nullable, metadata) if name == "searchTime" =>
StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
case f => f
})
但是,在 spark.sql.types 中定义的 StructField 对象没有可以扩充其中的 dataType 的logicalType 的概念。
case class StructField(
name: String,
dataType: DataType,
nullable: Boolean = true,
metadata: Metadata = Metadata.empty)
我还尝试通过两种方式从 JSON 表示创建模式:
val schemaJSONrepr = """{
| "name" : "id",
| "type" : "string",
| "nullable" : true,
| "metadata" : { }
| }, {
| "name" : "searchQuery",
| "type" : "string",
| "nullable" : true,
| "metadata" : { }
| }, {
| "name" : "searchTime",
| "type" : "long",
| "logicalType" : "timestamp-millis",
| "nullable" : false,
| "metadata" : { }
| }, {
| "name" : "score",
| "type" : "double",
| "nullable" : false,
| "metadata" : { }
| }, {
| "name" : "searchType",
| "type" : "string",
| "nullable" : true,
| "metadata" : { }
| }""".stripMargin
第一次尝试只是从中创建一个 DataType
// here spark is a SparkSession instance from a higher scope.
val schema = DataType.fromJSON(schemaJSONrepr).asInstanceOf[StructType]
spark.read
.schema(schema)
.format("avro")
.option("basePath", baseUri)
.load(uris: _*)
失败是因为它无法为 searchTime 节点创建 StructType,因为其中包含“logicalType”。第二次尝试是通过传入原始 JSON 字符串来简单地创建模式。
spark.read
.schema(schemaJSONrepr)
.format("avro")
.option("basePath", baseUri)
.load(uris: _*)
这没有说:
mismatched input '{' expecting {'SELECT', 'FROM', ...
== SQL ==
{
^^^
我发现在spark-avro API中有一种方法可以从模式中获取逻辑类型,但无法弄清楚如何设置。
正如您在上面看到的失败尝试,我尝试使用 Schema.Parser 创建 avro 模式对象,但 spark.read.schema 中唯一接受的类型是 String 和 StructType。
如果有人可以提供有关如何更改/指定此逻辑类型的见解,我将非常感激。谢谢