这里是反应式编程的新手。
在Multi转换中,我需要为几次迭代累积字节,检查传入的字节,当我遇到特定的字节模式时,我会发布整个累积的 byteArray 以供进一步使用。
似乎是一个简单的要求,但我被卡住了。
伪 Kotlin 代码:
val array = listOf<ByteArray>(....)
Multi.createFrom().items(array)
.onItem.transform { oneByteArray: ByteArray ->
if (oneByteArray.indexOf(0x0d) == -1) {
// somehow accumulate oneByteArray somewhere (1)
// do not publish anything downstream (2)
} else {
// publish accumulated byteArrays as one large byteArray
}
}
我坚持(1)和(2)。暂时停止下游发布(当我正在寻找特定字节时)的机制是什么?