5

我有一个 rxjs 观察者(实际上是一个主题),它永远跟踪一个文件,就像 tail -f 一样。例如,监视日志文件非常棒。

这种“永远”的行为对我的应用程序来说很棒,但对测试来说却很糟糕。目前我的应用程序有效,但我的测试永远挂起。

我想强制观察者更改提前完成,因为我的测试代码知道文件中应该有多少行。我该怎么做呢?

我尝试在返回的主题句柄上调用 onCompleted,但此时它基本上被强制转换为观察者,你不能强制它关闭,错误是:

对象#没有方法'onCompleted'

这是源代码:

function ObserveTail(filename) {

source = new Rx.Subject();

if (fs.existsSync(filename) == false) {
    console.error("file doesn't exist: " + filename);
}

var lineSep = /[\r]{0,1}\n/;
tail = new Tail(filename, lineSep, {}, true);

tail.on("line", function(line) {
        source.onNext(line);
});
tail.on('close', function(data) {
    console.log("tail closed");
    source.onCompleted();
});     
tail.on('error', function(error) {
    console.error(error);
});     

this.source = source;
}           

这是无法弄清楚如何强制永远结束的测试代码(磁带式测试)。注意“非法”行:

test('tailing a file works correctly', function(tid) {

var lines = 8;
var i = 0;
var filename = 'tape/tail.json';
var handle = new ObserveTail(filename);
touch(filename);

handle.source
.filter(function (x) {
    try {
        JSON.parse(x);
        return true;
    } catch (error) {
        tid.pass("correctly caught illegal JSON");
        return false;
    }
})
.map(function(x) { return JSON.parse(x) })
.map(function(j) { return j.name })
.timeout(10000, "observer timed out")
.subscribe (
    function(name) {
        tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
        i++;
        if (i >= lines) {
            handle.onCompleted();   // XXX ILLEGAL
        }
    },
    function(err) {  
        console.error(err)
        tid.fail("err leaked through to subscriber");
    },
    function() {
        tid.end();
        console.log("Completed");
    }
);

})
4

1 回答 1

5

听起来你解决了你的问题,但你原来的问题

我想强制观察者更改提前完成,因为我的测试代码知道文件中应该有多少行。我该怎么做呢?

一般来说,Subject当您有更好的选择时,不鼓励使用 s,因为它们往往是人们使用他们熟悉的编程风格的拐杖。Subject我建议您考虑一下每个事件在 Observable 生命周期中的含义,而不是尝试使用。

包装事件发射器

已经存在.EventEmitter#on/off形式的模式包装器Observable.fromEvent。它仅在有侦听器时处理清理和保持订阅活动。因此ObserveTail可以重构为

function ObserveTail(filename) {

  return Rx.Observable.create(function(observer) {
    var lineSep = /[\r]{0,1}\n/;
    tail = new Tail(filename, lineSep, {}, true);
    var line = Rx.Observable.fromEvent(tail, "line");
    var close = Rx.Observable.fromEvent(tail, "close");
    var error = Rx.Observable.fromEvent(tail, "error")
                  .flatMap(function(err) { return Rx.Observable.throw(err); });
    //Only take events until close occurs and wrap in the error for good measure
    //The latter two are terminal events in this case.
    return line.takeUntil(close).merge(error).subscribe(observer);
  });
} 

与普通使用相比,它有几个好处Subjects,一,您现在实际上会在下游看到错误,二,这将在您完成事件后处理事件的清理。

避免 *Sync 方法

然后可以将其滚动到您的文件存在检查中,而无需使用readSync

//If it doesn't exist then we are done here
//You could also throw from the filter if you want an error tracked
var source = Rx.Observable.fromNodeCallback(fs.exists)(filename)
    .filter(function(exists) { return exists; })
    .flatMap(ObserveTail(filename));

接下来,您可以通过使用来简化您的过滤器/映射/映射序列flatMap

var result = source.flatMap(function(x) {
  try {
    return Rx.Observable.just(JSON.parse(x));
  } catch (e) {
    return Rx.Observable.empty();
  }
}, 
//This allows you to map the result of the parsed value
function(x, json) {
  return json.name;
})
.timeout(10000, "observer timed out");

不要发信号,退订

当流仅沿一个方向传播时,您如何停止“信号”停止。我们实际上很少希望 Observer 直接与 Observable 通信,因此更好的模式是不实际“发出信号”停止,而是简单地取消订阅Observable并让 Observable 的行为决定它应该从那里做什么。

从本质上讲,您Observer真的不应该关心您Observable,而不仅仅是说“我在这里完成了”。

为此,您需要声明在停止时要达到的条件。

在这种情况下,由于您只是在测试用例中的一个设定数字后停止,您可以使用它take来取消订阅。因此最终的订阅块看起来像:

result
 //After lines is reached this will complete.
 .take(lines)
 .subscribe (
    function(name) {
        tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
    },
    function(err) {  
        console.error(err)
        tid.fail("err leaked through to subscriber");
    },
    function() {
        tid.end();
        console.log("Completed");
    }
);

编辑 1

正如评论中所指出的,对于这个特定的 api,没有真正的“关闭”事件,因为 Tail 本质上是一个无限操作。从这个意义上说,它与鼠标事件处理程序没有什么不同,当人们停止收听时,我们将停止发送事件。所以你的块可能最终看起来像:

function ObserveTail(filename) {

  return Rx.Observable.create(function(observer) {
    var lineSep = /[\r]{0,1}\n/;
    tail = new Tail(filename, lineSep, {}, true);
    var line = Rx.Observable.fromEvent(tail, "line");
    var error = Rx.Observable.fromEvent(tail, "error")
                  .flatMap(function(err) { return Rx.Observable.throw(err); });
    //Only take events until close occurs and wrap in the error for good measure
    //The latter two are terminal events in this case.
    return line
            .finally(function() {  tail.unwatch(); })
            .merge(error).subscribe(observer);
  }).share();
} 

finally和运算符的添加share创建了一个对象,该对象将在新订阅者到达时附加到尾部,并且只要至少有一个订阅者仍在侦听,它将保持连接状态。一旦所有的订阅者都完成了,我们就可以安全地unwatch拖尾了。

于 2016-01-30T19:44:35.653 回答