0

我从 GRPC 开始使用 go。我阅读了官方文档和几个例子。

在大多数示例中,您不识别客户端,而是使用流来读取/写入数据。我看到 Context 中有用于检索身份验证信息的 API,并且可以识别 ChatRequest 的客户端。但是,如果我想根据客户端 ID 保留对 Stream 的引用/索引怎么办。

例如,

假设我在聊天室中有 3 个用户。我将 rpc 表示为(也可以是服务器流式传输)

rpc Chat(stream ChatRequest) returns (stream ChatResponse) {}

比如说,一个用户向组发送消息,该组需要发送给其他 2。因此,如果我需要通过当前为这些用户打开的 Stream 发送消息,那么保留该流的引用是多么安全。

实施将像...

type chatServiceServer struct {
    // keep a map of subscribers / users currently connected; protect with mutex
}

func (s *chatServiceServer) Chat(stream pb.ChatService_ChatServer) error {
    // md, ok := metadata.FromIncomingContext(stream.Context())
    // p, ok := peer.FromContext(ctx)
    // ... identify client from above

    for {
        // save the message to DB
        // find other users in the chatroom is currently connected
        // if so, stream.Send(m)
        // else notify ....
    }
}

但是,我在 API 文档中看到了警告,想知道一个更好的方法。

https://godoc.org/google.golang.org/grpc#ServerStream
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines.
SendMsg(m interface{}) error

任何订阅(事件,......)都会出现类似的用例,并且需要根据客户端 ID 进行通知。任何示例代码,文章也会很棒。

谢谢

4

1 回答 1

1

保存流以供其他地方使用应该是安全的,但您只能调用SendMsg单个 goroutine(同样的限制也适用于RecvMsg,独立地)。所以这意味着如果你在你的方法处理程序中这样做:

for {
  if err := stream.Recv(req); err != nil {
    return err
  }
  for _, s := range allStreams[req.ID] {
    if err := s.Send(req.Message); err != nil { /* remove s from allStreams */ }
  }
}

然后调用s.Send 必须由锁保护,因为这些处理程序中的多个可能同时运行。(也allStreams假定为 a map[ID][]stream,在这种情况下,它也必须由锁保护。)

于 2020-01-21T23:49:33.287 回答