3

我有一个触发器,它为 Kinesis 上收到的每个事务执行 lambda 函数。生产者通过 PutRecordsRequest() 方法发送多个事务。Lambda函数如下;

var AWS = require('aws-sdk');
var firehose = new AWS.Firehose();
var fhStreamName = "transactions";

function writeToS3(jsonString,firehoseStreamName){

    console.log("Writing to S3 : " + jsonString)

    // Prepare storage to postings firehose stream...
    var params = { 
        DeliveryStreamName: firehoseStreamName, 
        Record: { 
            Data:  jsonString
        }
    };

    // Store data!
    firehose.putRecord(params, function(err, data) {
        if (err) { 

            // This needs to be fired to Kinesis in the future...
            console.log(err, err.stack); 
        }
        else{  
            console.log(data);            
        }
    });
}

function processEvent(event) {

    // Convert data object because this is all that we need
    var buf = new Buffer(event, "base64"); 

    // Convert to actual string which is readable
    var jsonString = buf.toString("utf8");

    return jsonString;
}   

exports.handler = function(event, context) {  

    var result = "";  

    // Loop events and register to firehose...  
    for(var i=0; i<event.Records.length; i++){
        result = result + processEvent(event.Records[i].kinesis.data,fhStreamName); 
    }   

    writeToS3(result,fhStreamName); 

    context.done();
};

但是,在编写事务时,在 S3 上它们不会被编写为 JSON 数组。下面是一个例子:

{
  "userName" : "val1",
  "betID" : "val2",
  "anotherID" : val3
}{
  "userName" : "val4",
  "anotherID" : "val5",
  "productID" : val6, 
}

这种格式的数据可以直接加载到 Athena 或 Redshift,还是必须在有效的数组中?我可以在这里看到http://docs.aws.amazon.com/redshift/latest/dg/copy-usage_notes-copy-from-json.html它仍然应该能够加载到 Redshift 中。

以下是在 Athena 中创建表时使用的属性...

ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) LOCATION 's3://asgaard-data/data/'

如何加载这些数据以便能够查询它?

4

3 回答 3

5

对于 Athena,JSON 记录必须是每行一个对象:

{ "userName" : "val1", "betID" : "val2", "anotherID" : val3 }
{ "userName" : "val4", "anotherID" : "val5", "productID" : val6 }

这似乎违反直觉,因为生成的文件本身不是格式良好的 JSON 对象,但换行符分隔的文本对于 Athena、Hive 和类似的处理工具很有用。我相信同样的结构也适用于 Redshift,但 Redshift 有更多选择。

于 2017-03-25T17:41:51.377 回答
3

对于可能有同样问题的其他人,这是解决我的问题的代码,并且数据现在已为 Athena、Redshift 等正确格式化。它在 node.js 中。数据从我的生产者发送到 Kinesis,每个事务都有一个触发器,事务被逐一处理并发送到最后写入 S3 的 firehose。

var AWS = require('aws-sdk'); 
var firehose = new AWS.Firehose();
var fhStreamName = "transactions";

function processEvent(event,firehoseStreamName) {

    // Convert data object because this is all that we need
    var buf = new Buffer(event, "base64"); 

    // Convert to actual string which is readable
    var jsonString = buf.toString("utf8"); 

    // Prepare storage to postings firehose stream...
    var params = { 
        DeliveryStreamName: firehoseStreamName, 
        Record: { 
            Data:  jsonString.replace(/\n|\r/g, "")  + "\n"
        }
    };

    console.log("Writing : " + params.Record.Data)

    // Store data!
    firehose.putRecord(params, function(err, data) {
        if (err) { 

            // This needs to be fired to Kinesis in the future...
            console.log(err, err.stack); 
        }
        else{  
            //console.log(data);            
        }
    });
}   

exports.handler = function(event, context) {   

     // Loop events and register to firehose...  
     for(var i=0; i<event.Records.length; i++){
         processEvent(event.Records[i].kinesis.data,fhStreamName); 
     }  

    context.done();
};
于 2017-03-28T14:07:59.293 回答
0

您需要使用回调 - 最好批量记录并只使用一次回调。这是修改后的功能:

var AWS = require('aws-sdk'); 
var firehose = new AWS.Firehose({region: 'us-east-1'});
var fhStreamName = "transaction";

function add_newline(data) {
    var buf = new Buffer(data, "base64"); 
    var jsonString = buf.toString("utf8"); 
    return jsonString.replace(/\n|\r/g, "")  + "\n"
}

exports.handler = function(event, context, callback) {   

    if (event.Records) { 
         console.log(JSON.stringify(event.Records))

        // Loop events and register to firehose...  
        const records_arr =  
            event.Records.map(record => {
                return {Data: add_newline(record.kinesis.data)}
            });

        const params =
          {
            DeliveryStreamName: fhStreamName, /* required */
            Records: records_arr
          };

        firehose.putRecordBatch(params, function(err, data) {
          if (err) {
            console.log(err, err.stack); // an error occurred
            callback(err);
          }
          else {
            console.log("Wrote data to firehose ->");
            console.log(data);    
            callback(null);
          }
        });         
    }
};
于 2019-07-03T21:40:17.140 回答