我有以下用例。
异步执行数据库操作,完成后向另一个微服务发送 Kafka 事件,以便它从数据库中读取数据。但目前的问题是,Kafka 事件在数据库操作完成之前就已经发出了。我的代码如下所示:
// Sketch of the pipeline: three DB tasks are described and then combined with the Kafka push.
// The `val` keywords are omitted in this sketch; the fuller snippet further down declares them.
firstTask = dbOperation1(k)
secondTask = dbOperation2(t.getId, t, existing)
// Passes doSomeDBUpdate to Task(...), asks for it to run on `io`, then adds an async boundary.
thirdTask = Task(doSomeDBUpdate).executeOn(io).asyncBoundary
// NOTE(review): Monix documents Task.sequence as running its elements one by one, in order, so
// the push can only be last if `pushToKafkaTask` is a lazily evaluated Task value. Here it is a
// bare name, while the definition further down takes two parameter lists — TODO confirm what is
// actually passed and that nothing is sent while the Task is being constructed.
Task.sequence(Seq(firstTask, secondTask, thirdTask, pushToKafkaTask))
有什么办法可以确保 pushToKafkaTask 一定在前三个任务完成之后才执行?
添加更多代码片段以显示 firstTask 、 secondTask 和 pushToKafkaTask 的样子
val firstTask = dbOperation1(k)
/** Builds a task that runs the `tDao.tUpdate` statement through `jt.update`.
  *
  * The second parameter list is implicit so that the single-list call `dbOperation1(k)` yields a
  * `Task[Int]` (the same shape `dbOperation3` uses) instead of leaving the method partially
  * applied. Callers that pass `(jt, io)` explicitly keep working unchanged.
  *
  * @param k  object whose `getId` is bound as the last statement parameter
  * @param jt template used to execute the update
  * @param io scheduler the update is asked to run on via `executeOn`
  * @return a task producing the value returned by `jt.update`
  */
def dbOperation1(k: objPUT)(implicit jt: JdbcTemplate, io: Scheduler): Task[Int] = {
  // `user` is not a parameter of this method; it has to be supplied by the enclosing scope.
  val params = Array(user.userId, DateUtils.currentTimestamp, k.getId)
  Task(jt.update(tDao.tUpdate, params: _*)).executeOn(io).asyncBoundary
}
val secondTask = dbOperation2(t.getId, t, existing)
/** Combines the per-record DB steps into a single task via `Task.sequence`.
  *
  * The steps are listed in this order: `dbOperation3`, then `dbOperation4` (only when `iStd` is
  * true, otherwise `Task.unit`), then `dbOperation5`.
  *
  * NOTE(review): `dbOperation3` requires an implicit `Scheduler`, which this signature does not
  * declare, so one must be available from the enclosing scope — confirm it is the intended one.
  *
  * @param id            identifier forwarded to every step
  * @param input         payload forwarded to every step
  * @param existing      forwarded unchanged to `dbOperation5`; defaults to `null`
  * @param iStd          when false, the `dbOperation4` step is replaced by `Task.unit`
  * @param isNSRefUpdate forwarded unchanged to `dbOperation5`
  * @return the task produced by `Task.sequence` over the three steps
  */
def dbOperation2(
    id: String,
    input: objPUTGen,
    existing: objPUTGen = null,
    iStd: Boolean = true,
    isNSRefUpdate: Boolean = false
)(implicit user: UserDetails, jt: JdbcTemplate): Task[_] =
  // Elements stay inline so they are constructed in the same left-to-right order as before.
  Task.sequence(
    Seq(
      dbOperation3(id, input),
      if (iStd) dbOperation4(id, input) else Task.unit,
      dbOperation5(id, input, existing, isNSRefUpdate)
    )
  )
/** Builds a task that deletes from "tableName" the rows whose ID equals `id`.
  *
  * The statement is executed with `jt.update`, using `id` as its single bind parameter. The task
  * is asked to run on `io` and is followed by an async boundary. `input` and `user` are not used
  * in this body.
  *
  * @param id    value bound to the `ID = ?` placeholder
  * @param input not referenced here
  * @return a task wrapping the `jt.update` call
  */
def dbOperation3(id: String, input: TemplateGeneric)(implicit user: UserDetails, jt: JdbcTemplate, io: Scheduler): Task[_] = {
  // The closing margin bar keeps the text ending right after the final newline.
  val deleteById =
    """
      | delete from "tableName"
      | where ID = ?
      |""".stripMargin
  val runDelete = Task(jt.update(deleteById, id))
  runDelete.executeOn(io).asyncBoundary
}
/** Describes pushing a single `MyCaseClass(id, cl)` message to Kafka.
  *
  * The message construction and the call to `pushToKafkaInternalV2` are wrapped in `Task.defer`,
  * so neither happens when this method is called; they happen only when the returned task is
  * run. This matters when the task is placed after DB tasks in a sequence.
  *
  * NOTE(review): `pushToKafkaInternalV2` is not visible here. If it was already fully lazy, the
  * `defer` changes nothing; if it starts sending when invoked, the `defer` is what keeps the
  * push from firing at construction time — confirm against its implementation.
  *
  * @param id        message id; also passed as the second argument of the push call
  * @param cl        second field of the message
  * @param user      not referenced in this body
  * @param kafkaBase producer facade; defaults to `OKafkaBase`
  * @return a task that performs the push when run
  */
def pushToKafkaTask(id: String, cl: String)
                   (user: UserDetails, kafkaBase: KafkaBase = OKafkaBase): Task[Unit] =
  Task.defer {
    val msg = MyCaseClass(id, cl)
    kafkaBase.pushToKafkaInternalV2(NonEmptyList.of(msg), id, topic)
  }