我希望将来自 kinesis 分析的 CSV 记录发送到 sagemaker 端点,并通过 lambda 函数进行推断,然后将其传递给 firehose API 以将其转储到 S3。但由于某种原因,数据没有进入 sagemaker。
'use strict';
console.log('Loading function');
var AWS = require('aws-sdk');
var sagemakerruntime = new AWS.SageMakerRuntime({apiVersion: '2017-05-13'});
var firehose = new AWS.Firehose({apiVersion: '2015-08-04'});
exports.handler = (event, context, callback) => {
let success = 0;
let failure = 0;
const output = event.records.map((record) => {
/* Data is base64 encoded, so decode here */
const recordData = Buffer.from(record.data, 'base64');
try {
var params = {
Body: new Buffer('...') || recordData /* Strings will be Base-64 encoded on your behalf */, /* required */
EndpointName: 'String', /* required */
Accept: 'text/csv',
ContentType: 'text/csv'
};
sagemakerruntime.invokeEndpoint(params, function(err, data) {
var result1;
if (err) console.log(err, err.stack); // an error occurred
else console.log(data); // successful response
result1=data;
var params = {
DeliveryStreamName: 'String', /* required */
Record: { /* required */
Data: new Buffer('...') || result1 /* Strings will be Base-64 encoded on your behalf */ /* required */
}
};
firehose.putRecord(params, function(err, data) {
if (err) console.log(err, err.stack); // an error occurred
else console.log(data); // successful response
});
});
success++;
return {
recordId: record.recordId,
result: 'Ok',
};
} catch (err) {
failure++;
return {
recordId: record.recordId,
result: 'DeliveryFailed',
};
}
});
console.log(`Successful delivered records ${success}, Failed delivered records ${failure}.`);
callback(null, {
records: output,
});
};