当我在收到消息数据时尝试从 akka 演员写入 postgres 表时遇到问题。我不确切知道真正的问题,我认为这是异步上下文,因为当我调用一个带有一些列表数据作为参数的函数时,该函数应该将列表数据写入 postgres,而不是什么都不执行,它不会仅标记任何错误,它不执行写入功能。数据来自休息点,我使用 akka http 发出请求。
处理写入 postgres 的代码演员。编码:
object ClickUpTeamsActions {
case class Fetch (writerRef: ActorRef)
case class Write (teamsData: List[TeamClickUp] )
}
class ClickUpTeamsExtractor extends Actor with ActorLogging {
import ClickUpTeamsActions._
override def receive: Receive = {
case Write(teamsData) =>
println("-------writer")
println(teamsData)
val teamsPG: List[Teams] = teamsData.map(data => Teams(data.id, data.name, data.color, data.avatar))
println(teamsPG)
val bulkInsert = DoobieTest.insertMany(teamsPG) // function writes into postgres
val io = bulkInsert.transact(DoobieTest.xa)
io.unsafeRunSync() // this does not work
}
}
code 是一个对象,它有一个写入 postgres 的函数,这个函数是从 actor 接收调用的:
import cats.effect.IO
import cats.implicits.catsStdInstancesForList
import doobie.{ConnectionIO, ExecutionContexts, Transactor, Update}
object DoobieTest {
import doobie.util.ExecutionContexts
implicit val cs = IO.contextShift(ExecutionContexts.synchronous)
val xa = Transactor.fromDriverManager[IO](
"org.postgresql.Driver", // driver classname
"jdbc:postgresql:zzzzzzz", // connect URL (driver-specific)
"yyyyyyyyyyy", // user
"XXXXXXXXXXXX", // password
)
def insertMany(ps: List[Teams]): ConnectionIO[Int] = {
val sql = "insert into Teams (id, name, color, avatar) values ( ? , ? , ? , ?) ON CONFLICT(id) DO UPDATE SET name = EXCLUDED.name"
Update[Teams](sql).updateMany(ps)
}
}
错误在于:
// this code does not work when run on receive..
val bulkInsert = DoobieTest.insertMany(teamsPG) // function writes into postgres
val io = bulkInsert.transact(DoobieTest.xa)
io.unsafeRunSync() // this does not work
如果有另一种方法可以做到这一点,它可能会有所帮助。