
I have a function that returns a DataFrame and two accumulators. Run manually from spark-shell (calling the function from the jar) it works as expected: a .count on the DataFrame populates the accumulators.

But if I call the function from spark-submit, the accumulators always stay empty. I also tried returning two DataFrames instead, with the same odd behavior: it works in spark-shell, but not from spark-submit.

Here is a skeleton of my code (it will probably not run as-is):

import org.apache.spark.{Accumulable, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext, Row}
import scala.collection._
...

def process(
    sc:SparkContext,
    sqlContext:SQLContext,
    filepaths : RDD[String]
    ): ( DataFrame,
         Accumulable[mutable.ArrayBuffer[(String, String, Long, String, Long, Long)], (String, String, Long, String, Long, Long)],
         Accumulable[mutable.ArrayBuffer[String], String] ) = {

    val logMetadataAccumulator = sc.accumulableCollection(mutable.ArrayBuffer[(String, String, Long, String, Long, Long)]())
    val logFailedRowAccumulator = sc.accumulableCollection(mutable.ArrayBuffer[String]())
    ...
    ...
    val logRecordsPre = logRawFlow.map(
        entry => {
            val date = """(\d\d\d\d)-(\d\d)-(\d\d)""".r
            val fields = entry.split("\t", -1) // assumed: the split of entry into fields was elided from the original
            if ( fields.length == 23 && date.findFirstMatchIn(fields(2)).nonEmpty && fields(22).forall(_.isDigit) && fields(21).forall(_.isDigit) ) {
                ...
                Row( 1L, "blah", "blah", 0L )
            }
            else if ( fields(0) == logMetaDataPrefix ) {
                ...
                logMetadataAccumulator += ((fileName, logType, numLines, logSource, startTime, endTime))
                Row( 0L, "blah", "blah", 0L )
            }
            else {
                try { 
                    val fileName = fields(0)
                    logFailedRowAccumulator += (fileName)
                    Row( 0L, "blah", "blah", 0L )
                }
                catch {
                    case e: Exception => {
                        logFailedRowAccumulator += ("unknown")
                        Row( 0L, "blah", "blah", 0L )
                    }
                }
            }
        }
    )

    val logRecords = logRecordsPre.filter( _.getLong(0) != 0L)

    val logDF = sqlContext.createDataFrame(logRecords, logSchema)

    ( logDF, logMetadataAccumulator, logFailedRowAccumulator )
}

1 Answer


My bad. On closer inspection I found that when calling the function manually in the shell I was calling .count on the DataFrame before reading the accumulators into arrays, but in the runner launched by spark-submit the arrays were created from the accumulators before any action had been performed on the returned DataFrame. Since Spark transformations are lazy, the map that populates the accumulators never runs until an action forces it.

For example, this works:

val ( df, acc1, acc2 ) = process( sc, sqlContext, filePaths )
df.count
val acc1Array = acc1.value.toArray
val acc2Array = acc2.value.toArray

not this (which is what I was doing in the runner):

val ( df, acc1, acc2 ) = process( sc, sqlContext, filePaths )
val acc1Array = acc1.value.toArray
val acc2Array = acc2.value.toArray
df.count
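
More generally, the safe pattern is to force exactly one execution of the lineage before reading the accumulators. A minimal sketch of the corrected runner (the persist call is my addition, not part of the original code):

val ( df, acc1, acc2 ) = process( sc, sqlContext, filePaths )
df.persist()                        // avoid recomputing the map stage on later actions
df.count                            // action: runs the map and populates both accumulators
val acc1Array = acc1.value.toArray  // read on the driver, after the action
val acc2Array = acc2.value.toArray

For accumulators updated inside transformations (as here), Spark does not guarantee exactly-once updates: if the stage is recomputed, e.g. by a second action on an uncached DataFrame, the values can be double-counted, which is another reason to persist before counting.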
answered 2015-09-14T02:05:43.993