1

我们正在使用 quarkus 来处理在常规函数上运行的消息,因为我们suspend基本上必须调用一个函数

fun process (msg:Message):Message{
    val resFrom:Data = runBlocking{fetchDataFromDB(msg.key)}
    val processedMsg = processRoutingKey(msg,resFrom)
    return processedMsg

}

我们希望将数据作为Unihttps://smallrye.io/smallrye-mutiny/getting-started/creating-unis)所以基本上我们想回来

fun process (msg:Message){
    val resFrom:Uni<Data> = ConvertUni {fetchDataFromDB(msg.key)}
}

我们需要进一步下游的 uni 来处理一些数据,但我们想Uni从方法中返回 a

fun process (msg:Message):Uni<Message>{
    val resFrom:Uni<Data> = ConvertUni {fetchDataFromDB(msg.key)}
    val processed:Uni<Message> =process(msg,resfrom) 
    return processed 

}
4

1 回答 1

1

签名fun process(msg:Message): Uni<Message>意味着需要启动一些异步机制,并且会比方法调用更有效。这就像返回 aFuture或 a Deferred。该函数立即返回,但底层处理尚未完成。

在协程世界中,这意味着您需要启动一个协程。但是,与任何异步机制一样,它要求您了解它将在哪里运行以及运行多长时间。这是由CoroutineScope您用来启动协程的定义的,这就是为什么协程构建器async需要这样的范围。

因此,CoroutineScope如果您希望它启动一个持续时间比函数调用更长的协程,则需要将 a 传递给您的函数:

fun CoroutineScope.process(msg:Message): Uni<Message> {
    val uniResult = async { fetchDataFromDB(msg.key) }.asUni()
    return process(msg, uniResult) 
}

这里Deferred<T>.asUni()由图书馆mutiny-kotlin提供。在他们的文档中给出的示例中,他们使用GlobalScope而不是要求调用者传递协程范围。这通常是一种不好的做法,因为这意味着您无法控制已启动协程的生命周期,如果您不小心,可能会泄漏一些东西。

接受一个CoroutineScope作为接收者意味着方法的调用者可以选择这个协程的作用域,合适的时候会自动取消协程,同时也会定义协程运行所在的线程池/事件循环。

现在,考虑到这一点,您会看到您将在Uni这里混合使用协程和相同级别的 API,这不是很好。我建议您一直坚持使用挂起功能,直到您真的必须转换为Uni.

于 2022-02-04T14:15:38.050 回答