1

我有一个清单lists=[0,1,2,3,5,6,7]。顺序不是顺序的。我有一个 9 列的 pyspark 数据框。

+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+
|               date|ftt (°c)|rtt (°c)|fbt (°c)|rbt (°c)|fmt (°c)|rmt (°c)|fmhhumidityunit|index|Diff|
+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+
|2019-02-01 05:29:47|     NaN|     NaN|     NaN|     NaN|     NaN|     NaN|            NaN|    0| NaN|
|2019-02-01 05:29:17|     NaN|     NaN|     NaN|     NaN|     NaN|    NaN|           NaN|    1| NaN |

我需要将我的列表作为列添加到我现有的数据框中。我的列表不按顺序排列,所以我无法使用 udf。有没有办法做到这一点?请帮助我,我希望它是这样的

+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+------+
|               date|ftt (°c)|rtt (°c)|fbt (°c)|rbt (°c)|fmt (°c)|rmt (°c)|fmhhumidityunit|index|Diff|lists |
+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+-------+
|2019-02-01 05:29:47|     NaN|     NaN|     NaN|     NaN|     NaN|     NaN|            NaN|    0| NaN|0     |
|2019-02-01 05:29:17|     NaN|     NaN|     NaN|     NaN|     NaN|     NaN|           NaN|    1| NaN |1     |
4

2 回答 2

1

不太确定它是否必须是这样的,或者你是否期待别的东西。如果您的列表项和数据框行数必须相同,那么这是一种简单的方法。

对于具有三列的给定示例数据框:

 l = [(1,'DEF',33),(2,'KLM',22),(3,'ABC',32),(4,'XYZ',77)]
 df=spark.createDataFrame(l, ['id', 'value','age'])

让我们说这是一个列表:

lists=[5,6,7,8]

可以从此列表中创建一个 rdd 并使用带有数据框的 zip 函数并在其上使用 map 函数。

listrdd = sc.parallelize(lists)

newdf=df.rdd.zip(listrdd).map(lambda (x,y ) : ([x for x in x] + [y])).toDF(["id", "Value",",age","List_element"])

>>> ziprdd=df.rdd.zip(listrdd)
>>> ziprdd.take(50)
[(Row(id=1, value=u'DEF', age=33), 5), (Row(id=2, value=u'KLM', age=22), 6), (Row(id=3, value=u'ABC', age=32), 7), (Row(id=4, value=u'XYZ', age=77), 8)]

作为 zip 函数返回键值对,第一个元素包含来自第一个 rdd 的数据,第二个元素包含来自第二个 rdd 的数据。我正在对第一个元素使用列表理解并将其与第二个元素连接起来。

它是动态的,可以用于 n 列,但列表元素和数据框行必须相同。

>>> newdf.show()
]+---+-----+----+------------+
| id|Value|,age|List_element|
+---+-----+----+------------+
|  1|  DEF|  33|           5|
|  2|  KLM|  22|           6|
|  3|  ABC|  32|           7|
|  4|  XYZ|  77|           8|
+---+-----+----+------------+

注意:使用 zip 方法时,两个 rdd 分区计数必须相同,否则会出现错误

ValueError: Can only zip with RDD which has the same number of partitions
于 2019-10-03T19:36:14.590 回答
0

你可以join两个dfs,像这样:

df2 = spark.createDataFrame()
df= df.join(df2, on=['index']).drop('index')

df2 将包含您希望添加到主 df 的列。

于 2019-10-02T16:00:16.287 回答