1

打电话后Channel.close()等待Channel.isClosedForReceive成为真的最好的方法是什么?

我正在按顺序处理消息,并希望返回调用后处理的最大消息Channel.close()。但是,如果我只是在调用后获取最大处理消息,close()则在消耗“关闭令牌”之前通道中可能有一些消息,导致实际最大处理消息大于返回的值。

基于文档,Channel.close()我认为 Channel.isClosedForReceive 是我应该等待的。但我期待一些挂起功能等待,而不是不得不轮询它的状态。

/**
 * Immediately after invocation of this function
 * [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive]
 * on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
 * are received.
 **/
4

1 回答 1

3

假设您在某种演员中处理所有消息:

val channel = actor<Message> { ... }

最好的方法是了解此参与者处理的最后一条消息是创建一个明确的反向通道来传达此信息。由于我们只需要在使用CompletableDeferred此目的时传达一个值:

val lastProcessed = CompletableDeferred<Message?>() 

现在,您可以通过以下方式编写消息处理参与者:

val actor = actor<Message> {
    var last: Message? = null
    try {
        for (msg in channel) {
            // process message
            last = msg
        }
    } finally {
        // report the last processed message via back channel
        lastProcessed.complete(last)
    }
}

这是关闭通道并学习最后处理的消息的代码:

actor.close()
val last = lastProcessed.await() // receive last processed message
于 2018-02-27T08:37:41.937 回答