在对您的问题进行了更多思考之后,我相信我找到了一个更通用的解决方案。让我们从EventStream
构造函数开始(它比你的Stream
构造函数更通用):
function EventStream() {
this.listeners = [];
}
然后我们创建一个dispatch
方法来将事件添加到流中:
EventStream.prototype.dispatch = function (event) {
return this.listeners.map(function (listener) {
return listener(event);
});
};
接下来,我们将创建一个map
比您的方法更通用的foreach
方法:
EventStream.prototype.map = function (f) {
var stream = new EventStream;
this.listeners.push(function (x) {
return stream.dispatch(f(x));
});
return stream;
};
现在,当您map
对事件流执行函数时,您将获得一个全新的事件流。例如,如果您的流是[0,1,3,5..]
并且您映射(+2)
它,那么新流将是[2,3,5,7..]
.
我们还将创建一些更有益的实用方法,例如,filter
如下所示:scan
merge
EventStream.prototype.filter = function (f) {
var stream = new EventStream;
this.listeners.push(function (x) {
if (f(x)) return stream.dispatch(x);
});
return stream;
};
该filter
方法过滤掉事件流中的某些事件以创建全新的事件流。例如给定[2,3,5,7..]
和过滤的事件流的函数odd
将是[3,5,7..]
.
EventStream.prototype.scan = function (a, f) {
var stream = new EventStream;
setTimeout(function () {
stream.dispatch(a);
});
this.listeners.push(function (x) {
return stream.dispatch(a = f(a, x));
});
return stream;
};
该scan
方法用于累积创建一个新的事件流。例如,给定流[3,5,7..]
、初始值和新事件流0
的扫描函数。(+)
[0,3,8,15..]
EventStream.prototype.merge = function (that) {
var stream = new EventStream;
this.listeners.push(function (x) {
return stream.dispatch(new Left(x));
});
this.listeners.push(function (y) {
return stream.dispatch(new Right(x));
});
return stream;
};
function Left(x) {
this.left = x;
}
function Right(x) {
this.right = x;
}
该merge
方法将两个单独的事件流合并为一个。为了区分哪个流生成了每个事件,我们将所有事件标记为左或右。
好吧,现在讨论更大的问题。让我们创建一个zip
方法。真正酷的是我们可以zip
使用map
、和方法创建filter
,如下所示:scan
merge
EventStream.prototype.zip = function (that) {
return this.merge(that).scan([[], [], null], function (acc, event) {
var left = acc[0], right = acc[1];
if (event instanceof Left) {
var value = event.left;
return right.length ?
[left, right.slice(1), new Just([value, right[0]])] :
[left.concat(value), right, null];
} else {
var value = event.right;
return left.length ?
[left.slice(1), right, new Just([left[0], value])] :
[tuple(left, right.concat(value), null];
}
})
.filter(function (a) {
return a[2] instanceof Just;
})
.map(function (a) {
return a[2].just;
});
};
function Just(x) {
this.just = x;
}
现在您可以按如下方式使用它:
stream1.zip(stream2).map(function (v) {
console.log(v);
});
您可以定义stream1
和stream2
如下:
var stream1 = getRandomStream();
var stream2 = getRandomStream();
function getRandomStream() {
var stream = new EventStream;
setInterval(function () {
stream.dispatch(Math.random());
}, ((Math.random() * 100) + 500) | 0);
return stream;
}
这里的所有都是它的。不需要承诺。