1

我正在将 Spark Streaming 与 Kinesis 结合使用,运行代码时遇到了下面的异常。

这是我的代码

// Driver script: reads every shard of a Kinesis stream through Spark Streaming, windows the
// decoded records, and writes each non-empty window to a text sink.

// DefaultAWSCredentialsProviderChain's system-property provider looks up the JVM properties
// "aws.accessKeyId" and "aws.secretKey". "AWS_ACCESS_KEY_ID" / "AWS_SECRET_KEY" are the names
// of the *environment variables* and are ignored when set as system properties.
// NOTE(review): system properties set here only exist in the driver JVM; the Kinesis receivers
// presumably resolve credentials on the executors as well — confirm they are available there.
System.setProperty("aws.accessKeyId", KinesisProperties.AWS_ACCESS_KEY_ID)
System.setProperty("aws.secretKey", KinesisProperties.AWS_SECRET_KEY)
val kinesisClient: AmazonKinesisClient =
  new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())

kinesisClient.setEndpoint(
  KinesisProperties.KINESIS_ENDPOINT_URL,
  KinesisProperties.KINESIS_SERVICE_NAME,
  KinesisProperties.KINESIS_REGION_ID)

// One receiver stream is created per shard reported by the stream description.
val numShards = kinesisClient
  .describeStream(KinesisProperties.MY_STREAM_NAME)
  .getStreamDescription()
  .getShards()
  .size()
val numStreams = numShards

// The same interval is used for the streaming context and for Kinesis checkpointing.
val checkpointInterval = new Duration(KinesisProperties.KINESIS_CHECKPOINT_INTERVAL)
val ssc = StreamingHelper.getStreamingInstance(checkpointInterval)
ssc.addStreamingListener(new MyStreamListener)

val kinesisStreams = (0 until numStreams).map { _ =>
  KinesisUtils.createStream(
    ssc,
    KinesisProperties.MY_STREAM_NAME,
    KinesisProperties.KINESIS_ENDPOINT_URL,
    checkpointInterval,
    InitialPositionInStream.TRIM_HORIZON,
    // A real storage level is required for received blocks; `null` is not a valid value.
    org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_2)
}

// Union all the per-shard streams into a single DStream.
val unionStreams = ssc.union(kinesisStreams)

// Decode with an explicit charset so the result does not depend on each executor's platform
// default. Assumes record payloads are UTF-8 text — TODO confirm against the producer.
val tmp_stream = unionStreams.map { byteArray =>
  new String(byteArray, java.nio.charset.StandardCharsets.UTF_8)
}

val data = tmp_stream.window(
  Seconds(KinesisProperties.WINDOW_INTERVAL),
  Seconds(KinesisProperties.SLIDING_INTERVAL))

// Persist each window under "<Sink><batch time in ms>", skipping windows with no records.
data.foreachRDD { (rdd: RDD[String], time: Time) =>
  if (rdd.take(1).nonEmpty) {
    rdd.saveAsTextFile(s"${KinesisProperties.Sink}${time.milliseconds}")
  }
}

// NOTE(review): a LinkageError "attempted duplicate class definition" for a Kinesis client
// library class points at that library being present twice on the executor classpath; that is
// a packaging issue and is not addressed by this script.
ssc.start()
ssc.awaitTermination()

运行时抛出了以下异常:

java.lang.LinkageError: loader (instance of  org/apache/spark/executor/ChildExecutorURLClassLoader$userClassLoader$): attempted  duplicate class definition for name: "com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStream"
4

0 回答 0