1

我有一个像这样的 PySpark 数据框:

+--------+-------------+--------------+-----------------------+
|material|purchase_date|mkt_prc_usd_lb|min_mkt_prc_over_1month|
+--------+-------------+--------------+-----------------------+
|  Copper|   2019-01-09|        2.6945|                 2.6838|
|  Copper|   2019-01-23|        2.6838|                 2.6838|
|    Zinc|   2019-01-23|        1.1829|                 1.1829|
|    Zinc|   2019-06-26|        1.1918|                 1.1918|
|Aluminum|   2019-01-02|        0.8363|                 0.8342|
|Aluminum|   2019-01-09|        0.8342|                 0.8342|
|Aluminum|   2019-01-23|        0.8555|                 0.8342|
|Aluminum|   2019-04-03|        0.8461|                 0.8461|
+--------+-------------+--------------+-----------------------+

最后一列“min_mkt_prc_over_1month”计算为材料一个月内的最小“mkt_prc_usd_lb”(第 3 列),即材料、purchase_date 窗口的(-15 天到 +15 天):

代码是:


w2 = (Window()
           .partitionBy("material")
           .orderBy(col("purchase_date").cast("timestamp").cast("long"))
           .rangeBetween(-days(15), days(15)))

现在,我想看看当金额是/将是最低时的“purchase_date”是多少?

预期输出:(来自前两行)

+--------+-------------+--------------+-----------------------+------------------+
|material|purchase_date|mkt_prc_usd_lb|min_mkt_prc_over_1month|date_of_min_price |
+--------+-------------+--------------+-----------------------+------------------+
|  Copper|   2019-01-09|        2.6945|                 2.6838|        2019-01-23|
|  Copper|   2019-01-23|        2.6838|                 2.6838|        2019-01-23|
+--------+-------------+--------------+-----------------------+------------------+
4

1 回答 1

3

尝试这个。我们可以在 , 的位置创建一个列two prc are the same to populate it with purchase dateotherwise to put Null然后我们可以First with ignoreNulls=True在我们的newly created column using our window w2..

from pyspark.sql.functions import *
from pyspark.sql.window import Window

days= lambda i: i * 86400
w2 = (Window()
           .partitionBy("material")
           .orderBy(col("purchase_date").cast("timestamp").cast("long"))
           .rangeBetween(-days(15), days(15)))


df.withColumn("first",\
              expr("""IF(mkt_prc_usd_lb=min_mkt_prc_over_1month,purchase_date,null)"""))\
  .withColumn("date_of_min_price", first("first", True).over(w2)).drop("first")\
  .show()

#+--------+-------------+--------------+-----------------------+-----------------+
#|material|purchase_date|mkt_prc_usd_lb|min_mkt_prc_over_1month|date_of_min_price|
#+--------+-------------+--------------+-----------------------+-----------------+
#|  Copper|   2019-01-09|        2.6945|                 2.6838|       2019-01-23|
#|  Copper|   2019-01-23|        2.6838|                 2.6838|       2019-01-23|
#|    Zinc|   2019-01-23|        1.1829|                 1.1829|       2019-01-23|
#|    Zinc|   2019-06-26|        1.1918|                 1.1918|       2019-06-26|
#|Aluminum|   2019-01-02|        0.8363|                 0.8342|       2019-01-09|
#|Aluminum|   2019-01-09|        0.8342|                 0.8342|       2019-01-09|
#|Aluminum|   2019-01-23|        0.8555|                 0.8342|       2019-01-09|
#|Aluminum|   2019-04-03|        0.8461|                 0.8461|       2019-04-03|
#+--------+-------------+--------------+-----------------------+-----------------+
于 2020-05-20T01:27:26.840 回答