0

我正在尝试从文件中提取数据一个月,然后对其进行处理。基本上我需要为每个月提取数据并进行一些转换。由于我的工作每天都在运行,我想利用它并填充该月的数据,直到 run_date。

我有两种方法:

方法一:

仅填充上个月的数据。例如,如果我的 current_date 或 run_date 在 月May,我将填充 月的数据April。这可以通过从中提取月份current_date()并从中减去1来实现。类似于以下内容:

df.filter(month(to_date(col("startDate")))===month(to_date(current_date())-1))

这只是一个想法。这段代码无法实现我想要做的事情,因为我只是减去月份部分而不考虑Year 部分。

但在这种情况下,我的工作将每天运行以填充整个月的相同数据。这样做没有意义。

方法二:

如果我的 current_date 是2020-05-27,我想从中提取数据2020-05-01 to 2020-05-26。如果我当前的日期是2020-06-01,它应该填充来自 5 月份的数据2020-05-01 to 2020-05-31

我想实施方法 2。我能想到的唯一想法是写几个Case语句来检查日期并相应地填充它。

有人可以分享一些想法。有没有稍微直截了当的方法。

我在用Spark 1.5

4

1 回答 1

1

检查这是否有帮助-

1.加载测试数据

val data =
      """
        |2018-04-07 07:07:17
        |2018-04-07 07:32:27
        |2018-04-07 08:36:44
        |2018-04-07 08:38:00
        |2018-04-07 08:39:29
        |2018-04-08 01:43:08
        |2018-04-08 01:43:55
        |2018-04-09 07:52:31
        |2018-04-09 07:52:42
        |2019-01-24 11:52:31
        |2019-01-24 12:52:42
        |2019-01-25 12:52:42
      """.stripMargin
    val df = spark.read
      .schema(StructType(Array(StructField("startDate", DataTypes.TimestampType))))
      .csv(data.split(System.lineSeparator()).toSeq.toDS())
    df.show(false)
    df.printSchema()

输出-


+-------------------+
|startDate          |
+-------------------+
|2018-04-07 07:07:17|
|2018-04-07 07:32:27|
|2018-04-07 08:36:44|
|2018-04-07 08:38:00|
|2018-04-07 08:39:29|
|2018-04-08 01:43:08|
|2018-04-08 01:43:55|
|2018-04-09 07:52:31|
|2018-04-09 07:52:42|
|2019-01-24 11:52:31|
|2019-01-24 12:52:42|
|2019-01-25 12:52:42|
+-------------------+

root
 |-- startDate: timestamp (nullable = true)

2.创建过滤列基于current date

    val filterCOl = (currentDate: String) =>  when(datediff(date_format(lit(currentDate), "yyyy-MM-dd")
      ,date_format(lit(currentDate), "yyyy-MM-01"))===lit(0),
     date_format(col("startDate"), "yyyy-MM") ===
       date_format(concat_ws("-",year(lit(currentDate)), month(lit(currentDate)) -1), "yyyy-MM")
    ).otherwise(to_date(col("startDate"))
     .between(date_format(lit(currentDate), "yyyy-MM-01"), lit(currentDate)))

3.检查当前数据在月份之间的时间

 var currentDateStr = "2018-04-08"
    df.filter(filterCOl(currentDateStr)).show(false)

输出-

+-------------------+
|startDate          |
+-------------------+
|2018-04-07 07:07:17|
|2018-04-07 07:32:27|
|2018-04-07 08:36:44|
|2018-04-07 08:38:00|
|2018-04-07 08:39:29|
|2018-04-08 01:43:08|
|2018-04-08 01:43:55|
+-------------------+

4.检查当前数据是什么时候是一个月的第一天

currentDateStr = "2018-05-01"
    df.filter(filterCOl(currentDateStr)).show(false)

输出-

+-------------------+
|startDate          |
+-------------------+
|2018-04-07 07:07:17|
|2018-04-07 07:32:27|
|2018-04-07 08:36:44|
|2018-04-07 08:38:00|
|2018-04-07 08:39:29|
|2018-04-08 01:43:08|
|2018-04-08 01:43:55|
|2018-04-09 07:52:31|
|2018-04-09 07:52:42|
+-------------------+

于 2020-05-27T15:46:49.423 回答