1

我在 RxDart 中使用 combineLatest2,但仍然对它的行为感到困惑。这是我的代码:

/// Stream transformer that passes non-empty strings through unchanged and
/// replaces every empty string with the error `'Cannot be empty.'`.
final validator = StreamTransformer<String, String>.fromHandlers(
  handleData: (String data, EventSink<String> sink) {
    // Guard clause: an empty string is reported as an error, not as data.
    if (data.isEmpty) {
      sink.addError('Cannot be empty.');
      return;
    }
    sink.add(data);
  },
);

/// Backing subject for the first input.
final BehaviorSubject<String> _subject1 = BehaviorSubject<String>();

/// The first input's values after validation by [validator].
final stream1 = _subject1.stream.transform(validator);

/// Pushes a new raw value into the first input.
final void Function(String) changeSubject1 = _subject1.sink.add;

/// Backing subject for the second input.
final BehaviorSubject<String> _subject2 = BehaviorSubject<String>();

/// The second input's values after validation by [validator].
final stream2 = _subject2.stream.transform(validator);

/// Pushes a new raw value into the second input.
final void Function(String) changeSubject2 = _subject2.sink.add;

/// Joins the latest values of [stream1] and [stream2] as `'<a>, <b>'`.
final combined = Observable.combineLatest2(
    stream1, stream2, (String a, String b) => '$a, $b');

我有两个 Subject,并分别引用了它们的流和 sink.add 函数。在把流赋给各自的变量之前,我给它们加了一个转换器,确保只发出非空字符串,否则就向接收器添加一个错误。最后,我使用 Observable.combineLatest2 把前两个流组合起来,创建了另一个流 combined。

combined 流只有在它的各个“子”流都发出有效值时才会发出值。我遇到的问题出现在这样的情况下:两个流都已经发出过有效值,随后都发出了无效值,之后其中一个流又发出了有效值。此时 combined 流仍然会发出一个值,它由刚更新的那个流的新值和另一个流的上一个有效值组成(尽管在那个有效值之后,该流已经发出过一个无效值)。我能防止这种情况发生吗?换句话说,运行这段代码:

// Print every combined value; print a fixed marker for every forwarded error.
combined.listen(print, onError: (error) => print('error'));

// Both inputs emit valid values.
changeSubject1('Hello');
changeSubject2('World');
// Both inputs emit invalid (empty) values, producing two errors.
changeSubject1('');
changeSubject2('');
// Both inputs become valid again, one after the other.
changeSubject1('NewHello');
changeSubject2('NewWorld');

将生成此输出:

Hello, World
error
error
NewHello, World
NewHello, NewWorld

我试图实现的输出:

Hello, World
error
error
NewHello, NewWorld

总之,我试图使combined流仅在每个流的最新值有效时才发出。

4

1 回答 1

0

通过创建 combineLatest 函数的“错误感知”变体,我得到了想要的结果。原始函数的工作方式是使用 CombineLatestStream 流创建一个新的 Observable。我创建了一个新的流类,即 ErrorAwareCombineLatestStream,它的实现几乎与 CombineLatestStream 相同。我只添加了几行代码来保存每个流的错误状态,并且仅在所有错误都已消除时才发出值。

这是我的实现:

ErrorAwareCombineLatestStream 类:

