1

我希望通过 Lambda 函数将来自 Kinesis Data Analytics 的 CSV 记录发送到 SageMaker 端点进行推断,然后将推断结果传递给 Firehose API,以便将其转储到 S3。但由于某种原因,数据没有进入 SageMaker。

'use strict';

// Logged when this module is loaded.
console.log('Loading function');

// AWS SDK entry point plus the two service clients the handler uses.
// Each client is pinned to an explicit API version.
const AWS = require('aws-sdk');
const sagemakerruntime = new AWS.SageMakerRuntime({ apiVersion: '2017-05-13' });
const firehose = new AWS.Firehose({ apiVersion: '2015-08-04' });
exports.handler = (event, context, callback) => {
    let success = 0;
    let failure = 0;
    const output = event.records.map((record) => {
        /* Data is base64 encoded, so decode here */
        const recordData = Buffer.from(record.data, 'base64');
        try {
            var params = {
                Body: new Buffer('...') || recordData /* Strings will be Base-64 encoded on your behalf */, /* required */
                EndpointName: 'String', /* required */
                Accept: 'text/csv',
                ContentType: 'text/csv'
            };
            sagemakerruntime.invokeEndpoint(params, function(err, data) {
                var result1;
                if (err) console.log(err, err.stack); // an error occurred
                else     console.log(data);           // successful response
                result1=data;
                var params = {
                    DeliveryStreamName: 'String', /* required */
                    Record: { /* required */
                        Data: new Buffer('...') || result1 /* Strings will be Base-64 encoded on your behalf */ /* required */
                    }
                };
                firehose.putRecord(params, function(err, data) {
                    if (err) console.log(err, err.stack); // an error occurred
                    else     console.log(data);           // successful response
                });
            });
            success++;
            return {
                recordId: record.recordId,
                result: 'Ok',
            };
        } catch (err) {
            failure++;
            return {
                recordId: record.recordId,
                result: 'DeliveryFailed',
            };
        }
    });
    console.log(`Successful delivered records ${success}, Failed delivered records ${failure}.`);
    callback(null, {
        records: output,
    });
};

4

0 回答 0