0

创建需要从另一个 Stream 聚合多个事件的流的最佳方法是什么?

我的目标是创建一个流来聚合来自另一个流的事件,直到它有足够的事件来构建消息。在我的情况下,我正在从 Socket 流中读取数据,因此消息可能分布在不同的事件中,并且事件可能包含各种消息的数据,因此我不能只对每个元素应用映射操作。

似乎正确的方法是使用 Stream Transformer,但我无法找到有关如何正确实现它且没有太多样板代码的信息。

在阅读了有关如何创建流的信息后,我想出了一个解决方案,但我不确定这是否可以接受,也不是最好的方法。

这是我的解决方案示例:

Stream<String> joinWordsIfStartWithC(Stream<String> a) async* {
  var prevWord= '';
  await for (var i in a) {
    prevWord += i;
    if(i.startsWith('C')){
      yield prevWord;
      prevWord = '';
    }
  }
}

Stream<String> periodicStream(Duration interval) async* {
  while (true) {
    await Future.delayed(interval);
    yield 'C';
    yield 'A';
    yield 'B';
    yield 'C';
    yield 'C';
    yield 'B';
    yield 'C';
  }
}

void main(List<String> arguments) async {
  var intStream = periodicStream(Duration(seconds: 2));

  var sStream = joinWordsIfStartWithC(intStream);

  sStream.listen((s) => print(s));
}
4

1 回答 1

0

我会说你的解决方案似乎很好,但如果你想制作一个流转换器,通过扩展从StreamTransformerBase

import 'dart:async';

class JoinWordsIfStartWithCTransformer extends StreamTransformerBase<String, String> {
  Stream<String> bind(Stream<String> a) async* {
    var prevWord = '';
    await for (var i in a) {
      prevWord += i;
      if (i.startsWith('C')) {
        yield prevWord;
        prevWord = '';
      }
    }
  }
}

Stream<String> periodicStream(Duration interval) async* {
  while (true) {
    await Future.delayed(interval);
    yield 'C';
    yield 'A';
    yield 'B';
    yield 'C';
    yield 'C';
    yield 'B';
    yield 'C';
  }
}

void main(List<String> arguments) async {
  var intStream = periodicStream(Duration(seconds: 2));

  var sStream = intStream.transform(JoinWordsIfStartWithCTransformer());

  sStream.listen((s) => print(s));
}
于 2020-04-21T12:58:17.750 回答