信号可以通过几种不同的方式在 Cap'n Proto 之上实现。
对象链
Cap'n Proto RPC 调用需要很长时间才能完成没有问题。同一连接上的其他呼叫可以正常继续,并且您一次可以有许多未完成的呼叫。因此,接收信号的一种策略是在返回之前有一个等待信号的调用。
许多 RPC 系统支持挂起调用,但还有一个额外的挑战:如果您有一个信号流,并且客户端观察流中的每个信号很重要,那么如果生成新信号的速度比客户端调用RPC 来读取它们。您需要为每个客户保留一个缓冲区。但是如果客户端死了并且停止发出请求怎么办?现在您需要某种超时,然后将其清除。
与大多数其他 RPC 系统不同,Cap'n Proto 支持动态生成新对象。因此,您可以将信号流表示为对象链。例如:
struct MyPayload { ... }
interface MyInterface {
subscribe @0 () -> (firstSignal :Signal(MyPayload));
# Subscribe to signals from this interface.
}
interface Signal(Type) {
# One signal in a stream of signals. Has a payload, and lets you
# wait for the next signal.
get @0 () -> (value :Type);
# Gets the payload value of this signal. (Returns immediately.)
waitForNext @1 () -> (nextSignal :Signal(Type));
# Waits for the next signal in the sequence, returning a new
# `Signal` object representing it.
}
这极大地简化了服务器端的状态管理,因为 Cap'n Proto 会在所有客户端都表明它们已完成处理后立即自动调用每个对象的析构函数(通过销毁客户端引用,也就是“删除”它)。如果客户端断开连接,则其所有引用都将被隐式删除。
回调
因为 Cap'n Proto 允许双向 RPC 调用(客户端 -> 服务器和服务器 -> 客户端),您可以使用回调实现“信号”或发布/订阅机制:
struct MyPayload { ... }
interface MyInterface {
subscribe @0 (cb :Callback(MyPayload)) -> (handle :Handle);
}
interface Callback(Type) {
call @0 (value :Type);
}
interface Handle {}
客户端调用subscribe()
并传递一个回调对象cb
。然后,只要有信号,服务器就可以回调客户端。
请注意,它subscribe()
返回 a Handle
,它是一个没有方法的对象。这样做的目的是检测客户端何时取消订阅。如果客户端 drop handle
,将通知服务器(服务器端对象的析构函数将运行),然后服务器可以取消注册回调。这也处理了客户端断开连接的情况——所有对象引用都在断开连接时隐式删除。
乍一看,由于其简单性,该解决方案可能看起来比对象链解决方案好得多。但是,它的问题是您现在有指向两个方向的对象引用,这可能导致循环。在您的客户端代码中,您必须小心确保回调实现不“拥有”保持它注册的句柄,否则它将永远不会被清理(除非连接关闭)。您还必须确保在删除句柄后仍然可以在短时间内调用回调,同时等待服务器取消注册回调。这些问题在对象链解决方案中不存在,这可能会使该解决方案更易于实施。
其他 RPC 系统
我在上面讨论了 Cap'n Proto,因为我是作者,而且它提供了比大多数 RPC 系统更多的选项。
如果你使用 gRPC,你可以使用它的“流”特性来支持信号之类的东西。随着时间的推移,流式 RPC 可以返回多个响应。
我不确定Thrift。我上次尝试时,请求必须是 FIFO,这意味着长时间运行的 RPC 是禁忌。然而,那是很久以前的事了,也许从那时起它已经改变了。