我想将 Monix Observable 与 Doobie (fs2) 流一起使用,但似乎无法使其正常工作。不使用流式传输时,我的测试应用程序可以正常退出;但使用流式传输后,我的 TaskApp 似乎会在关闭时挂起,我无法弄清楚原因。
这是重现问题的最小示例:
package example
import java.util.concurrent.Executors
import doobie.implicits._
import cats.effect.{Blocker, ContextShift, ExitCode, Resource}
import doobie.hikari.HikariTransactor
import monix.eval.{Task, TaskApp}
import com.typesafe.scalalogging.StrictLogging
import fs2.interop.reactivestreams._
import monix.reactive.Observable
import scala.concurrent.ExecutionContext
/** Application entry point: streams every `id` from the `message` table and logs each one. */
object Hello extends TaskApp with StrictLogging {

  /** Acquires the resources, runs [[task]] with them, prints a completion line and reports success. */
  def run(args: List[String]): Task[ExitCode] =
    for {
      _ <- resources().use(task)
      _ <- Task { println("All Done!") }
    } yield ExitCode.Success

  /** Bundles the database transactor into a single managed [[Resources]] value. */
  private def resources()(implicit contextShift: ContextShift[Task]): Resource[Task, Resources] =
    Database
      .transactor("org.postgresql.Driver", "jdbc:postgresql://localhost/fubar", "fubar", "fubar")
      .map(xa => Resources(xa))

  /**
   * Runs the query as an fs2 stream, exposes it as a reactive-streams publisher,
   * and consumes that publisher through a Monix `Observable`, logging each id.
   *
   * @param resources holder of the transactor the query is run against
   */
  def task(resources: Resources): Task[Unit] = {
    // fs2 stream of ids, executed through the transactor.
    val idStream =
      sql"""select id from message;"""
        .query[Long]
        .stream
        .transact(resources.transactor)

    // Bridge fs2 -> reactive streams -> Monix.
    val idPublisher = idStream.toUnicastPublisher()

    Observable
      .fromReactivePublisher(idPublisher)
      .foreachL { id =>
        logger.info(id.toString)
      }
  }
}
/**
 * Holder for the managed resources the application needs while running.
 *
 * @param transactor Hikari-backed doobie transactor used to run queries in `Task`
 */
final case class Resources(transactor: HikariTransactor[Task])
/** Helpers for building a Hikari-backed doobie transactor. */
object Database {

  /**
   * Thread factory that marks every worker thread as a daemon.
   *
   * The pool below is never shut down explicitly. With the default (non-daemon)
   * threads, any worker started by a query would keep the JVM alive after the
   * application's `run` has completed, so the process would appear to hang on exit.
   *
   * Must be defined before `ecBlocking`, which uses it during object initialisation.
   */
  private val daemonThreadFactory: java.util.concurrent.ThreadFactory =
    new java.util.concurrent.ThreadFactory {
      private val delegate = Executors.defaultThreadFactory()

      override def newThread(runnable: Runnable): Thread = {
        val thread = delegate.newThread(runnable)
        thread.setDaemon(true)
        thread
      }
    }

  /** Fixed pool of 8 daemon threads, used both for awaiting connections and for blocking JDBC work. */
  val ecBlocking: scala.concurrent.ExecutionContextExecutor =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8, daemonThreadFactory))

  /**
   * Builds a managed Hikari transactor running in `Task`.
   *
   * @param dbDriver     fully-qualified JDBC driver class name
   * @param dbUrl        JDBC connection URL
   * @param dbUser       database user
   * @param dbPassword   database password
   * @param contextShift context shift passed implicitly to the transactor constructor
   * @return a `Resource` wrapping the transactor
   */
  def transactor(dbDriver: String, dbUrl: String, dbUser: String, dbPassword: String)(implicit contextShift: ContextShift[Task]): Resource[Task, HikariTransactor[Task]] = {
    // NOTE(review): the same pool backs both the connect EC and the Blocker. doobie's
    // documentation suggests separate, Resource-managed pools (ExecutionContexts /
    // Blocker[F]); consider switching if the public `ecBlocking` val can be retired.
    HikariTransactor.newHikariTransactor[Task](dbDriver, dbUrl, dbUser, dbPassword, ecBlocking, Blocker.liftExecutionContext(ecBlocking))
  }
}
我已根据 Monix 文档将 fs2 流转换为 Monix Observable:https://monix.io/docs/current/reactive/observable.html#fs2
我是否需要以某种方式关闭 fs2 流或 Observable,才能让应用程序干净地退出?如能提示如何使其正常工作,或如何正确调试这个问题,我将不胜感激。