我在java中使用grpc-streaming。我有一个持久的开放流,客户端和服务器同时通信。当我调用onNext
发送消息时,grpc 会在内部缓冲消息,并将其通过网络异步发送。现在,如果流在发送数据的过程中丢失,onError
则调用。我想知道正确的做法是什么:
- 找出哪些消息已成功发送
- 如何重试未发送的消息
目前,我正在考虑在应用层实现一个“ack”机制,对于每收到 x 个项目,接收者会发回一个 ack 消息。然后为了实现重试,我需要在发送方缓冲项目,并且只有在收到 ack 时才将它们从缓冲区中删除。此外,在接收方,我需要实现一种机制来忽略收到的重复项目。
示例: 假设我们每发送 100 个项目就发送一个 ack。我们在第 3 批 (200-300) 上收到 ack,然后在发送项目 300-400 时收到错误。我们再次尝试发送项目 300-400 但客户端已成功收到 300-330 并且它将再次接收它们。因此,客户端需要忽略前 30 项。
可以在应用层实现这一点。但是,我想知道是否有更好的实践/框架可以解决这个问题。