4

给定以下 Dart 代码片段:

Stream stream1 = new Stream.periodic(new Duration(seconds: 1), (n) => n)
                           .take(10)
                           .asBroadcastStream();
stream1.listen((n) => print("stream1 : $n"),
               onError : (err) => print("stream1 : $err"),
               onDone : () => print("stream1 : done"),
               cancelOnError : false);

Stream stream2 = stream1.where((n) => n % 2 == 0).take(2);
stream2.listen((n) => print("stream2 : $n"),
               onError : (err) => print("stream2 : $err"),
               onDone : () => print("stream2 : done"),
               cancelOnError : false);

Stream stream3 = stream1.where((n) => n % 2 != 0).take(2);
stream3.listen((n) => print("stream3 : $n"),
               onError : (err) => print("stream3 : $err"),
               onDone : () => print("stream3 : done"),
               cancelOnError : false);

StreamController controller = new StreamController.broadcast();
controller.addStream(stream2)
  .then((_) => controller.addStream(stream3));
controller.stream.listen((n) => print("composite stream : $n"),
                         onError : (err) => print("composite stream : $err"),
                         onDone : () => print("composite stream : done"),
                         cancelOnError : false);

我得到以下输出:

stream1 : 0
stream2 : 0
composite stream : 0
stream1 : 1
stream3 : 1
composite stream : 1
stream1 : 2
stream1 : 3
stream1 : 4
stream1 : 5
stream1 : 6
stream1 : 7
stream1 : 8
stream1 : 9
stream1 : done
stream2 : done
stream3 : done

从这个输出中有几件事我不明白:

  1. 为什么每个条目只有一个stream2stream3什么时候每个条目应该有 2 个?我创建的复合流是否使用来自和StreamController的事件之一?这种行为对我来说似乎很奇怪,我错过了什么吗?stream2stream3

  2. 为什么stream2并且stream3仅在完成时才stream1完成?当两者都有界时,这不是我所期望的自然行为,并且与.take(10)on的行为相矛盾stream1。如果我删除.take(10)onstream1然后实际上永远不会完成stream2stream3

如果我修改controller为也添加源stream1(参见下面的片段和输出),那么当它们的 2 个元素启动时, stream2andstream3实际上在它们的自然位置完成,但是我也得到一个异常,因为它试图监听其中一个流两次。

StreamController controller = new StreamController.broadcast();
controller.addStream(stream1)
  .then((_) => controller.addStream(stream2))
  .then((_) => controller.addStream(stream3));
controller.stream.listen((n) => print("composite stream : $n"),
                         onError : (err) => print("composite stream : $err"),
                         onDone : () => print("composite stream : done"),
                         cancelOnError : false);

