6

扩展StreamApp要求您提供stream定义。它有一个requestShutdown参数。

def stream(args: List[String], requestShutdown: F[Unit]): Stream[F, ExitCode]

我为此提供了实现,并理解它args是作为命令行参数传入的。但是,我不确定是什么提供了requestShutdown参数以及我可以用它做什么。

具体来说,我想在Stream[IO, ExitCode]启动 Http4s 服务器(永远阻塞)上调用正常关闭。

看起来 aSignal是必需的并且必须设置?我试图“获取”的底层流如下所示:

for {
   scheduler <- Scheduler[IO](corePoolSize = 1)
   exitCode  <- BlazeBuilder[IO]
                    .bindHttp(port, "0.0.0.0")
                    .mountService(services(scheduler), "/")
                    .serve
    } yield exitCode

我的streamdef 在这里StreamAppSpec从 fs2 项目中有一些东西,StreamAppSpec但我不知道如何适应它。

4

1 回答 1

5

您可以将requestShutdown提供给流函数的参数视为一个动作,该动作在执行时将请求终止程序。

因此,执行它会导致它结束程序。

这是一个使用示例:

  override def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, ExitCode] =
    for {
      scheduler <- Scheduler[IO](corePoolSize = 1)
      exitStream = scheduler.sleep[IO](10 seconds)
       .evalMap(_ => requestShutdown)
       .map(_ => ExitCode.Success)
      serverStream = BlazeBuilder[IO]
        .bindHttp(port, "0.0.0.0")
        .mountService(services(scheduler), "/")
        .serve
      result <- Stream.emits(List(exitStream, serverStream)).joinUnbounded
    } yield result

在这种情况下,我们创建两个流:

  • 第一个会等待10秒,然后触发
    终止应用的效果。

  • 第二个将运行 http4s 服务器。

然后,我们加入这两个流,使它们同时运行,这意味着 Web 服务器将运行 10 秒,然后另一个流发出程序应该终止的信号。

于 2018-03-07T20:44:07.120 回答