0

我正在尝试扩展StreamConsumer,这就是我实现它的方式:

import 'dart:async';

class Consumer extends StreamConsumer {
  List list = new List();
  StreamSubscription subscription;

  Future addStream(Stream stream) {
    Completer c = new Completer();
    subscription = stream.listen((d) => list.add(d),
        onError: c.completeError,
        onDone: () => c.complete(this));
    return c.future;
  }

  Future close() {
    subscription.cancel();
    return new Future.value(this);
  }
}

void main() {
  List data = [1,2,3,4,5];
  Stream stream = new Stream.fromIterable(data);
  Consumer streamConsumer = new Consumer();
  stream.pipe(streamConsumer).then((Consumer consumer) {
    print(consumer.list);
  });
}

正如预期的那样,它会打印,[1,2,3,4,5]但如果我将实现更改为Stream如下所示:

StreamController controller = new StreamController();
Stream stream = controller.stream;
controller.add(1);
controller.add(2);
controller.add(3);
stream.pipe(streamConsumer).then((Consumer consumer) {
  print(consumer.list);
});
controller.add(4);
controller.add(5);

它什么也不打印。可以得出结论,onDone根本没有调用回调(我认为)。那么有什么想法吗?

4

1 回答 1

2

你需要打电话StreamController.close()

new Stream.fromIterable(data)基本上创建一个StreamController,一次添加一个可迭代的元素,并在迭代完成时关闭控制器。在您的第二个代码块中,您手动执行此操作,但不关闭控制器。

以下是相关文档:http ://api.dartlang.org/docs/releases/latest/dart_async/StreamController.html#close

于 2013-07-27T17:51:32.160 回答