4

我正试图绕着 Dart Streams 转转。特别是命令行实用程序的cat这个示例具有以下代码行:

 Stream<List<int>> stream = new File(path).openRead();

  // Transform the stream using a `StreamTransformer`. The transformers
  // used here convert the data to UTF8 and split string values into
  // individual lines.
  return stream
      .transform(UTF8.decoder)
      .transform(const LineSplitter())
      .listen((line) {
        if (showLineNumbers) {
          stdout.write('${lineNumber++} ');
        }
        stdout.writeln(line);
      }).asFuture().catchError((_) => _handleError(path));
  1. Stream<T>as的声明让Stream<List<int>>我有点困惑。为什么不声明为Stream<int>. List<> 类型如何使它与众不同。如果它是一个列表,订阅者事件是否以某种方式缓冲?

  2. 什么类型(如 中<T>)被传递给第一个转换?是一个int还是一个List<int>

  3. 什么类型被传递给每个下一个转换,什么决定了它们的类型。

  4. 此示例是否在将转换结果传递给下一个转换之前读取整个文件?如果是这样,是否有一个示例说明如何流式传输类似于此 Node 问题的非常大的文件Parsing huge logfiles in Node.js - read in line-by-line

4

2 回答 2

6
  1. 好问题。
  2. UTF8是一个扩展的Utf8CodecCodec<String, List<int>>。所以UTF8.decoder是一个Converter<List<int>, String>作为List<int>参数的。
  3. LineSplitter是一个Converter<String, List<String>>. 所以它String作为参数。结果流.transform(const LineSplitter())Stream<String>每行发送的地方。
  4. File.openRead在将第一个字节写入流之前不会读取整个文件。所以处理大文件没有问题。
于 2013-12-21T14:40:17.140 回答
0

Alexandre Ardhuin 的前三个问题是正确的。然而,第四个问题不是。在把它拆开并删除我自己的代码版本之后,我确定了以下内容:

即使在 37Mb 文件上,转换也只会被调用一次。

这是我用来弄清楚的代码。

import 'dart:async';
import 'dart:convert';
import 'dart:io';


void main(List<String> arguments) {

  Stream<List<int>> stream = new File('Data.txt').openRead();

   stream
      .transform(const Utf8InterceptDecoder())
        .transform(const LineSplitterIntercept())
          .listen((line) {
//            stdout.writeln(line);
          }).asFuture().catchError((_) => print(_));
}

int lineSplitCount = 0;

class LineSplitterIntercept extends LineSplitter {

  const LineSplitterIntercept() : super();
  // Never gets called
  List<String> convert(String data) {
    stdout.writeln("LineSplitterIntercept.convert : Data:" + data);
    return super.convert(data);
  }

  StringConversionSink startChunkedConversion(ChunkedConversionSink<String> sink) {
    stdout.writeln("LineSplitterIntercept.startChunkedConversion Count:"+lineSplitCount.toString()+ " Sink: " + sink.toString());
    lineSplitCount++;
    return super.startChunkedConversion(sink);
  }
}

int utfCount = 0;

class Utf8InterceptDecoder extends Utf8Decoder {

  const Utf8InterceptDecoder() : super();

  //never gets called
  String convert(List<int> codeUnits) {
    stdout.writeln("Utf8InterceptDecoder.convert : codeUnits.length:" + codeUnits.length.toString());
    return super.convert(codeUnits);
  }


  ByteConversionSink startChunkedConversion(ChunkedConversionSink<String> sink) {
    stdout.writeln("Utf8InterceptDecoder.startChunkedConversion Count:"+ utfCount.toString() + " Sink: "+ sink.toString());
    utfCount++;
    return super.startChunkedConversion(sink);
  }
}
于 2013-12-21T21:23:47.457 回答