给定以下 Dart 代码片段:
Stream stream1 = new Stream.periodic(new Duration(seconds: 1), (n) => n)
.take(10)
.asBroadcastStream();
stream1.listen((n) => print("stream1 : $n"),
onError : (err) => print("stream1 : $err"),
onDone : () => print("stream1 : done"),
cancelOnError : false);
Stream stream2 = stream1.where((n) => n % 2 == 0).take(2);
stream2.listen((n) => print("stream2 : $n"),
onError : (err) => print("stream2 : $err"),
onDone : () => print("stream2 : done"),
cancelOnError : false);
Stream stream3 = stream1.where((n) => n % 2 != 0).take(2);
stream3.listen((n) => print("stream3 : $n"),
onError : (err) => print("stream3 : $err"),
onDone : () => print("stream3 : done"),
cancelOnError : false);
StreamController controller = new StreamController.broadcast();
controller.addStream(stream2)
.then((_) => controller.addStream(stream3));
controller.stream.listen((n) => print("composite stream : $n"),
onError : (err) => print("composite stream : $err"),
onDone : () => print("composite stream : done"),
cancelOnError : false);
我得到以下输出:
stream1 : 0
stream2 : 0
composite stream : 0
stream1 : 1
stream3 : 1
composite stream : 1
stream1 : 2
stream1 : 3
stream1 : 4
stream1 : 5
stream1 : 6
stream1 : 7
stream1 : 8
stream1 : 9
stream1 : done
stream2 : done
stream3 : done
从这个输出中有几件事我不明白:
为什么每个条目只有一个
stream2
,stream3
什么时候每个条目应该有 2 个?我创建的复合流是否使用来自和StreamController
的事件之一?这种行为对我来说似乎很奇怪,我错过了什么吗?stream2
stream3
为什么
stream2
并且stream3
仅在完成时才stream1
完成?当两者都有界时,这不是我所期望的自然行为,并且与.take(10)
on的行为相矛盾stream1
。如果我删除.take(10)
onstream1
然后实际上永远不会完成stream2
。stream3
如果我修改controller
为也添加源stream1
(参见下面的片段和输出),那么当它们的 2 个元素启动时, stream2
andstream3
实际上在它们的自然位置完成,但是我也得到一个异常,因为它试图监听其中一个流两次。
StreamController controller = new StreamController.broadcast();
controller.addStream(stream1)
.then((_) => controller.addStream(stream2))
.then((_) => controller.addStream(stream3));
controller.stream.listen((n) => print("composite stream : $n"),
onError : (err) => print("composite stream : $err"),
onDone : () => print("composite stream : done"),
cancelOnError : false);
stream1 : 0
stream2 : 0
composite stream : 0
stream1 : 1
stream3 : 1
composite stream : 1
stream1 : 2
stream2 : 2
stream2 : done
composite stream : 2
stream1 : 3
stream3 : 3
stream3 : done
composite stream : 3
stream1 : 4
composite stream : 4
stream1 : 5
composite stream : 5
stream1 : 6
composite stream : 6
stream1 : 7
composite stream : 7
stream1 : 8
composite stream : 8
stream1 : 9
stream1 : done
composite stream : 9
Uncaught Error: Bad state: Stream has already been listened to.
Stack Trace:
#0 _StreamController._subscribe (dart:async/stream_controller.dart:151:7)
#1 _ControllerStream._createSubscription (dart:async/stream_controller.dart:259:157)
#2 _StreamImpl.listen (dart:async/stream_impl.dart:260:58)
#3 _ForwardingStreamSubscription._ForwardingStreamSubscription (dart:async/stream_pipe.dart:53:43)
#4 _ForwardingStream._createSubscription (dart:async/stream_pipe.dart:35:16)
#5 _ForwardingStream.listen (dart:async/stream_pipe.dart:32:31)
#6 _AsBroadcastStream.listen (dart:async/stream_impl.dart:466:37)
#7 _ForwardingStreamSubscription._ForwardingStreamSubscription (dart:async/stream_pipe.dart:53:43)
#8 _ForwardingStream._createSubscription (dart:async/stream_pipe.dart:35:16)
#9 _ForwardingStream.listen (dart:async/stream_pipe.dart:32:31)
#10 _ForwardingStreamSubscription._ForwardingStreamSubscription (dart:async/stream_pipe.dart:53:43)
#11 _ForwardingStream._createSubscription (dart:async/stream_pipe.dart:35:16)
#12 _ForwardingStream.listen (dart:async/stream_pipe.dart:32:31)
#13 _AddStreamState._AddStreamState (dart:async/stream_controller.dart:300:133)
#14 _BroadcastStreamController.addStream (dart:async/broadcast_stream_controller.dart:140:27)
#15 main.<anonymous closure> (file:///C:/SourceCode/personal/SteamTest/lib/streamtest.dart:38:38)
#16 _ThenFuture._zonedSendValue (dart:async/future_impl.dart:371:24)
#17 _TransformFuture._sendValue.<anonymous closure> (dart:async/future_impl.dart:348:48)
#18 _ZoneBase._runInZone (dart:async/zone.dart:82:17)
#19 _ZoneBase._runUnguarded (dart:async/zone.dart:102:22)
#20 _ZoneBase.executeCallback (dart:async/zone.dart:58:23)
#21 _TransformFuture._sendValue (dart:async/future_impl.dart:348:26)
#22 _FutureImpl._setValueUnchecked (dart:async/future_impl.dart:184:26)
#23 _FutureImpl._asyncSetValue.<anonymous closure> (dart:async/future_impl.dart:218:25)
#24 _asyncRunCallback (dart:async/event_loop.dart:9:15)
#25 _createTimer.<anonymous closure> (dart:async-patch/timer_patch.dart:8:13)
#26 _Timer._createTimerHandler._handleTimeout (timer_impl.dart:95:21)
#27 _Timer._createTimerHandler.<anonymous closure> (timer_impl.dart:111:23)
#28 _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)
Unhandled exception:
Bad state: Stream has already been listened to.
#0 _DefaultZone.handleUncaughtError.<anonymous closure> (dart:async/zone.dart:146:7)
#1 _asyncRunCallback (dart:async/event_loop.dart:9:15)
#2 _asyncRunCallback (dart:async/event_loop.dart:13:7)
#3 _createTimer.<anonymous closure> (dart:async-patch/timer_patch.dart:8:13)
#4 _Timer._createTimerHandler._handleTimeout (timer_impl.dart:95:21)
#5 _Timer._createTimerHandler._handleTimeout (timer_impl.dart:103:7)
#6 _Timer._createTimerHandler._handleTimeout (timer_impl.dart:103:7)
#7 _Timer._createTimerHandler.<anonymous closure> (timer_impl.dart:111:23)
#8 _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)
有人可以帮我弄清楚这里发生了什么吗?
谢谢,