我有 2 个类在 Apache Spark 中执行类似的任务,但是使用数据帧的类比使用 RDD 的“常规”类慢很多倍。(30 倍)
我想使用数据框,因为它会消除我们拥有的很多代码和类,但显然我不能让它慢得多。
数据集没什么大不了的。我们有 30 个文件,每个文件都包含 json 数据,这些文件是关于另一个软件中的活动触发的事件。每个文件中有 0 到 100 个事件。
包含 82 个事件的数据集将需要大约 5 分钟来处理数据帧。
示例代码:
public static void main(String[] args) throws ParseException, IOException {
SparkConf sc = new SparkConf().setAppName("POC");
JavaSparkContext jsc = new JavaSparkContext(sc);
SQLContext sqlContext = new SQLContext(jsc);
conf = new ConfImpl();
HashSet<String> siteSet = new HashSet<>();
// last month
Date yesterday = monthDate(DateUtils.addDays(new Date(), -1)); // method that returns the date on the first of the month
Date startTime = startofYear(new Date(yesterday.getTime())); // method that returns the date on the first of the year
// list all the sites with a metric file
JavaPairRDD<String, String> allMetricFiles = jsc.wholeTextFiles("hdfs:///somePath/*/poc.json");
for ( Tuple2<String, String> each : allMetricFiles.toArray() ) {
logger.info("Reading from " + each._1);
DataFrame metric = sqlContext.read().format("json").load(each._1).cache();
metric.count();
boolean siteNameDisplayed = false;
boolean dateDisplayed = false;
do {
Date endTime = DateUtils.addMonths(startTime, 1);
HashSet<Row> totalUsersForThisMonth = new HashSet<>();
for (String dataPoint : Conf.DataPoints) { // This is a String[] with 4 elements for this specific case
try {
if (siteNameDisplayed == false) {
String siteName = parseSiteFromPath(each._1); // method returning a parsed String
logger.info("Data for site: " + siteName);
siteSet.add(siteName);
siteNameDisplayed = true;
}
if ( dateDisplayed == false ) {
logger.info("Month: " + formatDate(startTime)); // SimpleFormatDate("yyyy-MM-dd")
dateDisplayed = true;
}
DataFrame lastMonth = metric.filter("event.eventId=\"" + dataPoint + "\"").filter("creationDate >= " + startTime.getTime()).filter("creationDate < " + endTime.getTime()).select("event.data.UserId").distinct();
logger.info("Distinct for last month for " + dataPoint + ": " + lastMonth.count());
totalUsersForThisMonth.addAll(lastMonth.collectAsList());
} catch (Exception e) {
// data does not fit the expected model so there is nothing to print
}
}
logger.info("Total Unique for the month: " + totalStudentForThisMonth.size());
startTime = DateUtils.addMonths(startTime, 1);
dateDisplayed = false;
} while ( startTime.getTime() < commonTmsMetric.monthDate(yesterday).getTime());
// reset startTime for the next site
startTime = commonTmsMetric.StartofYear(new Date(yesterday.getTime()));
}
}
此代码中有一些效率不高的地方,但是当我查看日志时,它只会为整个处理增加几秒钟的时间。
我一定错过了一些大事。
我已经用 2 个执行器和 1 个执行器运行了这个,差别是 5 分钟 20 秒。
这是在 Hadoop 2.5.0 上使用 Java 1.7 和 Spark 1.4.1 运行的。
谢谢!