0

嗨,我正在尝试将 redux-observables 与 react native 和一个名为 Phoenix 的 websocket 包装器一起使用,它基本上允许您在通过 websocket 接收到某些消息时执行回调。

这是我正在尝试做的基本设置:

import 'rxjs';
import { Observable } from 'rxjs';
import { Socket, Channel } from 'phoenix';

import * as channelActions from '../ducks/channel';

export default function connectSocket(action$, store) {
  return action$.ofType(channelActions.SETUP)
    .mergeMap(action => {
      return new Observable(observer => {
        const socket = new Socket('http://localhost:4000/socket');

        const userId = store.getState().user.id;
        const channel = socket.channel(`user:${userId}`);

        channel
          .join()
          .receive('ok', response => observer.next({ type: 'join', payload: 'something' }))
          .receive('error', reason => observer.next({ type: 'error', payload: 'reason' }))

        channel.on('rooms:add', room => observer.next({ type: 'rooms:add', payload: '123' }))
        channel.on('something:else', room => observer.next({ type: 'something:else', payload: '123' }))
      });
    })
    .map(action => {
      switch (action.type) {
        case 'join':
          return channelActions.join(action.payload);
        case 'error':
          return channelActions.error(action.payload);
        case 'rooms:add':
          return channelActions.add(action.payload);
        case 'something:else':
          return channelActions.something(action.payload);
      }
    });
};

如您所见,这种方法有几个问题/问题:

  1. 一位观察者正在触发不同的事件。.map()除了函数中的 switch 语句之外,我不知道如何将它们拆分

  2. 我不知道在哪里打电话,observer.complete()因为我希望它继续监听所有这些事件,而不仅仅是一次。

  3. 如果我可以将常量提取channel到可以解决这些问题的几个史诗的单独文件中。但是,channel是依赖于socket,哪个是依赖于user,来自于redux状态。

所以我对如何解决这个问题感到很困惑。我认为提取全局channel对象的能力可以解决它,但这也取决于来自 redux 状态的用户 ID。任何帮助表示赞赏。

PS 如果值得的话,我的用例与这个人非常相似(https://github.com/MichalZalecki/connect-rxjs-to-react/issues/1)。其中一位响应者建议使用Rx.Subject,但我不知道从哪里开始......

4

1 回答 1

1

看起来你让你的生活变得更加困难。您已经映射了事件,那么为什么要组合和重新映射它们?将每个事件映射到其自己的流中并将结果合并在一起。

// A little helper function to map your nonstandard receive
// function into its own stream
const receivePattern = (channel, signal, selector) => 
  Observable.fromEventPattern(
    h => channel.receive(signal, h),
    h => {/*However you unsubscribe*/},
    selector
  )

export default function connectSocket(action$, store) {
  return action$.ofType(channelActions.SETUP)
    // Observable.create is usually a crutch pattern
    // Use defer or using here instead as it is more semantic and handles
    // most of the stream lifecycle for you
    .mergeMap(action => Observable.defer(() => {
        const socket = new Socket('http://localhost:4000/socket');
        const userId = store.getState().user.id;
        const channel = socket.channel(`user:${userId}`);
        const joinedChannel = channel.join();

        // Convert the ok message into a channel join
        const ok = receivePattern(joinedChannel, 'ok')
          // Just an example of doing some arbitrary mapping since you seem to be doing it 
          // in your example
          .mapTo('something')
          .map(channelActions.join);

        // Convert the error message
        const error = receivePattern(joinedChannel, 'error')
          .map(channelActions.error);

        // Since the listener methods follow a standard event emitter interface
        // You can use fromEvent to capture them
        // Rather than the more verbose fromEventPattern we used above
        const addRooms = Observable.fromEvent(channel, 'rooms:add')
          .map(channelActions.add);

        const somethingElse = Observable.fromEvent(channel, 'something:else')
          .map(channelActions.somethingElse);

        // Merge the resulting stream into one
        return Observable.merge(ok, error, addRooms, somethingElse);
      });
  );
};
于 2017-03-22T16:52:25.010 回答