0

输入_pyspark_dataframe:

id   name  collection  student.1.price  student.2.price  student.3.price
111  aaa      1           100              999               232
222  bbb      2           200              888               656
333  ccc      1           300              777               454
444  ddd      1           400              666               787

output_pyspark_dataframe

id   name  collection    price  
111  aaa      1           100           
222  bbb      2           888            
333  ccc      1           300             
444  ddd      3           787       

我们可以通过使用集合列中的值找到每个 id 的正确价格

问题

使用 pyspark,我如何通过动态框架列名找到每个 id 的正确价格student.{collection}.price

请告诉我。

4

1 回答 1

0

有点完整,但你可以这样做。

将为您提供结构字段的fields字段名称,student. 您应该手动提供并最终获得1, 2, 3.

第一行然后为 的列创建一个student.{i}.price数组i = range(1, 4)。同样,第二行创建了一个文字数组{i}

现在,将这两个数组压缩成一个数组,例如

[('1', col('student.1.price')), ...]

并爆炸数组然后它变成:

('1', col('student.1.price'))
('2', col('student.2.price'))
('3', col('student.3.price'))

由于arrays_zip给你一个结构数组,上面的结果是结构类型。使用 struct key 作为列获取每个值,即indexand price

最后可以比较collectionand index(这其实是student struct列的字段名)。

import pyspark.sql.functions as f

fields = [field.name for field in next(field for field in df.schema.fields if field.name == 'student').dataType.fields]

df.withColumn('array', f.array(*map(lambda x: 'student.' + x + '.price', fields))) \
  .withColumn('index', f.array(*map(lambda x: f.lit(x), fields))) \
  .withColumn('zip', f.arrays_zip('index', 'array')) \
  .withColumn('zip', f.explode('zip')) \
  .withColumn('index', f.col('zip.index')) \
  .withColumn('price', f.col('zip.array')) \
  .filter('collection = index') \
  .select('id', 'name', 'collection', 'price') \
  .show(10, False)

+---+----+----------+-----+
|id |name|collection|price|
+---+----+----------+-----+
|111|aaa |1         |100  |
|222|bbb |2         |888  |
|333|ccc |1         |300  |
|444|ddd |3         |787  |
+---+----+----------+-----+
于 2020-08-20T09:57:38.343 回答