17

我正在尝试使用redux-sagaPouchDB中的事件连接到我的React.js应用程序,但我正在努力弄清楚如何将 PouchDB 发出的事件连接到我的 Saga。由于该事件使用回调函数(并且我无法将其传递给生成器),因此我无法yield put()在回调内部使用,它在 ES2015 编译后(使用 Webpack)会给出奇怪的错误。

所以这就是我想要完成的,不起作用的部分是 inside replication.on('change' (info) => {})

function * startReplication (wrapper) {
  while (yield take(DATABASE_SET_CONFIGURATION)) {
    yield call(wrapper.connect.bind(wrapper))

    // Returns a promise, or false.
    let replication = wrapper.replicate()

    if (replication) {
      replication.on('change', (info) => {
        yield put(replicationChange(info))
      })
    }
  }
}

export default [ startReplication ]
4

5 回答 5

27

正如 Nirrek 解释的那样,当您需要连接以推送数据源时,您必须为该源构建一个事件迭代器。

我想补充一点,上述机制可以重复使用。所以我们不必为每个不同的源重新创建一个事件迭代器。

解决方案是使用和方法创建一个通用通道。您可以从生成器内部调用该方法,并将该方法连接到数据源的侦听器接口。puttaketakeput

这是一个可能的实现。请注意,如果没有人在等待消息,则通道会缓冲消息(例如,生成器正忙于进行远程调用)

function createChannel () {
  const messageQueue = []
  const resolveQueue = []

  function put (msg) {
    // anyone waiting for a message ?
    if (resolveQueue.length) {
      // deliver the message to the oldest one waiting (First In First Out)
      const nextResolve = resolveQueue.shift()
      nextResolve(msg)
    } else {
      // no one is waiting ? queue the event
      messageQueue.push(msg)
    }
  }

  // returns a Promise resolved with the next message
  function take () {
    // do we have queued messages ?
    if (messageQueue.length) {
      // deliver the oldest queued message
      return Promise.resolve(messageQueue.shift())
    } else {
      // no queued messages ? queue the taker until a message arrives
      return new Promise((resolve) => resolveQueue.push(resolve))
    }
  }

  return {
    take,
    put
  }
}

那么上面的通道就可以在你想监听外部推送数据源的任何时候使用。对于你的例子

function createChangeChannel (replication) {
  const channel = createChannel()

  // every change event will call put on the channel
  replication.on('change', channel.put)
  return channel
}

function * startReplication (getState) {
  // Wait for the configuration to be set. This can happen multiple
  // times during the life cycle, for example when the user wants to
  // switch database/workspace.
  while (yield take(DATABASE_SET_CONFIGURATION)) {
    let state = getState()
    let wrapper = state.database.wrapper

    // Wait for a connection to work.
    yield apply(wrapper, wrapper.connect)

    // Trigger replication, and keep the promise.
    let replication = wrapper.replicate()

    if (replication) {
      yield call(monitorChangeEvents, createChangeChannel(replication))
    }
  }
}

function * monitorChangeEvents (channel) {
  while (true) {
    const info = yield call(channel.take) // Blocks until the promise resolves
    yield put(databaseActions.replicationChange(info))
  }
}
于 2016-02-09T09:56:00.313 回答
13

我们可以使用eventChannelredux-saga

这是我的例子

// fetch history messages
function* watchMessageEventChannel(client) {
  const chan = eventChannel(emitter => {
    client.on('message', (message) => emitter(message));
    return () => {
      client.close().then(() => console.log('logout'));
    };
  });
  while (true) {
    const message = yield take(chan);
    yield put(receiveMessage(message));
  }
}

function* fetchMessageHistory(action) {
  const client = yield realtime.createIMClient('demo_uuid');
  // listen message event
  yield fork(watchMessageEventChannel, client);
}

请注意

eventChannel 上的消息默认不缓冲。如果只想message event一个一个处理,就不能使用blocking call afterconst message = yield take(chan);

或者您必须为 eventChannel 工厂提供一个缓冲区,以便为通道指定缓冲策略(例如 eventChannel(subscriber, buffer))。有关更多信息,请参阅redux-saga API 文档

于 2016-08-04T08:34:33.997 回答
9

我们必须解决的基本问题是事件发射器是“基于推的”,而 sagas 是“基于拉的”。

如果您订阅这样的事件: ,那么只要事件发射器决定推送一个新值replication.on('change', (info) => {}),就会执行回调。replication

对于 sagas,我们需要翻转控件。当它决定响应可用的新更改信息时,必须控制传奇。换句话说,saga 需要提取新信息。

以下是实现此目的的一种方法的示例:

