我正在尝试对 Akka 流中的以下函数进行包装。
RestartFlow.withBackoff(minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2) {
() => s
}
其中 s 是我用后退包装的一些来源。理想情况下 id 像这样
RetryFlow(s)
我设法创建了这个:
object RetryFlow {
def apply[In, Out, _, T <: Flow[In, Out, _]](source: T, minBackoff: FiniteDuration = 3.seconds, maxBackoff: FiniteDuration = 30.seconds, randomFactor: Double = 0.2): Flow[In, Out, NotUsed] = {
RestartFlow.withBackoff(
minBackoff = minBackoff,
maxBackoff = maxBackoff,
randomFactor = randomFactor) {
() => source
}
}
}
问题是我需要在呼叫站点再次提供所有 3 种类型的流参数,这看起来很可怕
RetryFlow[JustDataEvent, JustDataEvent, NotUsed, Flow[JustDataEvent, JustDataEvent, NotUsed]](s)
它也不是类型安全的,因为我可以在这里输入任何类型参数。
我觉得应该是可能的,但我不确定如何不采用额外的类型参数 In 和 Out,而是执行 T#In、T#Out 等操作,因为我说过 T 扩展了流程,因此,T已经有了我需要的类型参数。