1

我有以下用例。

异步执行 DB 操作,完成后将 kafka 事件发送到另一个微服务,以便它从 DB 中读取。但是,到目前为止,甚至在数据库操作完成之前就已经发送了 kafka 事件。我的代码如下所示:

firstTask = dbOperation1(k)
secondTask = dbOperation2(t.getId, t, existing)
thirdTask = Task(doSomeDBUpdate).executeOn(io).asyncBoundary

Task.sequence(Seq(firstTask, secondTask, thirdTask, pushToKafkaTask))

有什么办法可以确保 pushToKafkaTask 在前三个任务之后肯定发生?

添加更多代码片段以显示 firstTask 、 secondTask 和 pushToKafkaTask 的样子

val firstTask = dbOperation1(k)

def dbOperation1(k: objPUT)(jt: JdbcTemplate, io: Scheduler): Task[Int] = {
    val params = Array(user.userId, DateUtils.currentTimestamp, k.getId)
    
    Task(jt.update(tDao.tUpdate, params: _*)).executeOn(io).asyncBoundary
  }
  
  
val secondTask = dbOperation2(t.getId, t, existing)


def dbOperation2(id: String,input: objPUTGen, existing: objPUTGen = null,iStd: Boolean = true, 
                 isNSRefUpdate: Boolean = false,)(implicit user: UserDetails, jt: JdbcTemplate): Task[_] =
                            
    Task.sequence(Seq(dbOperation3(id, input),
                      if (iStd)  dbOperation4( id, input) else Task.unit, dbOperation5(id, input, existing, isNSRefUpdate) ))
                      

def dbOperation3(id: String, input: TemplateGeneric)(implicit user: UserDetails, jt: JdbcTemplate, io: Scheduler): Task[_] = {

    val sDel =
      s"""
         | delete from "tableName"
         | where ID = ?
       """.stripMargin
       
    Task(jt.update(sDel, id)).executeOn(io).asyncBoundary
  }

 def pushToKafkaTask(id: String, cl: String)
                 (user: UserDetails,  kafkaBase: KafkaBase = OKafkaBase): Task[Unit] = {
   
    val msg = MyCaseClass(id, cl)
    kafkaBase.pushToKafkaInternalV2(NonEmptyList.of(msg), id, topic)
    
  }

4

0 回答 0