这是您澄清后我能想到的。ZIO 1.x 版本因为缺少.dropRight
基本上takeUntilM
,一旦达到最大尺寸,我们就可以用来计算我们已经停止的元素的大小(然后使用 .dropRight 或附加过滤器丢弃最后一个超过限制的元素)
这确保了两者
- 在达到大小限制之前,您只运行
streamDataToCsvExcel
到最后一条可能的消息
- 因为流是惰性的,所以
expensiveQuery
只会运行尽可能多的消息(如果最后一个值被丢弃,因为它会超过限制)
import zio._
import zio.stream._
object Main extends zio.App {
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
val expensiveQuery = ZIO.succeed(Chunk(1, 2))
val generateStreamData = ZIO.succeed(ZStream.repeatEffect(expensiveQuery))
def streamDataToCsvExcel = ZIO.unit
def count(ref: Ref[Int], size: Int): UIO[Boolean] =
ref.updateAndGet(_ + size).map(_ > 10)
for {
counter <- Ref.make(0)
_ <- ZStream
.unwrap(generateStreamData)
.takeUntilM(next => count(counter, next.size)) // Count size of messages and stop when it's reached
.filterM(_ => counter.get.map(_ <= 10)) // Filter last message from `takeUntilM`. Ideally should be .dropRight(1) with ZIO 2
.mapMPar(32)(_ => streamDataToCsvExcel)
.runDrain
} yield ExitCode.success
}
}
takeUntilM
如果依赖流的惰性对您的用例不起作用,您可以从条件触发某种中断。例如,您可以将 count 函数更新为
def count(ref: Ref[Int], size: Int): UIO[Boolean] =
ref.updateAndGet(_ + size).map(_ > 10)
.tapSome { case true => someFiber.interrupt }