0

当我在收到消息数据时尝试从 akka 演员写入 postgres 表时遇到问题。我不确切知道真正的问题,我认为这是异步上下文,因为当我调用一个带有一些列表数据作为参数的函数时,该函数应该将列表数据写入 postgres,而不是什么都不执行,它不会仅标记任何错误,它不执行写入功能。数据来自休息点,我使用 akka http 发出请求。

处理写入 postgres 的代码演员。编码:

  object ClickUpTeamsActions {
    case class Fetch (writerRef: ActorRef)
    case class Write (teamsData: List[TeamClickUp] )
  }

  class ClickUpTeamsExtractor extends Actor with ActorLogging {
    import ClickUpTeamsActions._

    override def receive: Receive = {
      case Write(teamsData) =>
        println("-------writer")
        println(teamsData)
        val teamsPG: List[Teams] = teamsData.map(data => Teams(data.id, data.name, data.color, data.avatar))
        println(teamsPG)

          val bulkInsert = DoobieTest.insertMany(teamsPG) // function writes into postgres
          val io = bulkInsert.transact(DoobieTest.xa)
          io.unsafeRunSync() // this does not work
        
    }
  }

code 是一个对象,它有一个写入 postgres 的函数,这个函数是从 actor 接收调用的:

import cats.effect.IO
import cats.implicits.catsStdInstancesForList
import doobie.{ConnectionIO, ExecutionContexts, Transactor, Update}

object DoobieTest  {

  import doobie.util.ExecutionContexts
  implicit val cs = IO.contextShift(ExecutionContexts.synchronous)

  val xa = Transactor.fromDriverManager[IO](
    "org.postgresql.Driver", // driver classname
    "jdbc:postgresql:zzzzzzz", // connect URL (driver-specific)
    "yyyyyyyyyyy", // user
    "XXXXXXXXXXXX", // password
  )

  def insertMany(ps: List[Teams]): ConnectionIO[Int] = {
    val sql = "insert into Teams (id, name, color, avatar) values ( ? , ? , ? , ?) ON CONFLICT(id) DO UPDATE SET name = EXCLUDED.name"
    Update[Teams](sql).updateMany(ps)
  }
}

错误在于:

// this code does not work when run on receive..
    val bulkInsert = DoobieTest.insertMany(teamsPG) // function writes into postgres
    val io = bulkInsert.transact(DoobieTest.xa)
    io.unsafeRunSync() // this does not work

    

如果有另一种方法可以做到这一点,它可能会有所帮助。

4

0 回答 0