1

我有一个 PySpark 数据框 df:

+---------+------------------+
|ceil_temp|             test2|
+---------+------------------+
|       -1|[6397024, 6425417]|
|        0|[6397024, 6425417]|
|        0|[6397024, 6425417]|
|        0|[6469640, 6531963]|
|        0|[6469640, 6531963]|
|        1|[6469640, 6531963]|
+---------+------------------+

我最终想根据 ceil_temp 列的索引向这个数据框添加一个新列(最终),其值是 test2 列中的列表元素。例如:如果 ceil_temp 列中的值 <0 或 0,则最终列的元素位于 test2 列的第 0 个索引中。类似这样的内容:

+---------+------------------+--------
|ceil_temp|             test2|final  |
+---------+------------------+--------
|       -1|[6397024, 6425417]|6397024|
|        0|[6397024, 6425417]|6397024|
|        0|[6397024, 6425417]|6397024|
|        0|[6469640, 6531963]|6469640|
|        0|[6469640, 6531963]|6469640|
|        1|[6469640, 6531963]|6531963|
+---------+------------------+--------

为此,我尝试使用 flatMap 将 ceil_temp 和 test2 提取为列表:

m =df.select("ceil_temp").rdd.flatMap(lambda x: x).collect()
q= df.select("test2").rdd.flatMap(lambda x: x).collect()

l=[]
for i in range(len(num)):
    if m[i]<0:
        m[i]=0
    else:
        pass
    l.append(q[i][m[i]])

然后将此列表 l 转换为新的 df 并将其与基于我基于窗口函数添加的行索引列的原始数据框连接起来:

w = Window().orderBy()
df=df.withColumn("columnindex", rowNumber().over(w)).

但是,flatMap 提取的列表的顺序似乎与父数据框 df 的顺序不同。我得到以下信息:

m=[-1,0,0,0,0,1]
q=[[6469640, 6531963],[6469640, 6531963],[6469640, 6531963],[6397024, 6425417],[6397024, 6425417],[6397024, 6425417]]

预期结果:

m=[-1,0,0,0,0,1]
q=[[6397024, 6425417],[6397024, 6425417],[6397024, 6425417],[6469640, 6531963],[6469640, 6531963],[6469640, 6531963]]

请告知如何实现“最终”列。

4

2 回答 2

0

我认为您可以在数据框的行上使用 UDF 来实现您想要的结果。

然后,您可以withColumn使用 udf 的结果。

val df = spark.sparkContext.parallelize(List(
  (-1, List(6397024, 6425417)),
  (0,List(6397024, 6425417)),
  (0,List(6397024, 6425417)),
  (0,List(6469640, 6531963)),
  (0,List(6469640, 6531963)),
  (1,List(6469640, 6531963)))).toDF("ceil_temp", "test2")

import org.apache.spark.sql.functions.udf
val selectRightElement = udf {
  (ceilTemp: Int, test2: Seq[Int]) => {
    // dummy code for the example
    if (ceilTemp <= 0) test2(0) else test2(1)
  }
}

df.withColumn("final", selectRightElement(df("ceil_temp"), df("test2"))).show

这样做可以防止打乱您的行顺序。

于 2017-08-10T18:50:15.637 回答
0

我通过以下方式解决了上述问题:

df=df.withColumn("final",(df.test2).getItem(df.ceil_temp))
于 2017-08-22T16:49:21.763 回答