4

在使用 pyspark 的代码仓库中,我尝试使用今天的日期,基于此我需要检索上一季度的最后一天。然后,该日期将用于过滤掉数据框中的数据。我试图在代码仓库中创建一个数据框,但这不起作用。我的代码在代码工作簿中工作。这是我的代码工作簿代码。

import datetime as dt
import pyspark.sql.functions as F


def unnamed():
    date_df = spark.createDataFrame([(dt.date.today(),)], ['date'])
    date_df = date_df \
        .withColumn('qtr_start_date', F.date_trunc('quarter', F.col('date'))) \
        .withColumn('qtr_date', F.date_sub(F.col('qtr_start_date'), 1))

    return date_df

任何帮助,将不胜感激。

4

1 回答 1

1

我得到以下代码在代码存储库中成功运行:

from transforms.api import transform_df, Input, Output
import datetime as dt
import pyspark.sql.functions as F


@transform_df(
    Output("/my/output/dataset"),
)
def my_compute_function(ctx):
    date_df = ctx.spark_session.createDataFrame([(dt.date.today(),)], ['date'])
    date_df = date_df \
        .withColumn('qtr_start_date', F.date_trunc('quarter', F.col('date'))) \
        .withColumn('qtr_date', F.date_sub(F.col('qtr_start_date'), 1))

    return date_df

您需要将ctx参数传递到您的转换中,您可以pyspark.sql.DataFrame直接使用底层spark_session变量。

如果您的输入中已经有可用的日期列,您只需要确保它是Date类型,以便F.date_trunc调用在正确的类型上工作。

于 2020-09-23T16:07:59.067 回答