正如 Nirrek 解释的那样,当您需要连接以推送数据源时,您必须为该源构建一个事件迭代器。
我想补充一点,上述机制可以重复使用。所以我们不必为每个不同的源重新创建一个事件迭代器。
解决方案是使用和方法创建一个通用通道。您可以从生成器内部调用该方法,并将该方法连接到数据源的侦听器接口。put
take
take
put
这是一个可能的实现。请注意,如果没有人在等待消息,则通道会缓冲消息(例如,生成器正忙于进行远程调用)
function createChannel () {
const messageQueue = []
const resolveQueue = []
function put (msg) {
// anyone waiting for a message ?
if (resolveQueue.length) {
// deliver the message to the oldest one waiting (First In First Out)
const nextResolve = resolveQueue.shift()
nextResolve(msg)
} else {
// no one is waiting ? queue the event
messageQueue.push(msg)
}
}
// returns a Promise resolved with the next message
function take () {
// do we have queued messages ?
if (messageQueue.length) {
// deliver the oldest queued message
return Promise.resolve(messageQueue.shift())
} else {
// no queued messages ? queue the taker until a message arrives
return new Promise((resolve) => resolveQueue.push(resolve))
}
}
return {
take,
put
}
}
那么上面的通道就可以在你想监听外部推送数据源的任何时候使用。对于你的例子
function createChangeChannel (replication) {
const channel = createChannel()
// every change event will call put on the channel
replication.on('change', channel.put)
return channel
}
function * startReplication (getState) {
// Wait for the configuration to be set. This can happen multiple
// times during the life cycle, for example when the user wants to
// switch database/workspace.
while (yield take(DATABASE_SET_CONFIGURATION)) {
let state = getState()
let wrapper = state.database.wrapper
// Wait for a connection to work.
yield apply(wrapper, wrapper.connect)
// Trigger replication, and keep the promise.
let replication = wrapper.replicate()
if (replication) {
yield call(monitorChangeEvents, createChangeChannel(replication))
}
}
}
function * monitorChangeEvents (channel) {
while (true) {
const info = yield call(channel.take) // Blocks until the promise resolves
yield put(databaseActions.replicationChange(info))
}
}