问题标签 [amazon-kinesis-kpl]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
2336 浏览

aws-sdk - Amazon Kinesis KPL 序列化异常

我明白了

使用以下 Java 堆栈跟踪:

怎么了?

0 投票
2 回答
1523 浏览

amazon-web-services - Kinesis lambda DynamoDB

我正在为一个用例学习 AWS 服务。在浏览完文档后,我想出了一个简单的流程。我想使用 Streams API 和 KPL 将数据摄取到 Kinesis 流中。我使用示例 putRecord 方法将数据摄取到流中。我正在将此 JSON 摄取到流中 -

摄取数据后,我会在 putRecordResult 中得到以下响应-

现在我编写了一个 Lambda 函数来获取这些数据并推送到 DynamoDB 表中。这是我的 Lambda 函数 -

不知何故,我无法在 lambda 函数执行中看到 console.logs。我在流页面中看到有 putRecord 到流中并且得到了但是不知何故我在 Lambdafunction 页面和 DynamoDB 表中什么都看不到。

我有一个用于将数据摄取到 Kinesis 中的 Java 代码的 IAM 策略,另一个用于 Lambda 函数的 lambda-kinesis-execution-role 以及 DynamoDB 将数据摄取到表中的策略。

是否有任何教程显示它是如何以正确的方式完成的?我感觉我在这个过程中遗漏了很多点,例如如何链接所有这些 IAM 策略并使它们同步,以便当数据被放入流时它由 Lambda 处理并最终在 Dynamo 中?

非常感谢任何指示和帮助。

0 投票
1 回答
1062 浏览

amazon-dynamodb - 从 Spark Streaming 中超过 1 个分片的 Kinesis 流中读取

收到以下错误:

com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: Can't update checkpoint - instance doesn't hold the lease for this shard

这可能是什么原因?

0 投票
0 回答
318 浏览

amazon-kinesis - 无法从 Linux 服务器将数据写入 Amazon Kinesis

我已经在 linux 服务器上部署了我的 java 作业,以使用来自各种 API 的数据并将其发布到 Kinesis。我的工作无法打开 Kinesis 流来发布数据并引发以下错误。任何建议都会有很大帮助。