function* startReplication(wrapper) {
  while (yield take(DATABASE_SET_CONFIGURATION)) {
    yield apply(wrapper, wrapper.connect);
    let replication = wrapper.replicate()
    if (replication)
      yield call(monitorChangeEvents, replication);
  }
}

function* monitorChangeEvents(replication) {
  const stream = createReadableStreamOfChanges(replication);

  while (true) {
    const info = yield stream.read(); // Blocks until the promise resolves
    yield put(replicationChange(info));
  }
}

// Returns a stream object that has read() method we can use to read new info.
// The read() method returns a Promise that will be resolved when info from a
// change event becomes available. This is what allows us to shift from working
// with a 'push-based' model to a 'pull-based' model.
function createReadableStreamOfChanges(replication) {
  let deferred;

  replication.on('change', info => {
    if (!deferred) return;
    deferred.resolve(info);
    deferred = null;
  });

  return {
    read() {
      if (deferred)
        return deferred.promise;

      deferred = {};
      deferred.promise = new Promise(resolve => deferred.resolve = resolve);
      return deferred.promise;
    }
  };
}

这里有上面例子的 JSbin:http://jsbin.com/cujudes/edit?js, console

您还应该看看 Yassine Elouafi 对类似问题的回答: 我可以使用 redux-saga 的 es6 生成器作为 websockets 或事件源的 onmessage 侦听器吗?

于 2016-02-09T09:03:28.143 回答
3

感谢@Yassine Elouafi

我基于@Yassine Elouafi 的解决方案创建了简短的 MIT 许可通用频道实现作为 TypeScript 语言的 redux-saga 扩展。

// redux-saga/channels.ts
import { Saga } from 'redux-saga';
import { call, fork } from 'redux-saga/effects';

export interface IChannel<TMessage> {
    take(): Promise<TMessage>;
    put(message: TMessage): void;
}

export function* takeEvery<TMessage>(channel: IChannel<TMessage>, saga: Saga) {
    while (true) {
        const message: TMessage = yield call(channel.take);
        yield fork(saga, message);
    }
}

export function createChannel<TMessage>(): IChannel<TMessage> {
    const messageQueue: TMessage[] = [];
    const resolveQueue: ((message: TMessage) => void)[] = [];

    function put(message: TMessage): void {
        if (resolveQueue.length) {
            const nextResolve = resolveQueue.shift();
            nextResolve(message);
        } else {
            messageQueue.push(message);
        }
    }

    function take(): Promise<TMessage> {
        if (messageQueue.length) {
            return Promise.resolve(messageQueue.shift());
        } else {
            return new Promise((resolve: (message: TMessage) => void) => resolveQueue.push(resolve));
        }
    }

    return {
        take,
        put
    };
}

以及类似于 redux-saga *takeEvery 构造的示例用法

// example-socket-action-binding.ts
import { put } from 'redux-saga/effects';
import {
    createChannel,
    takeEvery as takeEveryChannelMessage
} from './redux-saga/channels';

export function* socketBindActions(
    socket: SocketIOClient.Socket
) {
    const socketChannel = createSocketChannel(socket);
    yield* takeEveryChannelMessage(socketChannel, function* (action: IAction) {
        yield put(action);
    });
}

function createSocketChannel(socket: SocketIOClient.Socket) {
    const socketChannel = createChannel<IAction>();
    socket.on('action', (action: IAction) => socketChannel.put(action));
    return socketChannel;
}
于 2016-04-26T20:17:52.447 回答
1

我在使用 PouchDB 时也遇到了同样的问题,发现提供的答案非常有用和有趣。然而,在 PouchDB 中有很多方法可以做同样的事情,我挖掘了一下,发现了一种不同的方法,它可能更容易推理。

如果您不将侦听器附加到db.change请求,则它将任何更改数据直接返回给调用者,并且添加continuous: true到选项将导致发出长轮询并且在发生某些更改之前不会返回。因此,可以通过以下方式实现相同的结果

export function * monitorDbChanges() {
  var info = yield call([db, db.info]); // get reference to last change 
  let lastSeq = info.update_seq;

  while(true){
    try{
      var changes = yield call([db, db.changes], { since: lastSeq, continuous: true, include_docs: true, heartbeat: 20000 });
      if (changes){
        for(let i = 0; i < changes.results.length; i++){
          yield put({type: 'CHANGED_DOC', doc: changes.results[i].doc});
        }
        lastSeq = changes.last_seq;
      }
    }catch (error){
      yield put({type: 'monitor-changes-error', err: error})
    }
  }
}

有一件事我没有深究。如果我用then替换for循环,我会在. 我假设这与使用迭代器的一些冲突有关。change.results.forEach((change)=>{...})yield

于 2016-04-29T13:46:46.777 回答