7

我正在尝试使用alpakka kinesis 连接器将消息发送到 Kinesis Stream,但我没有成功。我尝试了下面的代码,但我的流中没有任何内容。

implicit val sys = ActorSystem()
implicit val mat = ActorMaterializer()
implicit val kinesisAsync: AmazonKinesisAsync = AmazonKinesisAsyncClientBuilder.defaultClient()


val debug = Flow[PutRecordsRequestEntry].map { reqEntry =>
    println(reqEntry)
    reqEntry
}

val entry = new PutRecordsRequestEntry()
    .withData(ByteBuffer.wrap("Hello World".getBytes))
    .withPartitionKey(Random.nextInt.toString)

Source.tick(1.second, 1.second, entry).to(KinesisSink("myStreamName", KinesisFlowSettings.defaultInstance)).run()

// 2) Source.tick(1.second, 1.second,entry).via(debug).to(KinesisSink("myStreamName", inesisFlowSettings.defaultInstance)).run()
  • 使用 aSink.foreach(println)而不是每 1 秒KinesisSink打印PutRecordsRequestEntry一次 => EXPECTED
  • 使用KinesisSink,条目仅生成一次。

我究竟做错了什么 ?

我正在用 a 检查我的流KinesisSource并且正在阅读(用另一个流测试)

此外,AWS Kinesis 的监控仪表板不显示任何 PUT 请求。

注1:我尝试启用alpakka的调试日志但没有效果

<logger name="akka.stream.alpakka.kinesis" level="DEBUG"/>

在我的logback.xml+ 根级调试中

4

2 回答 2

4

下面要考虑的一些故障排除步骤 - 我希望它们有所帮助。

我怀疑您可能缺少 Kinesis 客户端的凭据和/或区域配置。

Kinesis Firehose

Kinesis Producer Library(Alpakka 似乎正在使用)不适用于 Kinesis Firehose。如果您尝试写信给 Firehose,这是行不通的。

应用程序日志

您可能希望为 Kinesis Producer Library 启用日志记录,而不仅仅是在 Alpakka 本身中。相关文档可在此处获得:

配置 Kinesis 创建者库

Kinesis Producer Library 的配置默认值

AWS 端日志记录

AWS CloudTrail 自动为 Kinesis 流启用开箱即用,默认情况下,AWS 将为您保留 90 天的 CloudTrail 日志。

https://docs.aws.amazon.com/streams/latest/dev/logging-using-cloudtrail.html

您可以使用 CloudTrail 日志查看您的应用程序代表您对 Kinesis 进行的 API 调用。请求的显示通常会有适度的延迟 - 但这会让您知道请求是否由于 IAM 权限不足或您的 AWS 资源配置的其他问题而失败。

检查 SDK 身份验证

Kinesis 客户端将使用DefaultAWSCredentialsProviderChain凭证提供程序向 AWS 发出请求。

您需要确保提供具有 IAM 权限的有效 AWS 凭证以向 Kinesis 发出这些请求。如果您的代码在 AWS 上运行,则提供应用程序凭证的首选方式是使用IAM 角色(在实例启动时指定)。

在代码中构建客户端时,您还需要指定 AWS 区域。使用您application.properties的来配置它,或者如果您的应用程序是位于单个区域中的 CloudFormation 堆栈的一部分 - 当您的代码在 AWS 上运行时,使用实例元数据服务来检索当前区域。

于 2018-01-10T05:21:25.280 回答
2

问题是对流上的操作的访问被拒绝/许可。

我必须添加 akka 演员配置以进行日志记录

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"
  stdout-loglevel = "DEBUG"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  logger-startup-timeout = "30s"
}

查看调试行,我实际上在每个阶段都在调试和步骤中运行。

它需要IAM 角色中的“PutRecord s ”权限

于 2018-01-10T12:25:14.940 回答