6

使用 Slick,您可以执行以下操作以从表中生成结果流:

val q = for (e <- events) yield e.name
val p: DatabasePublisher[String] = db.stream(q.result)

p.foreach { s => println(s"Event: $s") }

这将打印events表中的所有事件并在最后一行之后终止。

假设可以以某种方式通知您何时将新行输入到events表中,是否可以编写一个在插入事件时连续输出事件的流?一种tail -f用于数据库表。

我认为 Slick 本身不会支持这一点,但我认为应该可以使用 Akka 流来提供帮助。因此,如果您可以从 Slick Source 获取某些内容直到它为空,然后等待事件指示表中的更多数据,然后流式传输新数据。可能通过使用ActorPublisher来绑定这个逻辑?

只是想知道是否有人在这方面有任何经验或任何建议?

4

2 回答 2

5

你是对ActorPublisher的:) 这是一个使用 PostgreSQL、异步 DB 驱动程序LISTEN/NOTIFY 机制的简单示例:

演员:

class PostgresListener extends ActorPublisher[String] {

  override def receive = {
    case _ ⇒
      val configuration = URLParser.parse(s"jdbc://postgresql://$host:$port/$db?user=$user&password=$password")
      val connection = new PostgreSQLConnection(configuration)
      Await.result(connection.connect, 5.seconds)

      connection.sendQuery(s"LISTEN $channel")
      connection.registerNotifyListener { message ⇒ onNext(message.payload) }
  }
}

服务:

def stream: Source[ServerSentEvent, Unit] = {
  val dataPublisherRef = Props[PostgresListener]
  val dataPublisher = ActorPublisher[String](dataPublisherRef)

  dataPublisherRef ! "go"

  Source(dataPublisher)
    .map(ServerSentEvent(_))
    .via(WithHeartbeats(10.second))
}

build.sbtlibraryDependencies

"com.github.mauricio"  %% "postgresql-async"         % "0.2.18"

Postgres 触发器应该调用select pg_notify('foo', 'payload')

据我所知,Slick 不支持LISTEN.

于 2015-12-15T22:15:37.860 回答
1

ActorPublisher自 Akka 2.5.0 以来已被弃用。这是一个使用postgresql-asyncSourceQueue库并在 actor 内部创建的替代方案:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._

import com.github.mauricio.async.db.postgresql.PostgreSQLConnection
import com.github.mauricio.async.db.postgresql.util.URLParser

import scala.concurrent.duration._
import scala.concurrent.Await

class DbActor(implicit materializer: ActorMaterializer) extends Actor with ActorLogging {
  private implicit val ec = context.system.dispatcher

  val queue =  
    Source.queue[String](Int.MaxValue, OverflowStrategy.backpressure)
      .to(Sink.foreach(println))
      .run()

  val configuration = URLParser.parse("jdbc:postgresql://localhost:5233/my_db?user=dbuser&password=pwd")
  val connection = new PostgreSQLConnection(configuration)
  Await.result(connection.connect, 5 seconds)

  connection.sendQuery("LISTEN my_channel")
  connection.registerNotifyListener { message =>
    val msg = message.payload
    log.debug("Sending the payload: {}", msg)
    self ! msg
  }

  def receive = {
    case payload: String =>
      queue.offer(payload).pipeTo(self)
    case QueueOfferResult.Dropped =>
      log.warning("Dropped a message.")
    case QueueOfferResult.Enqueued =>
      log.debug("Enqueued a message.")
    case QueueOfferResult.Failure(t) =>
      log.error("Stream failed: {}", t.getMessage)
    case QueueOfferResult.QueueClosed =>
      log.debug("Stream closed.")
  }
}

上面的代码只是在 PostgreSQL 发生通知时打印它们;您可以将 替换为Sink.foreach(println)另一个Sink. 要运行它:

import akka.actor._
import akka.stream.ActorMaterializer

object Example extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  system.actorOf(Props(classOf[DbActor], materializer))
}
于 2018-07-25T09:52:48.973 回答