3

我有一个写入 DynamoDB 表的应用程序,我试图让 Kinesis 进行聚合,然后将聚合数据写入另一个 DynamoDB 表。

在我的 DynamoDB 表上启用了流,并且我在流上有一个 lamdba 触发器,如下所示:

'use strict';

var AWS = require('aws-sdk');
var kinesis = new AWS.Kinesis();

exports.handler = (event, context, callback) => {
    event.Records.forEach((record) => {

        var myValue = record.dynamodb.NewImage.myValue.N;
        var partitionKey = record.key.S;
        var data = '{"VALUE":"' + myValue + '"}';

        var recordParams = {
            Data: data,
            PartitionKey: partitionKey,
            StreamName: 'MyStreamName'
        };

        console.log('Try Put to Kinesis Stream');

        kinesis.putRecord(recordParams, function(err, data) {
            if (err) {
                console.log('Failed Put');
            } else {
                console.log('Successful Put');
            }
        });
    });
};

当我在 Lambda 测试事件中有三个或四个元素时,这会成功写入我的 Kinesis Stream。

当我启用触发器时,它根本不会写入我的 Kinesis Stream。一次似乎有大约 100 个元素进入。在 Cloudwatch 中,我看到“尝试放入 Kinesis Stream”消息,但我什至看不到成功/失败消息。

我是在做完全错误的事情还是有更好的方法来解决这个问题?

如果 DynamoDB 的流可以直接输入 Kinesis Analytics,那将是我的一等奖 :)

4

2 回答 2

4

您的错误是您的 lambda 函数不会等到所有 kinesis.putRecord 调用完成。

在 Node.js 中,您有一个回调编程模型。您发出异步请求,并在请求完成时调用回调。所以函数返回时请求还没有完成。调用回调时完成。

解决问题的两种方法:

自己跟踪调用的回调

'use strict';
var AWS = require('aws-sdk');
var kinesis = new AWS.Kinesis();
exports.handler = (event, context, callback) => {
    event.Records.forEach((record) => {
        var myValue = record.dynamodb.NewImage.myValue.N;
        var partitionKey = record.key.S;
        var data = '{"VALUE":"' + myValue + '"}';
        var recordParams = {
            Data: data,
            PartitionKey: partitionKey,
            StreamName: 'MyStreamName'
        };
        console.log('Try Put to Kinesis Stream');
        var i = 0;
        kinesis.putRecord(recordParams, function(err, data) {
            if (err) {
                console.log('Failed Put');
                i = event.Records.length;
            } else {
                console.log('Successful Put');
                i += 1;
            }
            if (i === event.Records.length) {
                console.log('All done');
                callback(err);
            }
        });
    });
};

或使用 async 之类的库:https ://www.npmjs.com/package/async

于 2016-10-27T20:17:27.437 回答
0

在我看来,您的整体问题的一部分(除了需要调用callback, per hellomichibye)以及您在评论中描述的行为可能来自您如何为Data. 与其手动创建 JSON 字符串Data,不如尝试使用JSON.stringify以便您知道输入将始终正确格式化。

于 2017-01-27T20:59:09.313 回答