3

MySQL在我的应用程序中,我必须与多个数据库一一交互(只读) 。对于每个数据库,我需要一定数量的连接。与数据库的交互不会一次性发生:我查询数据库,花一些时间处理结果,再次查询数据库,再次处理结果等等。

这些交互中的每一个都需要多个连接[我同时触发多个查询],因此我需要一个ConnectionPool在我开始与数据库交互时产生并一直存在直到我完成对该数据库的所有查询(包括当我'不查询,只处理结果)。


我能够成功创建一个ConnectionPool具有所需连接数并获得implicit session如下所示的

def createConnectionPool(poolSize: Int): DBSession = {
 implicit val session: AutoSession.type = AutoSession

 ConnectionPool.singleton(
   url = "myUrl",
   user = "myUser",
   password = "***",
   settings = ConnectionPoolSettings(initialSize = poolSize)
 )

 session
}

implicit session然后,我在需要与 DB 交互的所有方法中传递它。这样,我就可以使用 this同时poolSize触发任何查询。很公平。session

def methodThatCallsAnotherMethod(implicit session: DBSession): Unit = {
  ...
  methodThatInteractsWithDb
  ...
}

def methodThatInteractsWithDb(implicit session: DBSession): Unit = {
  ...
  getResultsParallely(poolSize = 32, fetchSize = 2000000)
  ...
}

def getResultsParallely(poolSize: Int, fetchSize: Int)(implicit session: DBSession): Seq[ResultClass] = {
  import java.util.concurrent.Executors
  import scala.concurrent.ExecutionContext
  import scala.concurrent.duration._

  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(poolSize))

  val resultsSequenceFuture: Seq[Future[ResultClass]] = {
    (0 until poolSize).map { i =>
      val limit: Long = fetchSize
      val offset: Long = i * fetchSize

      Future(methodThatMakesSingleQuery(limit, offset))
    }
  }
  val resultsFutureSequence: Future[Seq[ResultClass]] = Future.sequence(resultsSequenceFuture)

  Await.result(resultsFuture, 2.minutes)
}

这种技术有两个问题:

  1. 我的应用程序很大并且有很多嵌套的方法调用,所以implicit session像这样通过所有方法(见下文)是不可行的。
  2. 除了上述与不同数据库的交互之外,我还需要在整个应用程序的整个生命周期中与另一个(固定)数据库建立单一连接。每隔几分钟,此连接将用于进行一次小型写入操作记录我与其他 DB 交互的进度)。因此,我需要多个ConnectionPools,每个 DB 一个

从我可以从ScalikeJdbc's docs中获得的信息,我想出了以下不需要我通过implicit session任何地方的方法。

def createConnectionPool(poolName: String, poolSize: Int): Unit = {
  ConnectionPool.add(
    name = poolName,
    url = "myUrl",
    user = "myUser",
    password = "***",
    settings = ConnectionPoolSettings(initialSize = poolSize)
  )
}

def methodThatInteractsWithDb(poolName: String): Unit = {
  ...
  (DB(ConnectionPool.get(poolName).borrow())).readOnly { implicit session: DBSession =>
    // interact with DB
    ...
  }
  ...
}

虽然这可行,但我不再能够并行化 db-interaction。这种行为很明显,因为我使用的是从poolborrow()获取单个连接的方法。反过来,这让我想知道为什么这件事更早起作用:为什么我能够使用一个单一的同时触发多个查询?如果那件事奏效了,那为什么这不起作用呢?但是我没有找到如何从支持多个连接的 a 中获取 a 的示例。AutoSessionimplicit sessionDBSessionConnectionPool


总而言之,我有 2 个问题和 2 个解决方案:每个问题一个。但我需要一个解决这两个问题的单一(通用)解决方案。

ScalikeJdbc的有限文档没有提供很多帮助,并且ScalikeJdbc几乎不存在有关的博客/文章。请提出正确的方法/一些解决方法。


框架版本

  • Scala 2.11.11
  • "org.scalikejdbc" %% "scalikejdbc" % "3.2.0"
4

1 回答 1

2

感谢@Dennis Hunziker,我能够找出正确的方法ScalikeJdbc来释放从's借来的连接ConnectionPool。可以按如下方式完成:

import scalikejdbc.{ConnectionPool, using}
import java.sql.Connection

using(ConnectionPool.get("poolName").borrow()) { (connection: Connection) =>
    // use connection (only once) here
}
// connection automatically returned to pool

有了这个,现在我可以并行化与池的交互。


为了解决管理多个ConnectionPools 和跨多个classes 使用连接的问题,我最终编写了一个ConnectionPoolManager完整的代码,可以在此处找到。通过卸载任务

  • 创建池
  • 从池中借用连接
  • 删除池

对于singleton我可以在项目中的任何地方使用的对象,我能够清除很多混乱并消除了implicit session跨方法链的需要。


编辑-1

虽然我已经链接了完整的代码ConnectionPoolManager,但这里有一个快速提示,告诉你如何去做

以下方法让您从sConnectionPoolManager借用连接ConnectionPool

def getDB(dbName: String, poolNameOpt: Option[String] = None): DB = {
  // create a pool for db (only) if it doesn't exist
  addPool(dbName, poolNameOpt)

  val poolName: String = poolNameOpt.getOrElse(dbName)
  DB(ConnectionPool.get(poolName).borrow())
}

此后,在整个代码中,您可以使用上述方法从池中借用连接并进行查询

def makeQuery(dbName: String, poolNameOpt: Option[String]) = {
  ConnectionPoolManager.getDB(dbName, poolNameOpt).localTx { implicit session: DBSession =>
    // perform ScalikeJdbc SQL query here
  }
}
于 2018-09-06T14:08:56.977 回答