1

我需要将 rn-fetch-blob 的onData回调方法连接到 observable。

据我所知,这不是一个事件,无法使用fromEventPattern 。如果这是我的问题的解决方案,我看不到如何使用create 。

我发现bindCallback看起来很有希望,但是文档说我可能应该使用 fromEvent 代替:

请注意,由输出函数创建的 Observable 将始终发出单个值,然后立即完成。如果 func 多次调用回调,后续调用的值将不会出现在流中。如果您需要监听多个调用,您可能希望使用 fromEvent 或 fromEventPattern 来代替。

我确实需要听多个电话。

无论如何,我正在尝试在文档中显示的对象方法上使用 bindCallback。在我的打字稿文件中:

import { bindCallback } from 'rxjs';

在我的课堂上:

private emitter!: Observable<any>;

在私有方法中:

RNFetchBlob.fs
    .readStream(
      filePath,
      "utf8",
      -1,
      10
    )
    .then(ifstream => {
      ifstream.open();
      this.emitter = bindCallback(ifstream.onData);

但它无法编译:

error TS2322: Type '() => Observable<string | number[]>' is not assignable to type 'Observable<any>'.
  Property '_isScalar' is missing in type '() => Observable<string | number[]>'.

在我的情况下,我真的看不到如何使用 fromEvent 。

任何帮助表示赞赏。

编辑:为那些寻找答案的人添加了工作代码:

RNFetchBlob.fs
            .readStream(
              // file path
              peripheral.name,
              // encoding, should be one of `base64`, `utf8`, `ascii`
              "utf8",
              // (optional) buffer size, default to 4096 (4095 for BASE64 encoded data)
              // when reading file in BASE64 encoding, buffer size must be multiples of 3.
              -1,
              10
            )
            .then(ifstream => {
              ifstream.open();
              this.emitter = new Observable(subscriber => {
                ifstream.onData(chunk => {
                  // chunk will be a string when encoding is 'utf8'
                  logging.logWithTimestamp(`Received [${chunk}]`);
                  subscriber.next(chunk);
                });
                ifstream.onError(err => {
                  logging.logWithTimestamp(`oops [${err}]`);
                  subscriber.error(err);
                });

                ifstream.onEnd(() => {
                  subscriber.complete();
                });
              });
              this.rxSubscription = this.emitter
                .pipe(
                  concatMap(value =>
                    this.handleUpdatedValuesComingFromCSVFile(value)
                  )
                )
                .subscribe();
4

2 回答 2

1

不太熟悉,rn-fetch-blob但希望你明白,你也返回一个函数来运行清理逻辑。

 const onDataObserevable=new Obserevable(obs=>{
    ifstream.onData(data=>obs.next(data))
    return ()=>{... you can add some clean up logic here like unsubcribe from source onData event}
 });

更新

将整个链转换为可观察的,希望你明白

from(RNFetchBlob.fs.readStream(
              peripheral.name,
              "utf8",
              -1,
              10
            ))
            .pipe(mergeMap(ifstream => {
              ifstream.open();
              return new Observable(subscriber => {
                ifstream.onData(chunk => {
                  // chunk will be a string when encoding is 'utf8'
                  logging.logWithTimestamp(`Received [${chunk}]`);
                  subscriber.next(chunk);
                });

                ifstream.onError(err => {
                  logging.logWithTimestamp(`oops [${err}]`);
                  subscriber.error(err);
                });

                ifstream.onEnd(() => {
                  subscriber.complete();
                });

              });),
              mergeMap(value =>this.handleUpdatedValuesComingFromCSVFile(value)
                  )
              )
于 2019-11-19T14:04:18.080 回答
0

以下是如何做到这一点fromEventPattern

this.emitter = fromEventPattern(
  // tell fromEventPattern how to subscribe to the data
  handler => ifstream.onData(handler),
  // tell fromEventPattern how to unsubscribe from the data
  handler => ifstream.onData(null)
)
于 2019-11-19T13:52:29.593 回答