0

我有一个 node.js 流,我暂时将其写入这样的数组:

var tempCrossSection = [];

stream.on('data', function(data) {
    tempCrossSection.push(data);
});

然后我定期获取该数组中的数据(并清除它)并对其进行一些处理,如下所示:

var crossSection = [];

setInterval(function() {
    crossSection = tempCrossSection;
    tempCrossSection = [];

    someOtherFunction(crossSection, function(data) {
        console.log(data);
    }
}, 30000);

问题是我得到了一些奇怪的行为,流被写入数组的顺序以及随着流速率增加和/或 someOtherFunction 回调花费太长时间而触发的 setInterval 回调的数量。

我应该如何实现这一点,以便流正确地将数据写入数组(按顺序),并且每个 setInterval 回调都进行一次数据处理。

4

1 回答 1

1

您的代码存在一些问题。首先,你分享给很多国家。例如 crossSection 应该在匿名的 Interval 函数中单独定义。为什么将“crossSection”定义为闭包?如果 someOtherFunction 运行很长时间,您可能确实会遇到某种竞争条件。

var source = [];

stream.on('data', function(data) {
    source.push(data);
});

setInterval(function() {
    var target = source;
    source = [];

    someOtherFunction(target, function(data) {
        console.log(data);
    }
}, 30000);

如果您可以访问someOtherFunction那么我会像这样重写整个内容

var source = [];

stream.on('data', function(data) {
    source.push(data);
});

setInterval(function() {
    var processing = true;

    while (processing) {
        var elem = source.shift();
        someOtherFunction(elem, function(data) {
            console.log(data);
        });
        processing = checkForBreakConditionAndReturnFalseIfBreak();
    }
}, 30000);

如果元素的数量很大并且someOtherFunctions需要很长时间,您仍然可能会遇到一些问题。所以我可能会做这样的事情

var source = [];
var timerId = 0;

stream.on('data', function(data) {
    source.push(data);
});

function processSource() {
    clearTimeout(timerId);
    var processing = true;

    while (processing) {
        var elem = source.shift();
        someOtherFunction(elem, function(data) {
            console.log(data);
        });
        processing = checkForBreakConditionAndReturnFalseIfBreak();
    }
    setTimeout(processSource, calcTimeoutForNextProcessingDependentOnPastData());
};

setTimeout(processSource, 30000); //initial Timeout
于 2013-04-03T08:33:27.953 回答