0

我想将流式处理的检查点(checkpoint)保存到亚马逊 S3 上的某个位置。下面是我针对 DStream 的 Scala 代码片段,使用如下写法时出现了错误:

线程 "main" 中出现异常 java.lang.IllegalArgumentException:必须将 AWS Access Key ID 和 Secret Access Key 分别指定为 s3n URL 中的用户名和密码,或者分别设置 fs.s3n.awsAccessKeyId 和 fs.s3n.awsSecretAccessKey 属性。

代码:

// Factory passed to StreamingContext.getActiveOrCreate; only invoked when no
// checkpoint can be restored. Returns the fully-wired StreamingContext.
val creatingFunc = { ()=>

// Create a StreamingContext on the shared SparkContext
val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))

// FIX: checkpoint *writing* (ssc.checkpoint below) goes through the
// SparkContext's own Hadoop configuration, NOT through the Configuration
// passed to StreamingContext.getActiveOrCreate (that one is only used to
// *read* an existing checkpoint). Without the fs.s3n credentials here,
// checkpointing to s3n:// fails with "AWS Access Key ID and Secret Access
// Key must be specified...". Credentials are taken from the environment
// instead of being hardcoded in source.
val checkpointHadoopConf = ssc.sparkContext.hadoopConfiguration
checkpointHadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
sys.env.get("AWS_ACCESS_KEY_ID").foreach(checkpointHadoopConf.set("fs.s3n.awsAccessKeyId", _))
sys.env.get("AWS_SECRET_ACCESS_KEY").foreach(checkpointHadoopConf.set("fs.s3n.awsSecretAccessKey", _))

// Watch the three local drop directories; filterF selects which files to
// pick up, newFilesOnly = false so pre-existing files are also processed.
val ggsnLines = ssc.fileStream[LongWritable, Text, TextInputFormat]("C:\\Users\\Mbazarganigilani\\Documents\\RA\\GGSN\\Files1",filterF,false)
val ccnLines= ssc.fileStream[LongWritable, Text, TextInputFormat]("C:\\Users\\Mbazarganigilani\\Documents\\RA\\CCN\\Files1",filterF,false)
val probeLines= ssc.fileStream[LongWritable, Text, TextInputFormat]("C:\\Users\\Mbazarganigilani\\Documents\\RA\\Probe\\Files1",filterF,false)

// Drop the CSV header record (contains "ggsnIPAddress") and split each line
// into its comma-separated fields, keyed by the original file offset.
val ggssnArrays=ggsnLines.map(x=>(x._1,x._2.toString())).filter(!_._2.contains("ggsnIPAddress")).map(x=>(x._1,x._2.split(",")))
ggssnArrays.foreachRDD(s=> {
  // Debug output: print the first 10 parsed records of each batch.
  s.collect().take(10).foreach(u=>println(u._2.mkString(",")))
})

ssc.remember(Minutes(1))  // To make sure data is not deleted by the time we query it interactively

// NOTE(review): this path differs from the one main() restores from
// ("s3n://probecheckpoints/SparkCheckPoints") — recovery will never find
// the checkpoints written here; the two paths should agree.
ssc.checkpoint("s3n://probecheckpoints/checkpoints")

println("Creating function called to create new StreamingContext")
newContextCreated = true  // side effect observed by main() for logging
ssc
}

def main(args:Array[String]): Unit =
{

// NOTE(review): the original comments below assume minRememberDuration is
// raised elsewhere (so pre-existing files are read) and that Kryo
// serialization is enabled for fileStream — confirm where that is configured.
//the minremeberduration is set to read the previous files from the directory
//the kyroclasses serialization needs to be enabled for the filestream

// Stop any currently active StreamingContext first, keeping the underlying
// SparkContext alive for reuse by the new context.
if (stopActiveContext) {
  StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
}

// Get or create a streaming context

// SECURITY(review): AWS credentials are hardcoded in source — move them to
// environment variables or a credentials provider, and rotate these keys.
val hadoopConfiguration:Configuration=new Configuration()
hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "AKIAIOPSJVBDTEUHUJCQ")
hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "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")


//val ssc = StreamingContext.getActiveOrCreate(creatingFunc)
// NOTE(review): hadoopConfiguration here is only used to READ an existing
// checkpoint. Checkpoint WRITING (ssc.checkpoint inside creatingFunc) uses
// the SparkContext's own hadoopConfiguration, which does not carry these
// fs.s3n keys — the likely cause of the IllegalArgumentException quoted
// above. Also: creatingFunc checkpoints to "s3n://probecheckpoints/checkpoints",
// not the "SparkCheckPoints" path restored from here — the paths should agree.
val ssc=StreamingContext.getActiveOrCreate("s3n://probecheckpoints/SparkCheckPoints",creatingFunc,hadoopConfiguration,false)



if (newContextCreated) {
  println("New context created from currently defined creating function")
} else {
  println("Existing context running or recovered from checkpoint, may not be running currently defined creating function")
}

// Start the streaming context in the background.
ssc.start()
4

0 回答 0