2

我刚刚遇到了一个问题,即使用要写入文件的字符串流来降低 fs2 性能text.utf8encode。我尝试更改我的源以使用分块字符串来提高性能,但观察到的是性能下降。

据我所知,它归结为以下几点: 调用flatMap源自 from 的流Stream.emits()可能非常昂贵。根据传递给 Stream.emits() 的序列大小,时间使用似乎是指数级的。下面的代码片段显示了一个示例:

/*
    Test done with scala 2.11.11 and fs2 version 0.10.0-M7.
 */

val rangeSize = 20000
val integers = (1 to rangeSize).toVector

// Note that the last flatMaps are just added to show extreme load for streamA.
val streamA = Stream.emits(integers).flatMap(Stream.emit(_))
val streamB = Stream.range(1, rangeSize + 1).flatMap(Stream.emit(_))

streamA.toVector  // Uses approx. 25 seconds (!)
streamB.toVector  // Uses approx. 15 milliseconds

这是一个错误,还是应该避免将 Stream.emits() 用于大序列?

4

1 回答 1

1

TLDR:分配。

更长的答案:

有趣的问题。我分别对这两种方法运行了 JFR 配置文件,并查看了结果。立即吸引我眼球的第一件事是分配的数量。

Stream.emit

Stream.emit 分配

Stream.range

Stream.range 分配

我们可以看到它Stream.emit分配了大量的Append实例,它们是 的具体实现Catenable[A],它是用于Stream.emit折叠的类型:

private[fs2] final case class Append[A](left: Catenable[A], right: Catenable[A]) extends Catenable[A]

这实际上来自于如何实现的Catenable[A]实现foldLeft

foldLeft(empty: Catenable[B])((acc, a) => acc :+ f(a))

where为每个元素:+分配一个新对象。Append这意味着我们至少要生成 20000 个这样的Append对象。

文档中还有Stream.range关于它如何生成单个块而不是进一步划分流的提示,如果这是我们生成的大范围,这可能会很糟糕:

/**
 * Lazily produce the range `[start, stopExclusive)`. If you want to produce
 * the sequence in one chunk, instead of lazily, use
 * `emits(start until stopExclusive)`.
 *
 * @example {{{
 * scala> Stream.range(10, 20, 2).toList
 * res0: List[Int] = List(10, 12, 14, 16, 18)
 * }}}
 */
def range(start: Int, stopExclusive: Int, by: Int = 1): Stream[Pure,Int] =
  unfold(start){i =>
    if ((by > 0 && i < stopExclusive && start < stopExclusive) ||
        (by < 0 && i > stopExclusive && start > stopExclusive))
      Some((i, i + by))
    else None
}

您可以看到这里没有额外的包装,只有作为范围的一部分发出的整数。另一方面,为序列中的每个元素Stream.emits创建一个对象,其中我们有一个包含流尾部的对象,并包含我们所在的当前值。Appendleftright

这是一个错误吗?我会说不,但我肯定会将其作为性能问题向fs2库维护者开放。

于 2017-10-25T12:00:51.653 回答