4

我正在尝试将纤维与流一起使用:

var Fiber = require('fibers');
var Future = require('fibers/future');
var fs = require('fs');

function sleepForMs(ms) {
  var fiber = Fiber.current;
  setTimeout(function() {
    fiber.run();
  }, ms);
  Fiber.yield();
}

function catchError(f, onError) {
  return function () {
    var args = arguments;
    var run = function () {
      try {
        var ret = f.apply(null, args);
      }
      catch (e) {
        onError(e);
      }
      return ret;
    };
    if (Fiber.current) {
      return run();
    }
    else {
      return Fiber(run).run();
    }
  }
}

function processFile(callback) {
  var count, finished, onData, onException, onIgnoredEntry;
  count = 0;
  finished = false;
  onException = function (error) {
    if (finished) {
      console.error("Exception thrown after already finished:", error.stack || error);
    }
    if (finished) {
      return;
    }
    finished = true;
    return callback(error);
  };
  onData = function(data) {
    console.log("onData");
    if (finished) {
      return;
    }
    console.log("before sleep");
    sleepForMs(500);
    console.log("after sleep");
    throw new Error("test");
  };
  return fs.createReadStream('test.js').on('data', catchError(onData, onException)).on('end', function() {
    console.log("end");
    if (finished) {
      return;
    }
    finished = true;
    return callback(null, count);
  }).on('error', function(error) {
    console.log("error", error);
    if (finished) {
      return;
    }
    finished = true;
    return callback(error);
  });
};

Fiber(function () {
  console.log("Calling processFile");
  Future.wrap(processFile)().wait();
  console.log("processFile returned");
}).run();
console.log("back in main");

但它并没有真正起作用。数据回调在回调内部的光纤完成之前完成。所以上面的代码输出:

Calling processFile
back in main
onData
before sleep
end
processFile returned
after sleep
Exception thrown after already finished: Error: test

事实上,它应该更像是:

Calling processFile
back in main
onData
before sleep
after sleep
end
processFile returned
Error: test
4

3 回答 3

0

减少睡眠时间并为其他块设置一些优先级或计时器。以便在一定的时间限制后根据优先级显示。这就是您以所需方式获得输出的方式。

于 2014-08-20T13:55:30.423 回答
0

这是使用 wait.for 的实现(围绕 Fibers 的包装器)https://github.com/luciotato/waitfor

在这个实现中,为每个数据块启动一个纤程,因此并行启动“n”个任务。ProcessFile 在所有纤程完成之前不会“返回”。

这是一个演示如何使用 Fibers 和 wait.for 执行此操作,但当然,在生产中使用它之前,您应该将模块级变量和所有函数封装在一个类中。

var wait = require('wait.for');
var fs = require('fs');

var tasksLaunched=0;
var finalCallback;
var callbackDone=false;
var dataArr=[]

function sleepForMs(ms,sleepCallback) {
  setTimeout(function() {
    return sleepCallback();
  }, ms);
}

function resultReady(err,data){

    if (err){
      callbackDone = true;
      return finalCallback(err);
    }

    dataArr.push(data);
    if (dataArr.length>=tasksLaunched && !callbackDone) {
      callbackDone = true;
      return finalCallback(null,dataArr);
    }
}

function processChunk(data,callback) {
    var ms=Math.floor(Math.random()*1000);
    console.log('waiting',ms);
    wait.for(sleepForMs,ms);
    console.log(data.length,"chars");
    return callback(null,data.length);
}

function processFile(filename,callback) {
  var count, onData, onException, onIgnoredEntry;
  count = 0;
  finalCallback = callback;

  onException = function (error) {
    if (!callbackDone){
      callbackDone = true;
      return callback(error);
    }
  };

  onData = function(data) {
    console.log("onData");
    tasksLaunched++;
    wait.launchFiber(processChunk,data,resultReady);
  };

  fs.createReadStream(filename)
    .on('data', onData)
    .on('end', function() {
        console.log("end");
    })
    .on('error', function(error) {
        console.log("error", error);
        if (!callbackDone) {
            callbackDone = true;
            return callback(error);
          }
    });
};

function mainFiber() {
  console.log("Calling processFile");
  var data = wait.for(processFile,'/bin/bash');
  console.log(data.length,"results");
  console.log("processFile returned");
};

//MAIN
wait.launchFiber(mainFiber);
console.log("back in main");
于 2014-08-16T21:27:56.490 回答
0

看起来没有人知道如何做你所要求的。

在这种情况下,您可以以某种传统的异步方式处理您的流,将您的屈服函数应用于结果。

以下是一些如何执行此操作的示例。


收集所有流数据raw-body

一种解决方案是在处理任何流数据之前收集所有流数据。raw-body使用模块可以轻松完成:

var rawBody = require('raw-body');

function processData(data) {
  console.log("before sleep");
  sleepForMs(500);
  console.log("after sleep");
}

function processFile(callback) {
  var stream = fs.createReadStream('fiber.js');
  rawBody(stream, function(err, data) {
    if (err) return callback(err);
    Fiber(processData).run(data); // process your data
    callback();
  });
}

使用此示例,您将:

  1. 等待所有块到达
  2. 开始处理您在Fiber
  3. processData主线程返回
  4. 流数据将在未来某个时间点进行处理

如果您愿意,您可以添加try ... catch或任何其他异常处理以防止processData破坏您的应用程序。


使用智能作业队列处理系列中的所有块

但是如果你真的想在它们到达的那一刻处理所有的数据块,你可以使用一些智能控制流模块。这是使用模块queue功能的示例:async

function processChunk(data, next) {
  return function() {
    console.log("before sleep");
    sleepForMs(500);
    console.log("after sleep");
    next();
  }
}

function processFile(callback) {
  var q = async.queue(function(data, next) {
    Fiber(processChunk(data, next)).run();
  }, 1);
  fs.createReadStream('fiber.js').on('data', function(data) {
    q.push(data);
  }).on('error', function(err) {
    callback(err);
  }).on('end', function() {
    callback(); // not waiting to queue to drain
  })
}

使用此示例,您将:

  1. 开始收听 , 将stream每个push新块添加到处理队列
  2. processData关闭时从主线程返回stream,不等待处理数据块
  3. 所有数据块将在某个时间点以严格的系列进行处理

我知道这不是你所要求的,但我希望它会帮助你。

于 2014-08-21T13:10:25.027 回答