我正在尝试将现有的 API 转换为与 RxJS 一起使用......对节点来说相当新,对 RxJs 来说非常新,所以请多多包涵。
我有一个现有的 API(getNextMessage),当某些东西可用时,它要么阻塞(异步),要么通过节点样式(err,val)回调返回新项目或错误。
所以它看起来像:
getNextMessage(nodeStyleCompletionCallback);
您可以将 getNextMessage 想象成一个 http 请求,该请求在未来服务器响应时完成,但您确实需要在收到消息后再次调用 getNextMessage 以继续从服务器获取新项目。
所以,为了使它成为一个可观察的集合,我必须让 RxJs 继续调用我的 getNextMessage 函数,直到订阅者被释放();
基本上,我正在尝试创建自己的 RxJs 可观察集合。
问题是:
- 我不知道如何让subscriber.dispose() 杀死async.forever
- 我可能不应该首先使用 async.forever
- 我不确定我是否应该为每条消息“完成” - 那不应该在序列的末尾
- 我想最终消除使用 fromNodeCallback 的需要,以拥有一流的 RxJS 可观察
- 显然我有点困惑。
希望能得到一点帮助,谢谢!
这是我现有的代码:
var Rx = require('rx');
var port = require('../lib/port');
var async = require('async');
function observableReceive(portName)
{
var observerCallback;
var listenPort = new port(portName);
var disposed = false;
var asyncReceive = function(asyncCallback)
{
listenPort.getNextMessage(
function(error, json)
{
observerCallback(error, json);
if (!disposed)
setImmediate(asyncCallback);
}
);
}
return function(outerCallback)
{
observerCallback = outerCallback;
async.forever(asyncReceive);
}
}
var receive = Rx.Observable.fromNodeCallback(observableReceive('rxtest'));
var source = receive();
var subscription = source.forEach(
function (json)
{
console.log('receive completed: ' + JSON.stringify(json));
},
function (error) {
console.log("receive failed: " + error.toString());
},
function () {
console.log('Completed');
subscription.dispose();
}
);