0

我有以下代码,由于 Excel 最大行数限制,限制为约 100 万行:

ZStream.unwrap(generateStreamData).mapMPar(32) {m =>
  streamDataToCsvExcel
}

一切都相当简单,而且效果很好。我跟踪流式传输的行数,然后停止写入数据。但是,我想中断在 mapMPar 中生成的所有子纤维,如下所示:

ZStream.unwrap(generateStreamData).interruptWhen(effect.true).mapMPar(32) {m =>
  streamDataToCsvExcel
}

不幸的是,该过程在此处立即中断。我可能遗漏了一些明显的东西......

编辑帖子,因为它需要一些清晰度。

我的数据流是由一个昂贵的过程生成的,在这个过程中,数据是从远程服务器中提取的(这个数据本身是由一个昂贵的过程计算出来的),带有n 个Fiber。然后我处理流,然后将它们流式传输到客户端。一旦处理的行数达到约 100 万,我需要停止从远程服务器提取数据(即中断所有光纤)并结束该过程。

4

1 回答 1

0

这是您澄清后我能想到的。ZIO 1.x 版本因为缺少.dropRight

基本上takeUntilM,一旦达到最大尺寸,我们就可以用来计算我们已经停止的元素的大小(然后使用 .dropRight 或附加过滤器丢弃最后一个超过限制的元素)

这确保了两者

  • 在达到大小限制之前,您只运行streamDataToCsvExcel到最后一条可能的消息
  • 因为流是惰性的,所以expensiveQuery只会运行尽可能多的消息(如果最后一个值被丢弃,因为它会超过限制)
import zio._
import zio.stream._

object Main extends zio.App {

  override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {

    val expensiveQuery     = ZIO.succeed(Chunk(1, 2))
    val generateStreamData = ZIO.succeed(ZStream.repeatEffect(expensiveQuery))

    def streamDataToCsvExcel = ZIO.unit

    def count(ref: Ref[Int], size: Int): UIO[Boolean] =
      ref.updateAndGet(_ + size).map(_ > 10)

    for {
      counter <- Ref.make(0)
      _ <- ZStream
            .unwrap(generateStreamData)
            .takeUntilM(next => count(counter, next.size)) // Count size of messages and stop when it's reached
            .filterM(_ => counter.get.map(_ <= 10))                     // Filter last message from `takeUntilM`. Ideally should be .dropRight(1) with ZIO 2
            .mapMPar(32)(_ => streamDataToCsvExcel)
            .runDrain

    } yield ExitCode.success
  }
}

takeUntilM如果依赖流的惰性对您的用例不起作用,您可以从条件触发某种中断。例如,您可以将 count 函数更新为

def count(ref: Ref[Int], size: Int): UIO[Boolean] =
  ref.updateAndGet(_ + size).map(_ > 10)
    .tapSome { case true => someFiber.interrupt }
于 2022-02-01T13:05:22.477 回答