8

我想使用类似于take(n: Int)但在时间维度上的函数: consume(period: Duration. 因此,如果发生超时,我希望流终止。我知道我可以将流编译成类似的东西IO[List[T]]并取消它,但是我会丢失结果。实际上,我想将无尽的流转换为有限的流并保留结果。

更多关于更广泛的问题。我有来自消息传递代理的无穷无尽的事件流,但我也有轮换凭据来连接到代理。所以我想要的是消耗事件流一段时间,然后停止,获取新凭据,再次连接到创建新流的代理并将两个流连接为一个。

4

2 回答 2

5

有一种方法可以做到这一点:

/**
    * Interrupts this stream after the specified duration has passed.
    */
  def interruptAfter[F2[x] >: F[x]: Concurrent: Timer](duration: FiniteDuration): Stream[F2, O]
于 2019-05-16T21:16:22.033 回答
2

你需要这样的东西

import scala.util.Random
import scala.concurrent.ExecutionContext
import fs2._
import fs2.concurrent.SignallingRef
implicit val ex = ExecutionContext.global
implicit val t: Timer[IO] = IO.timer(ex)
implicit val cs: ContextShift[IO] = IO.contextShift(ex)

val effect: IO[Long] = IO.sleep(1.second).flatMap(_ => IO{
  val next = Random.nextLong()
  println("NEXT: " + next)
  next
})
val signal = SignallingRef[IO, Boolean](false).unsafeRunSync()
val timer = Stream.sleep(10.seconds).flatMap(_ => 
  Stream.eval(signal.set(true)).flatMap(_ => 
    Stream.emit(println("Finish")).covary[IO]))

val stream = timer concurrently 
Stream.repeatEval(effect).interruptWhen(signal)

stream.compile.drain.unsafeRunSync()

此外,如果您想保存发布数据的结果,您需要从 fs2 获得一些无界队列,以便通过 queue.stream 将发布的数据转换为您的结果

于 2019-05-16T09:44:49.990 回答