0

试图了解渠道。我想对 android BluetoothLeScanner 进行通道化。为什么会这样:

fun startScan(filters: List<ScanFilter>, settings: ScanSettings = defaultSettings): ReceiveChannel<ScanResult?> {
    val channel = Channel<ScanResult>()
    scanCallback = object : ScanCallback() {
        override fun onScanResult(callbackType: Int, result: ScanResult) {
            channel.offer(result)
        }
    }
    scanner.startScan(filters, settings, scanCallback)

    return channel
}

但不是这个:

fun startScan(scope: CoroutineScope, filters: List<ScanFilter>, settings: ScanSettings = defaultSettings): ReceiveChannel<ScanResult?> = scope.produce {
    scanCallback = object : ScanCallback() {
        override fun onScanResult(callbackType: Int, result: ScanResult) {
            offer(result)
        }
    }
    scanner.startScan(filters, settings, scanCallback)
}

它会告诉我Channel was closed何时想offer第一次打电话。

EDIT1:根据文档:The channel is closed when the coroutine completes.这是有道理的。我知道我们可以使用suspendCoroutinewithresume进行一次性替换callback。然而,这是一个监听器/流的情况。我不希望协程完成

4

1 回答 1

1

使用produce,您将作用域引入您的频道。这意味着,可以取消生成通过通道流式传输的项目的代码。

这也意味着您的 Channel 的生命周期从 lambda 的开头开始,produce并在此 lambda 结束时结束。

在您的示例中,您调用的 lambdaproduce几乎立即结束,这意味着您的 Channel 几乎立即关闭。

将您的代码更改为以下内容:

fun CoroutineScope.startScan(filters: List<ScanFilter>, settings: ScanSettings = defaultSettings): ReceiveChannel<ScanResult?> = produce {
    scanCallback = object : ScanCallback() {
        override fun onScanResult(callbackType: Int, result: ScanResult) {
            offer(result)
        }
    }
    scanner.startScan(filters, settings, scanCallback)

    // now suspend this lambda forever (until its scope is canceled)
    suspendCancellableCoroutine<Nothing> { cont ->
        cont.invokeOnCancellation {
            scanner.stopScan(...)
        }
    }
}

...
val channel = scope.startScan(filter)
...
...
scope.cancel() // cancels the channel and stops the scanner.

我添加了该行suspendCancellableCoroutine<Nothing> { ... }以使其“永远”暂停。

更新:produce以结构化方式使用和处理错误(允许结构化并发):

fun CoroutineScope.startScan(filters: List<ScanFilter>, settings: ScanSettings = defaultSettings): ReceiveChannel<ScanResult?> = produce {
    // Suspend this lambda forever (until its scope is canceled)
    suspendCancellableCoroutine<Nothing> { cont ->
        val scanCallback = object : ScanCallback() {
            override fun onScanResult(callbackType: Int, result: ScanResult) {
                offer(result)
            }
            override fun onScanFailed(errorCode: Int) {
                cont.resumeWithException(MyScanException(errorCode))
            }
        }
        scanner.startScan(filters, settings, scanCallback)

        cont.invokeOnCancellation {
            scanner.stopScan(...)
        }
    }
}
于 2019-05-23T21:10:26.117 回答