我正在尝试使用alpakka kinesis 连接器将消息发送到 Kinesis Stream,但我没有成功。我尝试了下面的代码,但我的流中没有任何内容。
implicit val sys = ActorSystem()
implicit val mat = ActorMaterializer()
implicit val kinesisAsync: AmazonKinesisAsync = AmazonKinesisAsyncClientBuilder.defaultClient()
val debug = Flow[PutRecordsRequestEntry].map { reqEntry =>
println(reqEntry)
reqEntry
}
val entry = new PutRecordsRequestEntry()
.withData(ByteBuffer.wrap("Hello World".getBytes))
.withPartitionKey(Random.nextInt.toString)
Source.tick(1.second, 1.second, entry).to(KinesisSink("myStreamName", KinesisFlowSettings.defaultInstance)).run()
// 2) Source.tick(1.second, 1.second,entry).via(debug).to(KinesisSink("myStreamName", inesisFlowSettings.defaultInstance)).run()
- 使用 a
Sink.foreach(println)
而不是每 1 秒KinesisSink
打印PutRecordsRequestEntry
一次 => EXPECTED - 使用
KinesisSink
,条目仅生成一次。
我究竟做错了什么 ?
我正在用 a 检查我的流KinesisSource
并且正在阅读(用另一个流测试)
此外,AWS Kinesis 的监控仪表板不显示任何 PUT 请求。
注1:我尝试启用alpakka的调试日志但没有效果
<logger name="akka.stream.alpakka.kinesis" level="DEBUG"/>
在我的logback.xml
+ 根级调试中