stream1 : 0
stream2 : 0
composite stream : 0
stream1 : 1
stream3 : 1
composite stream : 1
stream1 : 2
stream2 : 2
stream2 : done
composite stream : 2
stream1 : 3
stream3 : 3
stream3 : done
composite stream : 3
stream1 : 4
composite stream : 4
stream1 : 5
composite stream : 5
stream1 : 6
composite stream : 6
stream1 : 7
composite stream : 7
stream1 : 8
composite stream : 8
stream1 : 9
stream1 : done
composite stream : 9
Uncaught Error: Bad state: Stream has already been listened to.
Stack Trace:
#0      _StreamController._subscribe (dart:async/stream_controller.dart:151:7)
#1      _ControllerStream._createSubscription (dart:async/stream_controller.dart:259:157)
#2      _StreamImpl.listen (dart:async/stream_impl.dart:260:58)
#3      _ForwardingStreamSubscription._ForwardingStreamSubscription (dart:async/stream_pipe.dart:53:43)
#4      _ForwardingStream._createSubscription (dart:async/stream_pipe.dart:35:16)
#5      _ForwardingStream.listen (dart:async/stream_pipe.dart:32:31)
#6      _AsBroadcastStream.listen (dart:async/stream_impl.dart:466:37)
#7      _ForwardingStreamSubscription._ForwardingStreamSubscription (dart:async/stream_pipe.dart:53:43)
#8      _ForwardingStream._createSubscription (dart:async/stream_pipe.dart:35:16)
#9      _ForwardingStream.listen (dart:async/stream_pipe.dart:32:31)
#10     _ForwardingStreamSubscription._ForwardingStreamSubscription (dart:async/stream_pipe.dart:53:43)
#11     _ForwardingStream._createSubscription (dart:async/stream_pipe.dart:35:16)
#12     _ForwardingStream.listen (dart:async/stream_pipe.dart:32:31)
#13     _AddStreamState._AddStreamState (dart:async/stream_controller.dart:300:133)
#14     _BroadcastStreamController.addStream (dart:async/broadcast_stream_controller.dart:140:27)
#15     main.<anonymous closure> (file:///C:/SourceCode/personal/SteamTest/lib/streamtest.dart:38:38)
#16     _ThenFuture._zonedSendValue (dart:async/future_impl.dart:371:24)
#17     _TransformFuture._sendValue.<anonymous closure> (dart:async/future_impl.dart:348:48)
#18     _ZoneBase._runInZone (dart:async/zone.dart:82:17)
#19     _ZoneBase._runUnguarded (dart:async/zone.dart:102:22)
#20     _ZoneBase.executeCallback (dart:async/zone.dart:58:23)
#21     _TransformFuture._sendValue (dart:async/future_impl.dart:348:26)
#22     _FutureImpl._setValueUnchecked (dart:async/future_impl.dart:184:26)
#23     _FutureImpl._asyncSetValue.<anonymous closure> (dart:async/future_impl.dart:218:25)
#24     _asyncRunCallback (dart:async/event_loop.dart:9:15)
#25     _createTimer.<anonymous closure> (dart:async-patch/timer_patch.dart:8:13)
#26     _Timer._createTimerHandler._handleTimeout (timer_impl.dart:95:21)
#27     _Timer._createTimerHandler.<anonymous closure> (timer_impl.dart:111:23)
#28     _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)


Unhandled exception:
Bad state: Stream has already been listened to.
#0      _DefaultZone.handleUncaughtError.<anonymous closure> (dart:async/zone.dart:146:7)
#1      _asyncRunCallback (dart:async/event_loop.dart:9:15)
#2      _asyncRunCallback (dart:async/event_loop.dart:13:7)
#3      _createTimer.<anonymous closure> (dart:async-patch/timer_patch.dart:8:13)
#4      _Timer._createTimerHandler._handleTimeout (timer_impl.dart:95:21)
#5      _Timer._createTimerHandler._handleTimeout (timer_impl.dart:103:7)
#6      _Timer._createTimerHandler._handleTimeout (timer_impl.dart:103:7)
#7      _Timer._createTimerHandler.<anonymous closure> (timer_impl.dart:111:23)
#8      _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)

有人可以帮我弄清楚这里发生了什么吗?

谢谢,

4

1 回答 1

3

在为您的示例调试飞镖代码后,在我看来这是一个错误。

take 调用会生成一个 Take-stream,该 Take-stream 对 Where-stream 的引用具有对 broadcastStream 流 1 的引用。顺便说一句,因为 stream1 是一个广播流,所以这也适用于 Where- 和 Take-stream(在调试期间检查)。

事件数的计数器是 Take-stream 的状态变量,但每次订阅 Take-Stream(通过调用 listen)时,都会创建对 Where-stream 的订阅,进而创建对 stream1 的订阅. 在这种情况下,您有 2 个对 stream2 的订阅(1 个通过监听,1 个通过控制器)导致对 stream1 的两个 2 个订阅(因此由于 stream3 总共有 4 个,但这对于讨论并不重要)。

因此,当 stream1 触发事件 0 时,它通过 stream2 中的 Take-stream 两次,减少 Take-stream 的计数器两次,并导致 onDone-event 仅传递到第二个订阅。因为计数器减 2 并且它是 Take-stream 的状态变量,所以当 stream1 触发事件 2 时,Take-Stream 将不再触发。

所以这对我来说似乎是一个错误。两个观察。

  1. 接收流计数器是否应该是接收流订阅中的状态变量,而不是接收流本身?

  2. 每次订阅 Take-Stream 是否都会导致订阅源流?(另见 Dart 代码中的 ForwardingStream)。这可能取决于广播上下文。

也许他们应该进一步查看 Rx 代码。

于 2014-01-14T01:04:55.393 回答