31253 [main] 错误 com.est.producer.KinesisEstProducer - 写入 kinesis java.util.concurrent.ExecutionException 时出错:com.amazonaws.services.kinesis.producer.IrrecoverableError:在 com.google 调用 mkfifo 后管道未显示.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299) 在 com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286) 在 com.google.common.util .concurrent.AbstractFuture.get(AbstractFuture.java:116) 在 com.est.producer.KinesisEstProducer.send(KinesisEstProducer.java:78) 在 com.est.cable.kafka.agent.Agent.main(Agent.java:102 ) 原因:com.amazonaws.services.kinesis.producer.IrrecoverableError:在 com.amazonaws.services.kinesis.producer.Daemon 调用 mkfifo 后管道未显示。com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:480) 上的 fatalError(Daemon.java:502) com.amazonaws.services.kinesis.producer.Daemon.createPipesUnix(Daemon.java:401)在 com.amazonaws.services.kinesis.producer.Daemon.createPipes(Daemon.java:354) 在 com.amazonaws.services.kinesis.producer.Daemon.access$000(Daemon.java:61) 在 com.amazonaws.services。 kinesis.producer.Daemon$1.run(Daemon.java:127) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266 ) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:第745章)第745章)第745章)502) 在 com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:480) 在 com.amazonaws.services.kinesis.producer.Daemon.createPipesUnix(Daemon.java:401) 在 com.amazonaws.services .kinesis.producer.Daemon.createPipes(Daemon.java:354) 在 com.amazonaws.services.kinesis.producer.Daemon.access$000(Daemon.java:61) 在 com.amazonaws.services.kinesis.producer.Daemon$1 .run(Daemon.java:127) 在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) 在 java.util。 concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745)502) 在 com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:480) 在 com.amazonaws.services.kinesis.producer.Daemon.createPipesUnix(Daemon.java:401) 在 com.amazonaws.services .kinesis.producer.Daemon.createPipes(Daemon.java:354) 在 com.amazonaws.services.kinesis.producer.Daemon.access$000(Daemon.java:61) 在 com.amazonaws.services.kinesis.producer.Daemon$1 .run(Daemon.java:127) 在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) 在 java.util。 concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745)amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:480) 在 com.amazonaws.services.kinesis.producer.Daemon.createPipesUnix(Daemon.java:401) 在 com.amazonaws.services.kinesis.producer。 Daemon.createPipes(Daemon.java:354) 在 com.amazonaws.services.kinesis.producer.Daemon.access$000(Daemon.java:61) 在 com.amazonaws.services.kinesis.producer.Daemon$1.run(Daemon. java:127) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745)amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:480) 在 com.amazonaws.services.kinesis.producer.Daemon.createPipesUnix(Daemon.java:401) 在 com.amazonaws.services.kinesis.producer。 Daemon.createPipes(Daemon.java:354) 在 com.amazonaws.services.kinesis.producer.Daemon.access$000(Daemon.java:61) 在 com.amazonaws.services.kinesis.producer.Daemon$1.run(Daemon. java:127) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745)com.amazonaws.services.kinesis.producer.Daemon.createPipesUnix(Daemon.java:401) 上的 fatalError(Daemon.java:480) com.amazonaws.services.kinesis.producer.Daemon.createPipes(Daemon.java:354)在 com.amazonaws.services.kinesis.producer.Daemon.access$000(Daemon.java:61) 在 com.amazonaws.services.kinesis.producer.Daemon$1.run(Daemon.java:127) 在 java.util.concurrent .Executors$RunnableAdapter.call(Executors.java:511) 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java. util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745)com.amazonaws.services.kinesis.producer.Daemon.createPipesUnix(Daemon.java:401) 上的 fatalError(Daemon.java:480) com.amazonaws.services.kinesis.producer.Daemon.createPipes(Daemon.java:354)在 com.amazonaws.services.kinesis.producer.Daemon.access$000(Daemon.java:61) 在 com.amazonaws.services.kinesis.producer.Daemon$1.run(Daemon.java:127) 在 java.util.concurrent .Executors$RunnableAdapter.call(Executors.java:511) 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java. util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745)createPipesUnix(Daemon.java:401) at com.amazonaws.services.kinesis.producer.Daemon.createPipes(Daemon.java:354) at com.amazonaws.services.kinesis.producer.Daemon.access$000(Daemon.java:61 ) at com.amazonaws.services.kinesis.producer.Daemon$1.run(Daemon.java:127) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask .run(FutureTask.java:266) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang。线程运行(线程.java:745)createPipesUnix(Daemon.java:401) at com.amazonaws.services.kinesis.producer.Daemon.createPipes(Daemon.java:354) at com.amazonaws.services.kinesis.producer.Daemon.access$000(Daemon.java:61 ) at com.amazonaws.services.kinesis.producer.Daemon$1.run(Daemon.java:127) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask .run(FutureTask.java:266) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang。线程运行(线程.java:745)在 com.amazonaws.services.kinesis.producer.Daemon 访问 $000(Daemon.java:61)$1.run(Daemon.java:127) 在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)在 java.util.concurrent.FutureTask.run(FutureTask.java:266) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java :617) 在 java.lang.Thread.run(Thread.java:745)在 com.amazonaws.services.kinesis.producer.Daemon 访问 $000(Daemon.java:61)$1.run(Daemon.java:127) 在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)在 java.util.concurrent.FutureTask.run(FutureTask.java:266) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java :617) 在 java.lang.Thread.run(Thread.java:745)第1142章第1142章

0 投票
2 回答
5528 浏览

