1

我正在为一个用例学习 AWS 服务。在浏览完文档后,我想出了一个简单的流程。我想使用 Streams API 和 KPL 将数据摄取到 Kinesis 流中。我使用示例 putRecord 方法将数据摄取到流中。我正在将此 JSON 摄取到流中 -

{"userid":1234,"username":"jDoe","firstname":"John","lastname":"Doe"}

摄取数据后,我会在 putRecordResult 中得到以下响应-

Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318638512162631666140828401666}
Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318645765717549353915876638722}
Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318649392495008197803400757250}

现在我编写了一个 Lambda 函数来获取这些数据并推送到 DynamoDB 表中。这是我的 Lambda 函数 -

console.log('Loading function');
var AWS = require('aws-sdk');
var tableName = "sampleTable";
var doc = require('dynamodb-doc');
var db = new doc.DynamoDB();

exports.handler = (event, context, callback) => {
    //console.log('Received event:', JSON.stringify(event, null, 2));
    event.Records.forEach((record) => {
        // Kinesis data is base64 encoded so decode here
        const payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
        console.log('Decoded payload:', payload);
        var userid = event.userid;
        var username = event.username;
        var firstname = event.firstname;
        console.log(userid + "," + username +","+ firstname);

        var item = {
            "userid" : userid,
            "username" : username,
            "firstname" : firstname
        };

        var params = {
            TableName : tableName,
            Item : item
        };
        console.log(params);

        db.putItem(params, function(err, data){
            if(err) console.log(err);
            else console.log(data);
        });

    });
    callback(null, `Successfully processed ${event.Records.length} records.`);
};

不知何故,我无法在 lambda 函数执行中看到 console.logs。我在流页面中看到有 putRecord 到流中并且得到了但是不知何故我在 Lambdafunction 页面和 DynamoDB 表中什么都看不到。

我有一个用于将数据摄取到 Kinesis 中的 Java 代码的 IAM 策略,另一个用于 Lambda 函数的 lambda-kinesis-execution-role 以及 DynamoDB 将数据摄取到表中的策略。

是否有任何教程显示它是如何以正确的方式完成的?我感觉我在这个过程中遗漏了很多点,例如如何链接所有这些 IAM 策略并使它们同步,以便当数据被放入流时它由 Lambda 处理并最终在 Dynamo 中?

非常感谢任何指示和帮助。

4

2 回答 2

1

如果您上面的代码是您正在使用的代码的直接副本,那么您正在引用event.userid但您应该使用payload.userid. 您已将 Kinesis 记录解码为有效负载变量。

于 2016-06-29T04:14:37.707 回答
0

您可以使用 Lambda 函数

1.为 Kinesis 和 Dynamodb 创建 IAM 角色

2.现在从 dynamodb-process-stream 的蓝图创建一个 Lambda 函数

3.选择我们从IAM创建的执行角色

4.单击 Create Function Now 转到 Edit code 部分并编写以下代码

const AWS =require('aws-sdk');
const docClient =new AWS.DynamoDB.DocumentClient({region : 'us-east-1'});

exports.handler = (event, context, callback) => {
   event.Records.forEach((record) => { 

    var params={
    Item :{
            ROWTIME:Date.now().toString(),//Dynamodb column name
   DATA:new Buffer(record.kinesis.data, base64').toString('ascii')//Dynamodb column name
              },
    TableName:'mytable'//Dynamodb Table Name
    };

docClient.put(params,function(err,data){
    if(err){
        callback(err,null);
    }
    else{
        callback(null,data);
    }
});
});
};
于 2018-03-09T12:52:55.180 回答