1

我想要实现的目标,例如,给定数据:

time, part, data
0, a, 3
1, a, 4
2, b, 10
3, b, 20
3, a, 5

和转型:

stream.keyBy(_.part).scan(0)((s, d) => s + d)

得到:

0, a, 3
1, a, 7
2, b, 10
3, b, 30
3, a, 12

我已经尝试使用 对它进行分区groupAdjacentBy,但它变得太复杂了,因为我需要使用 Key 保留每个 Chunk 之间的复杂状态。我想知道是否有类似 Flink DataStream 的东西。关键?或者更简单的实现方式?

4

3 回答 3

1

好的,我找到了有趣的解决方案(虽然不能flatten

于 2018-12-24T14:39:42.603 回答
0

如前所述,该问题可以通过扫描操作本身的“分区”来解决:

import cats.implicits._
import cats.effect.IO
import fs2._

case class Element(time: Long, part: Symbol, value: Int)

val elements = Stream(
  Element(0, 'a, 3),
  Element(1, 'a, 4),
  Element(2, 'b, 10),
  Element(3, 'b, 20),
  Element(3, 'a, 5)
)

val runningSumsByPart = elements
  .scan(Map.empty[Symbol, Int] -> none[Element]) {
    case ((sums, _), el@Element(_, part, value)) =>
      val sum = sums.getOrElse(part, 0) + value
      (sums + (part -> sum), el.copy(value = sum).some)
  }
  .collect { case (_, Some(el)) => el }

runningSumsByPart.covary[IO].evalTap(el => IO { println(el) }).compile.drain.unsafeRunSync()

输出:

元素(0,'a,3)

元素(1,'a,7)

元素(2,'b,10)

元素(3,'b,30)

元素(3,'a,12)

于 2019-02-02T04:52:21.380 回答
0

我做了这样的事情。先拆分,再合并。我还不知道如何返回 2 个流。我只知道如何在一个地方处理它们,然后将它们合并在一起。

    val notEqualS = in
      .filter(_.isInstanceOf[NotEqual])
      .map(_.asInstanceOf[NotEqual])
      ...

    val invalidS = in
      .filter(_.isInstanceOf[Invalid])
      .map(_.asInstanceOf[Invalid])
      ...

    notEqualS.merge(invalidS)
于 2020-12-11T00:21:43.640 回答