简洁版本:
我想实现一个函数,该函数返回一个等待“发出”值块的转换器。
我想到的功能将具有以下签名:
/**
* The `Process1` which awaits the next "effect" to occur and passes all values emitted by
* this effect to `rcv` to determine the next state.
*/
def receiveBlock[I, O](rcv: Vector[I] => Process1[I,O]): Process1[I,O] = ???
细节:
我的理解是,我可以使用这个函数来实现以下我认为非常有用的函数:
/**
* Groups inputs into chunks of dynamic size based on the various effects
* that back emitted values.
*
* @example {{{
* val numberTask = Task.delay(1)
* val listOfNumbersTask = Task.delay(List(5,6,7))
* val sample = Process.eval(numberTask) ++ Process(2,3,4) ++ Process.await(listOfNumbersTask)(xs => Process.emitAll(xs))
* sample.chunkByEffect.runLog.run should be List(Vector(1), Vector(2,3,4), Vector(5,6,7))
* }}}
*/
def chunkByEffect[I]: Process1[I, Vector[I]] = {
receiveBlock(vec => emit(vec) ++ chunkByEffect)
}
[更新] 更多详情
我的最终目标(稍微简化)是实现以下功能:
/**
* Transforms a stream of audio into a stream of text.
*/
voiceRecognition(audio: Process[Task, Byte]): Process[Task, String]
该函数对语音识别服务进行外部调用。因此,Byte
对流中的每一个都进行网络调用是不合理的。在进行网络调用之前,我需要将字节组合在一起。我可以做audio
一个Process[Task, ByteVector]
,但这需要测试代码来知道函数支持的最大块大小,我宁愿由函数本身管理。此外,当在服务内部使用此服务时,该服务本身将接收具有给定音频大小的网络调用,我希望该chunkXXX
功能能够智能地进行分块,以便它不会保留已经存在的数据可用的。
基本上,来自网络的音频流将具有格式Process[Task, ByteVector]
并将被翻译成Process[Task, Byte]
by flatMap(Process.emitAll(_))
。但是,测试代码将直接生成 aProcess[Task, Byte]
并将其输入voiceRecognition
. 从理论上讲,我相信应该有可能给定适当的组合器来提供voiceRecognition
对这两个流都做正确事情的实现,我认为chunkByEffect
上面描述的功能是关键。我现在意识到我需要 chunkByEffect 函数min
和max
参数来指定分块的最小和最大大小,而与Task
生成字节的底层无关。