1

我想将 Monix Observable 与 Doobie (fs2) 流一起使用,但似乎无法使其正常工作。如果没有流式传输,我的测试应用程序可以正常退出,但是在使用流式传输后,我的 TaskApp 似乎挂在关机状态并且无法弄清楚原因。

这是重现问题的最小示例:

package example

import java.util.concurrent.Executors

import doobie.implicits._
import cats.effect.{Blocker, ContextShift, ExitCode, Resource}
import doobie.hikari.HikariTransactor
import monix.eval.{Task, TaskApp}
import com.typesafe.scalalogging.StrictLogging
import fs2.interop.reactivestreams._
import monix.reactive.Observable

import scala.concurrent.ExecutionContext
 
object Hello extends TaskApp with StrictLogging {
    
  private def resources()(implicit contextShift: ContextShift[Task]): Resource[Task, Resources] = {
    for {
      transactor <- Database.transactor("org.postgresql.Driver", "jdbc:postgresql://localhost/fubar", "fubar", "fubar")
    } yield Resources(transactor)
  }

  def run(args: List[String]): Task[ExitCode] = resources().use(task)
    .flatMap(_ => Task { println("All Done!") })
    .flatMap(_ => Task(ExitCode.Success))
  
  def task(resources: Resources): Task[Unit] = {

    val publisher =
      
      sql"""select id from message;"""

        .query[(Long)]
        .stream
        .transact(resources.transactor)
        .toUnicastPublisher()

    Observable.fromReactivePublisher(publisher)
      .foreachL(id => logger.info(id.toString))
    
  }
  
}

case class Resources(transactor: HikariTransactor[Task])

object Database {

  val ecBlocking = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))

  def transactor(dbDriver: String, dbUrl: String, dbUser: String, dbPassword: String)(implicit contextShift: ContextShift[Task]): Resource[Task, HikariTransactor[Task]] = {
    HikariTransactor.newHikariTransactor[Task](dbDriver, dbUrl, dbUser, dbPassword, ecBlocking, Blocker.liftExecutionContext(ecBlocking))
  }

}

我已根据 Monix 文档将 fs2 流转换为 Monix observable:https ://monix.io/docs/current/reactive/observable.html#fs2

我是否需要以某种方式关闭 fs2 流或 Observable 才能干净地退出应用程序?感谢任何提示以使其正常工作或提示如何正确调试它。

4

1 回答 1

0

问题是ExecutionContext需要关闭。在此处查看作者的答案。

在文档中可以看到正确的用法。

于 2020-08-23T15:46:10.380 回答