
I have log files going into different directories based on the creation date of the log file.

For example

> /mypath/2017/01/20/...
> ...
> /mypath/2017/02/13/...
> /mypath/2017/02/14/...

I would like to combine all these log files into a single RDD using pyspark so that I can do aggregations on this master dataset.

So far, I have read individual directories with sqlContext and used union to join the log files for specific dates.

DF1 = (sqlContext.read.schema(schema).json("/mypath/2017/02/13")).union(sqlContext.read.schema(schema).json("/mypath/2017/02/14"))

Is there an easy way to get the master RDD by specifying the log files from a range of dates (i.e. from 2017/01/20 to 2017/02/14)?

I am quite new to Spark, so please correct me if I am wrong at any step.


1 Answer


If you insist on using sqlContext, then a straightforward solution is to define a method that lists all the files in the input directories:

case class FileWithDate(basePath: String, year: Int, month: Int, day: Int) {
  // zero-pad month and day so the path matches the /yyyy/MM/dd directory layout
  def path = f"$basePath/$year/$month%02d/$day%02d"
}

def listFileSources() : List[FileWithDate] = ??? // implement here
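
Since the question is specifically about a range of dates, one way to fill in listFileSources() might be a simple walk over the days with java.time. This is only a sketch: the base path and the start/end dates below are taken from the question for illustration, not something the answer prescribes.

import java.time.LocalDate

// Sketch: enumerate every day from a start date to an end date (inclusive)
// and turn each one into a FileWithDate. Base path and dates are assumptions
// taken from the question.
def listFileSources(): List[FileWithDate] = {
  val basePath = "/mypath"
  val start = LocalDate.of(2017, 1, 20)
  val end   = LocalDate.of(2017, 2, 14)
  Iterator.iterate(start)(_.plusDays(1))
    .takeWhile(!_.isAfter(end))
    .map(d => FileWithDate(basePath, d.getYear, d.getMonthValue, d.getDayOfMonth))
    .toList
}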

If you want to merge the dataframes from all the sources, you can do it like this:

// create an empty dataframe with the structure of the json
// (assumes `sc` and `schema` are already in scope)
val emptyDF = sqlContext.createDataFrame(sc.emptyRDD[org.apache.spark.sql.Row], schema)
val files = listFileSources()
val allDFs = files.foldLeft(emptyDF){ case (df, f) => df.union(sqlContext.read.schema(schema).json(f.path)) }

If you want to filter the input files by date, that is easy. Something like this:

files.filter(f => f.year == 2016 && f.month >= 2 && f.month <= 3)
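
For the concrete range in the question (2017/01/20 to 2017/02/14) the boundary days need to be handled explicitly; a sketch building on listFileSources() above:

// keep only files between 2017/01/20 and 2017/02/14 (inclusive)
val selected = listFileSources().filter { f =>
  f.year == 2017 && (
    (f.month == 1 && f.day >= 20) ||
    (f.month == 2 && f.day <= 14)
  )
}
// `selected` can then be folded into a single dataframe exactly as shown above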

Another solution is to augment your dataframe with year, month, and day (add extra columns) and perform all the business logic on the new dataframe.
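
The answer doesn't spell that variant out, but a rough sketch could read everything in one pass and derive the date columns from each record's source file path with input_file_name() and regexp_extract. The glob, the regex, and the column names here are my assumptions, not part of the original answer.

import org.apache.spark.sql.functions.{input_file_name, regexp_extract}

// Read everything under the base path in one go and derive year/month/day
// columns from each record's source file path (pattern assumes /yyyy/MM/dd).
val pathPattern = "/mypath/(\\d{4})/(\\d{2})/(\\d{2})/"
val withDate = sqlContext.read.schema(schema).json("/mypath/*/*/*")
  .withColumn("year",  regexp_extract(input_file_name(), pathPattern, 1).cast("int"))
  .withColumn("month", regexp_extract(input_file_name(), pathPattern, 2).cast("int"))
  .withColumn("day",   regexp_extract(input_file_name(), pathPattern, 3).cast("int"))

// the date range then becomes an ordinary DataFrame filter
val inRange = withDate.filter(
  "year = 2017 AND ((month = 1 AND day >= 20) OR (month = 2 AND day <= 14))")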

answered 2017-02-14T13:00:00.660