26

在 PySpark 中,您可以定义一个模式并使用此预定义模式读取数据源,例如:

Schema = StructType([ StructField("temperature", DoubleType(), True),
                      StructField("temperature_unit", StringType(), True),
                      StructField("humidity", DoubleType(), True),
                      StructField("humidity_unit", StringType(), True),
                      StructField("pressure", DoubleType(), True),
                      StructField("pressure_unit", StringType(), True)
                    ])

对于某些数据源,可以从数据源推断模式并获得具有此模式定义的数据框。

是否可以从数据帧中获取模式定义(以上述形式),其中数据已被推断出?

df.printSchema()将模式打印为树,但我需要重用模式,将其定义如上,因此我可以读取具有此模式的数据源,该模式之前已从另一个数据源推断。

4

4 回答 4

34

是的,有可能。采用DataFrame.schema property

schema

以 pyspark.sql.types.StructType 形式返回此 DataFrame 的架构。

>>> df.schema
StructType(List(StructField(age,IntegerType,true),StructField(name,StringType,true)))

1.3 版中的新功能。

Schema也可以导出为 JSON 并在需要时重新导入。

于 2019-02-03T13:06:43.670 回答
7

您可以为现有 Dataframe 重用架构

l = [('Ankita',25,'F'),('Jalfaizy',22,'M'),('saurabh',20,'M'),('Bala',26,None)]
people_rdd=spark.sparkContext.parallelize(l)
schemaPeople = people_rdd.toDF(['name','age','gender'])

schemaPeople.show()

+--------+---+------+
|    name|age|gender|
+--------+---+------+
|  Ankita| 25|     F|
|Jalfaizy| 22|     M|
| saurabh| 20|     M|
|    Bala| 26|  null|
+--------+---+------+

spark.createDataFrame(people_rdd,schemaPeople.schema).show()

+--------+---+------+
|    name|age|gender|
+--------+---+------+
|  Ankita| 25|     F|
|Jalfaizy| 22|     M|
| saurabh| 20|     M|
|    Bala| 26|  null|
+--------+---+------+

只需使用 df.schema 即可获取数据框的底层架构

schemaPeople.schema

StructType(List(StructField(name,StringType,true),StructField(age,LongType,true),StructField(gender,StringType,true)))
于 2019-02-03T15:08:50.067 回答
6

下面的代码将为您提供已知数据帧的格式良好的表格模式定义。当您有非常多的列并且编辑很麻烦时非常有用。然后,您现在可以将其应用到您的新数据框并相应地手动编辑您可能想要的任何列。

from pyspark.sql.types import StructType

schema = [i for i in df.schema] 

然后从这里,你有你的新模式:

NewSchema = StructType(schema)
于 2020-02-09T20:06:07.627 回答
2

如果您正在寻找来自 PySpark 的 DDL 字符串:

df: DataFrame = spark.read.load('LOCATION')
schema_json = df.schema.json()
ddl = spark.sparkContext._jvm.org.apache.spark.sql.types.DataType.fromJson(schema_json).toDDL()
于 2020-12-14T15:58:21.757 回答