2

我知道我在这里问了一个类似的问题,但那是针对行过滤的。这次我试图删除列。我尝试实现高阶函数等FILTER一段时间,但无法让它工作。我认为我需要的是一个SELECT高阶函数,但它似乎不存在。谢谢您的帮助!

我正在使用 pyspark 并且我有一个数据框对象df,这就是输出的df.printSchema()样子

root
 |-- M_MRN: string (nullable = true)
 |-- measurements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Observation_ID: string (nullable = true)
 |    |    |-- Observation_Name: string (nullable = true)
 |    |    |-- Observation_Result: string (nullable = true)

我只想在“测量”中保留“观察 ID”或“观察结果”列。所以目前当我跑步时,df.select('measurements').take(2)我得到

[Row(measurements=[Row(Observation_ID='5', Observation_Name='ABC', Observation_Result='108/72'),
                   Row(Observation_ID='11', Observation_Name='ABC', Observation_Result='70'),
                   Row(Observation_ID='10', Observation_Name='ABC', Observation_Result='73.029'),
                   Row(Observation_ID='14', Observation_Name='XYZ', Observation_Result='23.1')]),
 Row(measurements=[Row(Observation_ID='2', Observation_Name='ZZZ', Observation_Result='3/4'),
                   Row(Observation_ID='5', Observation_Name='ABC', Observation_Result='7')])]

我希望在完成上述过滤并运行后df.select('measurements').take(2)得到

[Row(measurements=[Row(Observation_ID='5', Observation_Result='108/72'),
                   Row(Observation_ID='11', Observation_Result='70'),
                   Row(Observation_ID='10', Observation_Result='73.029'),
                   Row(Observation_ID='14', Observation_Result='23.1')]),
 Row(measurements=[Row(Observation_ID='2', Observation_Result='3/4'),
                   Row(Observation_ID='5', Observation_Result='7')])]

有没有办法在 pyspark 中做到这一点?感谢您期待您的帮助!

4

2 回答 2

2

您可以使用所需的字段并将它们放在. higher order function transformselectstruct

from pyspark.sql import functions as F
df.withColumn("measurements",F.expr("""transform(measurements\
,x-> struct(x.Observation_ID as Observation_ID,\
             x.Observation_Result as Observation_Result))""")).printSchema()

#root
 #|-- measurements: array (nullable = true)
 #|    |-- element: struct (containsNull = false)
 #|    |    |-- Observation_ID: string (nullable = true)
 #|    |    |-- Observation_Result: string (nullable = true)
于 2020-04-29T22:56:51.337 回答
0

对于正在寻找适用于旧 pyspark 版本的答案的任何人,这里有一个使用 udfs 的答案:

import pyspark.sql.functions as f
from pyspark.sql.types import ArrayType, LongType, StringType, StructField, StructType

_measurement_type = ArrayType(StructType([
    StructField('Observation_ID', StringType(), True),
    StructField('Observation_Result', StringType(), True)
]))

@f.udf(returnType=_measurement_type)
def higher_order_select(measurements):
    return [(m.Observation_ID, m.Observation_Result) for m in measurements]

df.select(higher_order_select('measurements').alias('measurements')).printSchema()

印刷

root
 |-- measurements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Observation_ID: string (nullable = true)
 |    |    |-- Observation_Result: string (nullable = true)
于 2021-08-31T09:01:07.540 回答