combineList
通过创建“错误感知”变体功能,我能够得到我想要的东西。原始函数通过使用流combineList
创建新函数来工作。我创建了一个新的流类,即,它的实现几乎与. 我只添加了几行来保留每个流的错误状态,并且仅在解决所有错误时才发出。Observable
CombineLatestStream
ErrorAwareCombineLatestStream
CombineLatestStream
这是我的实现:
ErrorAwareCombineLatestStream
班级:
class ErrorAwareCombineLatestStream<T, A, B, C, D, E, F, G, H, I>
extends Stream<T> {
final StreamController<T> controller;
ErrorAwareCombineLatestStream(Iterable<Stream<dynamic>> streams,
T combiner(A a, B b, [C c, D d, E e, F f, G g, H h, I i]))
: controller = _buildController(streams, combiner);
@override
StreamSubscription<T> listen(void Function(T event) onData,
{Function onError, void Function() onDone, bool cancelOnError}) {
return controller.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
static StreamController<T> _buildController<T, A, B, C, D, E, F, G, H, I>(
Iterable<Stream<dynamic>> streams,
T combiner(A a, B b, [C c, D d, E e, F f, G g, H h, I i])) {
final List<StreamSubscription<dynamic>> subscriptions =
new List<StreamSubscription<dynamic>>(streams.length);
StreamController<T> controller;
controller = new StreamController<T>(
sync: true,
onListen: () {
final List<dynamic> values = new List<dynamic>(streams.length);
final List<bool> triggered =
new List<bool>.generate(streams.length, (_) => false);
final List<bool> completedStatus =
new List<bool>.generate(streams.length, (_) => false);
final List<bool> hasError =
new List<bool>.generate(streams.length, (_) => false);
for (int i = 0, len = streams.length; i < len; i++) {
Stream<dynamic> stream = streams.elementAt(i);
subscriptions[i] = stream.listen((dynamic value) {
values[i] = value;
triggered[i] = true;
hasError[i] = false;
final allStreamsHaveEvents =
triggered.reduce((bool a, bool b) => a && b) &&
!hasError.reduce((a, b) => a || b);
if (allStreamsHaveEvents)
updateWithValues(combiner, values, controller);
}, onError: (e) {
hasError[i] = true;
controller.addError(e);
}, onDone: () {
completedStatus[i] = true;
if (completedStatus.reduce((bool a, bool b) => a && b))
controller.close();
});
}
},
onCancel: () => Future.wait<dynamic>(subscriptions
.map((StreamSubscription<dynamic> subscription) =>
subscription.cancel())
.where((Future<dynamic> cancelFuture) => cancelFuture != null)));
return controller;
}
static void updateWithValues<T, A, B, C, D, E, F, G, H, I>(
T combiner(A a, B b, [C c, D d, E e, F f, G g, H h, I i]),
Iterable<dynamic> values,
StreamController<T> controller) {
try {
final int len = values.length;
final A a = values.elementAt(0);
final B b = values.elementAt(1);
T result;
switch (len) {
case 2:
result = combiner(a, b);
break;
case 3:
final C c = values.elementAt(2);
result = combiner(a, b, c);
break;
case 4:
final C c = values.elementAt(2);
final D d = values.elementAt(3);
result = combiner(a, b, c, d);
break;
case 5:
final C c = values.elementAt(2);
final D d = values.elementAt(3);
final E e = values.elementAt(4);
result = combiner(a, b, c, d, e);
break;
case 6:
final C c = values.elementAt(2);
final D d = values.elementAt(3);
final E e = values.elementAt(4);
final F f = values.elementAt(5);
result = combiner(a, b, c, d, e, f);
break;
case 7:
final C c = values.elementAt(2);
final D d = values.elementAt(3);
final E e = values.elementAt(4);
final F f = values.elementAt(5);
final G g = values.elementAt(6);
result = combiner(a, b, c, d, e, f, g);
break;
case 8:
final C c = values.elementAt(2);
final D d = values.elementAt(3);
final E e = values.elementAt(4);
final F f = values.elementAt(5);
final G g = values.elementAt(6);
final H h = values.elementAt(7);
result = combiner(a, b, c, d, e, f, g, h);
break;
case 9:
final C c = values.elementAt(2);
final D d = values.elementAt(3);
final E e = values.elementAt(4);
final F f = values.elementAt(5);
final G g = values.elementAt(6);
final H h = values.elementAt(7);
final I i = values.elementAt(8);
result = combiner(a, b, c, d, e, f, g, h, i);
break;
}
controller.add(result);
} catch (e, s) {
controller.addError(e, s);
}
}
}
errorAwareCombineLatest2
功能:
Observable<T> errorAwareCombineLatest2<A, B, T>(
Stream<A> streamOne, Stream<B> streamTwo, T combiner(A a, B b)) =>
new Observable<T>(new ErrorAwareCombineLatestStream<T, A, B, Null, Null,
Null, Null, Null, Null, Null>(
<Stream<dynamic>>[streamOne, streamTwo],
(A a, B b, [Null c, Null d, Null e, Null f, Null g, Null h, Null i]) =>
combiner(a, b)));