0

I'm trying to do websocket setup in an redux-observable epic, and i'm going with an approach similar to this guy: https://github.com/MichalZalecki/connect-rxjs-to-react/issues/1

However, it looks like my first stab at wiring things up isn't working, even though it looks the same as the guy above:

import 'rxjs';
import Observable from 'rxjs';

import * as scheduleActions from '../ducks/schedule';

export default function connectSocket(action$, store) {
  return action$.ofType(scheduleActions.CANCEL_RSVP)
    .map(action => {
      new Observable(observer => {
        // do websocket stuff here
        observer.next('message text');
      });
    })
    .map(text => {
      console.log("xxxxxxxxxxxxx: ", text);
      return scheduleActions.rsvpCancelled(1);
    });
};

However, I'm getting a Object is not a constructor error:

enter image description here

=== UPDATE ===

Looks like the suggestion to destructure the { Observable } export worked!

Not the only issue is that text doesn't seem to cross over to the next method...

import 'rxjs';
import { Observable } from 'rxjs';

import * as scheduleActions from '../ducks/schedule';

export default function connectSocket(action$, store) {
  return action$.ofType(scheduleActions.CANCEL_RSVP)
    .map(action => {
      new Observable(observer => {
        // do websocket stuff here
        observer.next('message text');
      });
    })
    .map(text => {
      console.log("xxxxxxxxxxxxx: ", text); // prints undefined
      return scheduleActions.rsvpCancelled(1);
    });
};
4

2 回答 2

3

在 RxJS v5 中,Observable该类可用作命名导出,而不是默认导出。

import { Observable } from 'rxjs';

从常规导入rxjs也将导入所有RxJS(将所有运算符添加到 Observable 原型)。这在此处的文档中进行了描述。如果您希望更明确并且只导入Observable自身,您可以直接在以下位置导入它rxjs/Observable

import { Observable } from 'rxjs/Observable';

另外,您在映射自定义 Observable 的方式上有几个问题。

首要问题

你实际上并没有退回它。呵呵。您缺少 return 语句(或者您可以删除花括号并使用箭头函数隐式返回)。

第二期

当您返回 Observable 时,常规.map()运算符不会做任何特别的事情。如果您希望自定义 Observable 被订阅和展平,您将需要使用执行某种展平的运算符。

最常见的两个是mergeMap(aka flatMap) 或switchMap.

action$.ofType(scheduleActions.CANCEL_RSVP)
  .mergeMap(action => {
    return new Observable(observer => {
      // do websocket stuff here
      observer.next('message text');
    });
  })

您需要哪个运算符取决于您想要的行为。如果您还不熟悉,可以查看有关各种运算符的文档或直接跳转到mergeMapswitchMap文档。


如果您喜欢冒险,RxJS v5 确实具有开箱即用的 WebSocket 支持,您可以尝试使用Observable.webSocket(). 它没有很好地记录,但您也可以查看单元测试,对于简单的只读单向流,它非常自我解释——提供 URL 并订阅。它实际上非常强大,如果你能弄清楚如何使用它,那就是。通过单个套接字支持双向、多路复用,也就是复杂的多个输入/输出通道。我们在 Netflix 将它用于具有数千个 rps 的几个内部工具。

于 2017-03-21T06:52:06.020 回答
0

你可以看看Demo。访问Create Custom Observable

于 2019-05-31T03:57:47.330 回答