amazon-kinesis - 通过设置 API 网关为 Amazon Kinesis 调用 REST API

我正在尝试发送 HTTP Post 请求以将记录放入 Amazon Kinesis Stream。有多种方法(Kinesis 客户端、KPL、将 AWS 网关设置为 Kinesis 代理)。

我看到了有关 Kinesis PutRecord API 的文档 http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html

是否可以将上述 HTTP POST 请求发送到 PutRecord,而无需按照以下链接中的说明设置 Amazon API Gateway:http://docs.aws.amazon.com/apigateway/latest/developerguide/use-custom-authorizer 。 html#call-api-with-api-gateway-custom-authorization

KPL 和 Kinesis Client 必须以某种方式在内部使用 HTTP POST 到 PutRecord,因此必须有办法这样做。不幸的是,我在网上找不到任何资源。

0 投票
1 回答
305 浏览

spark-streaming - Unable to read data from spark streaming connecting Kinesis

I have written below code to connect to kinesis from spark streaming but there is no data been received.

val kinesisStream = KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, batchInterval , StorageLevel.MEMORY_AND_DISK_2)

I tried to write into S3 and to file system, it writes file name by folder and in side that I see only _SUCCESS file which is of zero byte.

by the way, I can able to write to same kinesis stream and read data from java

what is the issue here.

0 投票
1 回答
1852 浏览

amazon-web-services - Kinesis Firehose 将 csv 数据加载到 Redshift

我正在使用 Kinesis firehose 将数据处理为红移,并且我正在尝试 Json 和 Csv 格式。

Json 格式对我来说很好,数据正在加载到红移表中。

用于 JSON 的复制命令:

复制 products_json FROM 's3://foldername/' CREDENTIALS 'aws_iam_role=arn:aws:iam:::role/' MANIFEST json 'auto';

放置记录的 CLI 命令:

aws firehose put-record --delivery-stream-name csvtoredshiftstreamingjson --record='Data="{\"productid\":1,\"productname\":\"phone\",\"productprice\":\" 2.30\"}"'

但是,当我使用 CSV 时,数据存在于 S3 中,但未加载到 redshift 中。

用于 CSV 的复制命令:

复制 products_csv FROM 's3://foldername/' CREDENTIALS 'aws_iam_role=arn:aws:iam:::role/' MANIFEST csv ;

放置 CSV 记录的 CLI 命令:

aws firehose put-record --delivery-stream-name csvtoredshiftstream --record='Data="1,Phone,2.30"'

记录正在处理到 S3,但未加载到 redshift。STL_LOAD_ERRORS 表也没有任何记录。

如果我遗漏了什么,请告诉我。

谢谢。

0 投票
2 回答
3526 浏览

amazon-web-services - Kinesis Firehose Putrecordbatch 示例

我正在寻找一个使用 putrecordbatch 将多条记录发送到 kinesis 流的示例。

我目前正在通过以下方式使用 putrecord 命令将记录发送到 kinesis 流。

aws firehose put-record --delivery-stream-name csvtoredshiftstreamingjson --record='Data="{\"productid\":1,\"productname\":\"phone\",\"productprice\":\" 2.30\"}"'

以类似的方式请求帮助以编写 putrecordbatch。

谢谢和问候, Srivignesh KN

0 投票
1 回答
2574 浏览

amazon-ec2 - Kinesis Agent not sending records to Stream

I have built a Kinesis Firehose stream to push data into redshift and am trying to push data from an EC2 instance using kinesis agent.

Firehose Stream is able to parse the records but not identify the firehose streams am getting the following java error.

Regards, Srivignesh KN

0 投票
0 回答
1691 浏览

amazon-kinesis-kpl - 运行 amazon-kinesis-producer-sample 时出错

我尝试运行amazon-kinesis-producer-sample

我在 Windows7 上使用了 amazon-kinesis-producer 0.10.2 版。出现以下错误:

这里可能是什么问题?