/// A [Stream] that combines the latest values of several source streams, but
/// stays silent while any source is in an error state.
///
/// A source enters the error state when it emits an error and leaves it when
/// it emits its next data event. The combiner is invoked only when every
/// source has emitted at least one value and no source is currently in the
/// error state. Source errors are forwarded to the listener (with their stack
/// traces), and the stream closes once every source has completed.
///
/// Between two and nine sources are supported; any other count results in a
/// [RangeError] error event instead of a combined value.
class ErrorAwareCombineLatestStream<T, A, B, C, D, E, F, G, H, I>
    extends Stream<T> {
  /// The controller whose stream backs [listen].
  final StreamController<T> controller;

  /// Creates a stream that feeds the latest values of [streams] to
  /// [combiner], in the order in which [streams] yields them.
  ErrorAwareCombineLatestStream(Iterable<Stream<dynamic>> streams,
      T combiner(A a, B b, [C c, D d, E e, F f, G g, H h, I i]))
      : controller = _buildController(streams, combiner);

  @override
  StreamSubscription<T> listen(void Function(T event) onData,
      {Function onError, void Function() onDone, bool cancelOnError}) {
    return controller.stream.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
  }

  /// Builds the synchronous controller that subscribes to every source on
  /// listen and cancels all of those subscriptions on cancel.
  static StreamController<T> _buildController<T, A, B, C, D, E, F, G, H, I>(
      Iterable<Stream<dynamic>> streams,
      T combiner(A a, B b, [C c, D d, E e, F f, G g, H h, I i])) {
    // Snapshot the iterable once so a lazy iterable is neither re-walked nor
    // re-evaluated on every `length` / `elementAt` access.
    final sources = streams.toList(growable: false);
    final subscriptions = <StreamSubscription<dynamic>>[];
    StreamController<T> controller;

    controller = StreamController<T>(
        sync: true,
        onListen: () {
          final count = sources.length;
          // Per-source state, indexed by the source's position.
          final values = List<dynamic>.filled(count, null);
          final triggered = List<bool>.filled(count, false);
          final completedStatus = List<bool>.filled(count, false);
          final hasError = List<bool>.filled(count, false);

          for (var i = 0; i < count; i++) {
            subscriptions.add(sources[i].listen((dynamic value) {
              values[i] = value;
              triggered[i] = true;
              // A fresh value clears this source's error state.
              hasError[i] = false;

              final allStreamsHaveEvents =
                  triggered.every((bool seen) => seen) &&
                      !hasError.any((bool failed) => failed);

              if (allStreamsHaveEvents) {
                updateWithValues(combiner, values, controller);
              }
            }, onError: (Object error, StackTrace stackTrace) {
              hasError[i] = true;
              // Forward the stack trace too, so the origin of the upstream
              // error is not lost.
              controller.addError(error, stackTrace);
            }, onDone: () {
              completedStatus[i] = true;

              if (completedStatus.every((bool done) => done)) {
                controller.close();
              }
            }));
          }
        },
        onCancel: () => Future.wait<dynamic>(subscriptions
            .map((StreamSubscription<dynamic> subscription) =>
                subscription.cancel())
            // Skip null results so that Future.wait only receives futures.
            .where((Future<dynamic> cancelFuture) => cancelFuture != null)));

    return controller;
  }

  /// Calls [combiner] with the elements of [values] as positional arguments
  /// and adds the result to [controller].
  ///
  /// [values] must contain between two and nine elements. Any failure (an
  /// unsupported element count, an element of the wrong type, or an exception
  /// thrown by [combiner]) is added to [controller] as an error event rather
  /// than thrown to the caller.
  static void updateWithValues<T, A, B, C, D, E, F, G, H, I>(
      T combiner(A a, B b, [C c, D d, E e, F f, G g, H h, I i]),
      Iterable<dynamic> values,
      StreamController<T> controller) {
    try {
      final v = values.toList(growable: false);
      T result;

      switch (v.length) {
        case 2:
          result = combiner(v[0], v[1]);
          break;
        case 3:
          result = combiner(v[0], v[1], v[2]);
          break;
        case 4:
          result = combiner(v[0], v[1], v[2], v[3]);
          break;
        case 5:
          result = combiner(v[0], v[1], v[2], v[3], v[4]);
          break;
        case 6:
          result = combiner(v[0], v[1], v[2], v[3], v[4], v[5]);
          break;
        case 7:
          result = combiner(v[0], v[1], v[2], v[3], v[4], v[5], v[6]);
          break;
        case 8:
          result = combiner(v[0], v[1], v[2], v[3], v[4], v[5], v[6], v[7]);
          break;
        case 9:
          result =
              combiner(v[0], v[1], v[2], v[3], v[4], v[5], v[6], v[7], v[8]);
          break;
        default:
          // Report an unsupported count as an error instead of silently
          // emitting a null "combined" value.
          throw RangeError.range(v.length, 2, 9, 'values.length');
      }

      controller.add(result);
    } catch (e, s) {
      controller.addError(e, s);
    }
  }
}

errorAwareCombineLatest2 函数:

/// Combines the latest values of [streamOne] and [streamTwo] with [combiner]
/// through an [ErrorAwareCombineLatestStream], wrapped in an [Observable].
///
/// The combined stream emits only when both sources have produced a value
/// and neither source's most recent event was an error.
Observable<T> errorAwareCombineLatest2<A, B, T>(
    Stream<A> streamOne, Stream<B> streamTwo, T combiner(A a, B b)) {
  // Adapt the two-argument combiner to the nine-slot signature the stream
  // class expects; the seven unused slots are typed as Null and ignored.
  T adapted(A a, B b,
          [Null c, Null d, Null e, Null f, Null g, Null h, Null i]) =>
      combiner(a, b);

  final sources = <Stream<dynamic>>[streamOne, streamTwo];

  return Observable<T>(ErrorAwareCombineLatestStream<T, A, B, Null, Null,
      Null, Null, Null, Null, Null>(sources, adapted));
}
于 2018-12-29T11:00:47.550 回答