0

我有一个用例,我必须验证发送到 Kinesis firehose 的有效负载确实正在发送。

为了做到这一点,我想出了链 Firehose -> Firehose 数据转换(使用 lambda) -> DDB -> 检查 DDB 中的有效负载(有效负载是 DDB 中的哈希键)。我必须以编程方式一次性定义整个链。数据转换与http://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html相同。

我正在做这一切,因为我无法完全控制它所在的 S3 存储桶中的文件名。所以我需要将确切的有效负载发送到一些持久键值存储中。

问题是

http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesisfirehose/model/CreateDeliveryStreamRequest.html似乎不支持添加数据转换 lambda。

我的问题是,这是否可行而无需触摸控制台一次(完全通过 AWS Kinesis Firehose API)。

或者确实有任何替代建议以某种方式将数据移动到 DDB。

4

2 回答 2

3

好吧,经过大量的努力和文档搜索,我想通了。

您必须使用 lambda ARN 定义处理配置来定义数据转换。

http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesisfirehose/model/ProcessingConfiguration.html

这是这样的配置在代码中的样子。

final ProcessingConfiguration processingConfiguration =
      new ProcessingConfiguration().withEnabled(true)
         .withProcessors(newProcessor().withType(ProcessorType.Lambda)
         .withParameters(new ProcessorParameter().withParameterName(LambdaArn)
         .withParameterValue(lamdbaFunctionArn)));

final CreateDeliveryStreamResult describeDeliveryStreamResult =
      client.createDeliveryStream(new CreateDeliveryStreamRequest().withExtendedS3DestinationConfiguration(
         new ExtendedS3DestinationConfiguration()
             .withBucketARN(s3BucketARN)
             .withRoleARN(roleArn)
             .withPrefix(keyPrefix)
             .withProcessingConfiguration(processingConfiguration))
             .withDeliveryStreamName(streamName));

这里的 ARN 是各种对象(S3 目标、数据转换 lambda、IAM 角色等)的资源名称。

于 2017-04-30T01:04:38.200 回答
0

您可以使用 lambda 函数来指定任务。 https://github.com/hixichen/golang_lamda_decode_protobuf_firehose

于 2018-07-25T22:08:55.983 回答