2

我最近实现了一个可写流,它将来自多个可读流的聚合数据保存到一个 mongo 数据库中。由于流的高吞吐量,我希望实现一些去抖动功能。

我正在努力寻找一种好的方法来做到这一点,麻烦来自需要在管道中施加背压。本质上,如果 debounce timeout 执行,Writeable 'next()' 回调只需要在 timeout 函数完成后调用,但如果 timeout 尚未执行,Writeable 需要立即 'next()' 来维持流程.

我觉得我在这里错过了一些简单的东西,希望这里有人能让我踢自己。

_write() 逻辑解释/TLDR:

// Queues a mongo bulk upsert operation.
queueMongoUpsertOperation(someData);

// This should only be called NOW if the timeout isn't executing
// as we should wait for it to finish the save before reading
// more data IF it is saving.
// Best/SAFEST way to block this call when the timeout is executing
next()

// Simple debounce.
clearTimeout(timeout);

timeout = setTimeout(function(){

    // Execute the actual asyncronous save from the queued upsert operations.
    executeMongoBulkUpsert(function(){

        // Because the next() above is somehow blocked when the timeout
        // is executed, we should now called next() from within
        // the debounced function.
        next();

    });

}, 1000);
4

0 回答 0