12

我正在尝试编写一个 nodejs sqs 队列处理器。

"use strict";
var appConf = require('./config/appConf');
var AWS = require('aws-sdk');
AWS.config.loadFromPath('./config/aws_config.json');
var sqs = new AWS.SQS();
var exec = require('child_process').exec;
function readMessage() {
  sqs.receiveMessage({
    "QueueUrl": appConf.sqs_distribution_url,
    "MaxNumberOfMessages": 1,
    "VisibilityTimeout": 30,
    "WaitTimeSeconds": 20
  }, function (err, data) {
    var sqs_message_body;
    if (data.Messages) {
      if (typeof data.Messages[0] !== 'undefined' && typeof data.Messages[0].Body !== 'undefined') {
        //sqs msg body
        sqs_message_body = JSON.parse(data.Messages[0].Body);
        //make call to nodejs handler in codeigniter
        exec('php '+ appConf.CI_FC_PATH +'/index.php nodejs_handler make_contentq_call "'+ sqs_message_body.contentq_cat_id+'" "'+sqs_message_body.cnhq_cat_id+'" "'+sqs_message_body.network_id+'"',
          function (error, stdout, stderr) {
            if (error) {
              throw error;
            }
            console.log('stdout: ' + stdout);
            if(stdout == 'Success'){
              //delete message from queue
              sqs.deleteMessage({
                "QueueUrl" : appConf.sqs_distribution_url,
                "ReceiptHandle" :data.Messages[0].ReceiptHandle
              });
            }
          });
      }
    }
  });
}
readMessage();

上面的代码适用于队列中的单个消息。我应该如何编写此脚本,以便在处理完所有消息之前一直轮询队列中的消息?我应该使用设置超时吗?

4

2 回答 2

16

首先,您应该明确地使用亚马逊提供的长轮询技术,据我了解,您已经在使用它,因为您"WaitTimeSeconds": 20在调用中有参数sqs.receiveMessage。希望大家不要忘记在AWS Web 界面中进行配置。

关于轮询消息-您可以使用包括计时器在内的不同技术,但我认为最简单的方法就是在's(甚至's)回调函数readMessage()的末尾调用您的函数。因此,队列中下一条消息的处理(或等待)将在队列中上一条消息的处理结束后立即开始。receiveMessageexec

更新:

至于我在你的新版本代码中有很多readMessage()电话。我认为最好将其最小化以保持代码更清晰和易于维护。但是,例如,如果您在主receiveMessage回调结束时留下唯一的一个调用,您将收到许多并行运行的 PHP 工作脚本 - 从性能的角度来看,这可能还不错 - 但您会必须添加一些复杂的脚本来控制并行工作者的数量。我认为您可以在exec回调中减少一些呼叫,尝试加入ifs 并在主回调中加入呼叫。

"use strict";
var appConf = require('./config/appConf');
var AWS = require('aws-sdk');
AWS.config.loadFromPath('./config/aws_config.json');
var delay = 20 * 1000;
var sqs = new AWS.SQS();
var exec = require('child_process').exec;
function readMessage() {
  sqs.receiveMessage({
    "QueueUrl": appConf.sqs_distribution_url,
    "MaxNumberOfMessages": 1,
    "VisibilityTimeout": 30,
    "WaitTimeSeconds": 20
  }, function (err, data) {
    var sqs_message_body;
    if (data.Messages) 
      && (typeof data.Messages[0] !== 'undefined' && typeof data.Messages[0].Body !== 'undefined')) {
        //sqs msg body
        sqs_message_body = JSON.parse(data.Messages[0].Body);
        //make call to nodejs handler in codeigniter
        exec('php '+ appConf.CI_FC_PATH +'/index.php nodejs_handler make_contentq_call "'+ sqs_message_body.contentq_cat_id+'" "'+sqs_message_body.cnhq_cat_id+'" "'+sqs_message_body.network_id+'"',
          function (error, stdout, stderr) {
            if (error) {
              // error handling 
            }
            if(stdout == 'Success'){
              //delete message from queue
              sqs.deleteMessage({
                "QueueUrl" : appConf.sqs_distribution_url,
                "ReceiptHandle" :data.Messages[0].ReceiptHandle
              }, function(err, data){                
              });
            }
            readMessage();                
          });
      }          
    }        
    readMessage();        
  });
}
readMessage();

关于内存泄漏:我认为你不应该担心,因为下一次调用readMessage()发生在回调函数中 - 所以不是递归的,递归调用的函数在调用函数之后将值返回给父receiveMessage()函数。

于 2013-07-23T09:07:40.090 回答
2

如果您使用的是节点,请使用https://www.npmjs.com/package/sqs-worker模块。它会为你完成这项工作。

var SQSWorker = require('sqs-worker')

var options =
 { url: 'https://sqs.eu-west-1.amazonaws.com/001123456789/my-queue'
}

var queue = new SQSWorker(options, worker)

function worker(notifi, done) {
  var message;
  try {
    message = JSON.parse(notifi.Data)
  } catch (err) {
    throw err
  }

   // Do something with `message` 

   var success = true

   // Call `done` when you are done processing a message. 
   // If everything went successfully and you don't want to see it any more, 
   // set the second parameter to `true`. 
   done(null, success)
}
于 2016-04-20T15:10:04.627 回答