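I'm having trouble running a simple, vanilla Spark Streaming application with Kinesis in Scala. I've followed the basic guidance from a few tutorials, such as Snowplow and WordCountASL.
However, I still can't get it to work because of this Kinesis Worker error: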
16/11/15 09:00:27 ERROR Worker: Caught exception when initializing LeaseCoordinator
com.amazonaws.services.kinesis.leases.exceptions.DependencyException: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:125)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.initialize(KinesisClientLibLeaseCoordinator.java:173)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:374)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:318)
    at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon$1.run(KinesisReceiver.scala:174)
Caused by: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
    at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:1758)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.createTable(AmazonDynamoDBClient.java:822)
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:118)
    ... 4 more
Here is my code sample:
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.internal.StaticCredentialsProvider
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

/**
  * Created by franco on 11/11/16.
  */
object TestApp {

  // === Configurations for Kinesis streams ===
  val awsAccessKeyId = "XXXXXX"
  val awsSecretKey = "XXXXXXX"
  val kinesisStreamName = "MyStream"
  val kinesisEndpointUrl = "https://kinesis.region.amazonaws.com" //example "https://kinesis.us-west-2.amazonaws.com"
  val appName = "MyAppName"

  def main(args: Array[String]): Unit = {
    val credentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretKey)
    val provider = new StaticCredentialsProvider(credentials)

    val kinesisClient = new AmazonKinesisClient(provider)
    kinesisClient.setEndpoint(kinesisEndpointUrl)

    val shards = kinesisClient.describeStream(kinesisStreamName).getStreamDescription.getShards.size()
    val streams = shards

    val batchInterval = Milliseconds(2000)
    val kinesisCheckpointInterval = batchInterval
    val regionName = RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName

    val cores: Int = Runtime.getRuntime.availableProcessors()
    println("Available Cores : " + cores.toString)

    val config = new SparkConf().setAppName("MyAppName").setMaster("local[" + (cores / 2) + "]")
    val ssc = new StreamingContext(config, batchInterval)

    // Create the Kinesis DStreams
    val kinesisStreams = (0 until streams).map { i =>
      KinesisUtils.createStream(ssc, appName, kinesisStreamName, kinesisEndpointUrl, regionName,
        InitialPositionInStream.LATEST, kinesisCheckpointInterval * 2, StorageLevel.MEMORY_AND_DISK_2)
    }

    ssc.union(kinesisStreams).map(bytes => new String(bytes)).print()

    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}
My IAM policy looks like this:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Stmt123",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords"
      ],
      "Resource": [
        "arn:aws:kinesis:region:account:stream/name"
      ]
    },
    {
      "Sid": "Stmt456",
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DeleteItem",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:Scan",
        "dynamodb:UpdateItem"
      ],
      "Resource": [
        "arn:aws:dynamodb:region:account:table/name"
      ]
    },
    {
      "Sid": "Stmt789",
      "Effect": "Allow",
      "Action": [
        "cloudwatch:PutMetricData"
      ],
      "Resource": [
        "*"
      ]
    }
  ]
}
I can't figure out what is wrong with this application. Any guidance on this would be greatly appreciated.