0

我有一个带日期的数据框,想过滤过去 3 天(不是基于当前时间,而是数据集中可用的最新时间)

+---+----------------------------------------------------------------------------------+----------+
|id |partition                                                                         |date      |
+---+----------------------------------------------------------------------------------+----------+
|1  |/raw/gsec/qradar/flows/dt=2019-12-01/hour=00/1585218406613_flows_20191201_00.jsonl|2019-12-01|
|2  |/raw/gsec/qradar/flows/dt=2019-11-30/hour=00/1585218406613_flows_20191201_00.jsonl|2019-11-30|
|3  |/raw/gsec/qradar/flows/dt=2019-11-29/hour=00/1585218406613_flows_20191201_00.jsonl|2019-11-29|
|4  |/raw/gsec/qradar/flows/dt=2019-11-28/hour=00/1585218406613_flows_20191201_00.jsonl|2019-11-28|
|5  |/raw/gsec/qradar/flows/dt=2019-11-27/hour=00/1585218406613_flows_20191201_00.jsonl|2019-11-27|
+---+----------------------------------------------------------------------------------+----------+

应该返回

+---+----------------------------------------------------------------------------------+----------+
|id |partition                                                                         |date      |
+---+----------------------------------------------------------------------------------+----------+
|1  |/raw/gsec/qradar/flows/dt=2019-12-01/hour=00/1585218406613_flows_20191201_00.jsonl|2019-12-01|
|2  |/raw/gsec/qradar/flows/dt=2019-11-30/hour=00/1585218406613_flows_20191201_00.jsonl|2019-11-30|
|3  |/raw/gsec/qradar/flows/dt=2019-11-29/hour=00/1585218406613_flows_20191201_00.jsonl|2019-11-29|
+---+----------------------------------------------------------------------------------+----------+

编辑:我采用@Lamanus 回答从分区字符串中提取日期

df = sqlContext.createDataFrame([
    (1, '/raw/gsec/qradar/flows/dt=2019-12-01/hour=00/1585218406613_flows_20191201_00.jsonl'),
    (2, '/raw/gsec/qradar/flows/dt=2019-11-30/hour=00/1585218406613_flows_20191201_00.jsonl'),
    (3, '/raw/gsec/qradar/flows/dt=2019-11-29/hour=00/1585218406613_flows_20191201_00.jsonl'),
    (4, '/raw/gsec/qradar/flows/dt=2019-11-28/hour=00/1585218406613_flows_20191201_00.jsonl'),
    (5, '/raw/gsec/qradar/flows/dt=2019-11-27/hour=00/1585218406613_flows_20191201_00.jsonl')
], ['id','partition'])

df.withColumn('date', F.regexp_extract('partition', '[0-9]{4}-[0-9]{2}-[0-9]{2}', 0)) \
  .show(10, False)
4

1 回答 1

1

对于您最初的目的,我认为您不需要特定日期的文件夹。因为文件夹结构已经被分区了dt,所以把它们都拿来做过滤。

df = spark.createDataFrame([('1', '/raw/gsec/qradar/flows/dt=2019-12-01/hour=00/1585218406613_flows_20191201_00.jsonl')]).toDF('id', 'value')

from pyspark.sql.functions import *

dates = df.withColumn('date', regexp_extract('value', '[0-9]{4}-[0-9]{2}-[0-9]{2}', 0)) \
  .withColumn('date', explode(sequence(to_date('date'), date_sub('date', 2)))) \
  .select('date').rdd.map(lambda x: str(x[0])).collect()

path = df.withColumn('value', split('value', '/dt')[0]) \
  .select('value').rdd.map(lambda x: str(x[0])).collect()

newDF = spark.read.json(path).filter(col(dt).isin(dates))

这是我的尝试。

df = spark.createDataFrame([('1', '/raw/gsec/qradar/flows/dt=2019-12-01/hour=00/1585218406613_flows_20191201_00.jsonl')]).toDF('id', 'value')

from pyspark.sql.functions import *

df.withColumn('date', regexp_extract('value', '[0-9]{4}-[0-9]{2}-[0-9]{2}', 0)) \
  .withColumn('date', explode(sequence(to_date('date'), date_sub('date', 2)))) \
  .withColumn('value', concat(lit('.*/'), col('date'), lit('/.*'))).show(10, False)

+---+----------------+----------+
|id |value           |date      |
+---+----------------+----------+
|1  |.*/2019-12-01/.*|2019-12-01|
|1  |.*/2019-11-30/.*|2019-11-30|
|1  |.*/2019-11-29/.*|2019-11-29|
+---+----------------+----------+
于 2020-08-25T13:57:07.737 回答