I have a function that returns a DataFrame and two accumulators. Running it manually from spark-shell (calling the function from the jar) it works as expected: doing a .count on the DataFrame populates the accumulators. But if I call the function from spark-submit, the accumulators are always empty. I have also tried returning two DataFrames instead, with the same strange behavior: it works in spark-shell, but not from spark-submit.

Here is a (possibly non-working) skeleton of my code:
import org.apache.spark.{Accumulable, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext, Row}
import scala.collection._
...
def process(
    sc: SparkContext,
    sqlContext: SQLContext,
    filepaths: RDD[String]
): ( DataFrame,
     Accumulable[mutable.ArrayBuffer[(String, String, Long, String, Long, Long)], (String, String, Long, String, Long, Long)],
     Accumulable[mutable.ArrayBuffer[String], String] ) = {

  val logMetadataAccumulator = sc.accumulableCollection(mutable.ArrayBuffer[(String, String, Long, String, Long, Long)]())
  val logFailedRowAccumulator = sc.accumulableCollection(mutable.ArrayBuffer[String]())
  ...
  ...
  val logRecordsPre = logRawFlow.map(
    entry => {
      val date = """(\d\d\d\d)-(\d\d)-(\d\d)""".r
      if ( fields.length == 23 && date.findFirstMatchIn(fields(2)).nonEmpty && fields(22).forall(_.isDigit) && fields(21).forall(_.isDigit) ) {
        ...
        Row( 1L, "blah", "blah", 0L )
      }
      else if ( fields(0) == logMetaDataPrefix ) {
        ...
        // double parentheses: += takes the six fields as a single tuple element
        logMetadataAccumulator += ((fileName, logType, numLines, logSource, startTime, endTime))
        Row( 0L, "blah", "blah", 0L )
      }
      else {
        try {
          val fileName = fields(0)
          logFailedRowAccumulator += (fileName)
          Row( 0L, "blah", "blah", 0L )
        }
        catch {
          case e: Exception => {
            logFailedRowAccumulator += ("unknown")
            Row( 0L, "blah", "blah", 0L )
          }
        }
      }
    }
  )
  val logRecords = logRecordsPre.filter( _.getLong(0) != 0L)
  val logDF = sqlContext.createDataFrame(logRecords, logSchema)

  ( logDF, logMetadataAccumulator, logFailedRowAccumulator )
}
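
The function is driven roughly like this (a minimal sketch only; the variable names and the input path are placeholders, not my actual driver code):

val filepaths: RDD[String] = sc.parallelize(Seq("/some/log/path"))  // placeholder input

val (logDF, metaAcc, failedAcc) = process(sc, sqlContext, filepaths)

// The accumulators only fill up once an action actually runs the map.
logDF.count()

// In spark-shell both collections are populated at this point;
// from spark-submit they come back empty.
println(metaAcc.value.size)
println(failedAcc.value.size)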