我正在尝试创建一个框架,在该框架中我将通过 REST API 接收请求并等待另一个服务(通过 gRPC 工作)来轮询并执行请求。这是必需的,因为“其他”服务非常深入地嵌入到网络中,我不能直接调用它。同时,我想将其他服务的输出缓冲回请求源。
有什么想法可以在 2 个不同的异步 API 请求之间共享这些数据吗?使用文件系统是一种方式......但我在想我可以通过渠道或其他方式做得更好......?
下面是一种伪代码:
func RestHandler(payload string) (string, error){
respChan := make(chan string)
workId := placeWorkInQueue(payload)
// Start polling in the background
go pollForResult(respChan, workId)
// wait for result in the channel
var result string
select {
case result = <-respChan:
// implement your timeout logic as a another case: here
}
return result, nil
}
// This is poller for just the workId given to it.
func pollForResult(respChan chan string, workId string) {
// Do the polling for workId result
/// Write response to respChan when available
// You may have to implement a timeout to give up polling.
}
func placeWorkInQueue(s string) string {
// Place the job in queue and return a unique workId
return "unique-id"
}
双向使用 Redis 队列。API 端点将请求和唯一 ID 写入队列,将具有唯一 ID 的 Go 通道注册为中央读取器的键,并在 Go 通道上等待。
队列阅读器从 Redis 队列中获取带有 id 的响应,并将响应发送到适当的 Go 通道。
// Response represents the response type from the
// remote service.
type Response struct {
ID string
// ... more fields as needed
}
// Request represents request to remote serivce.
type Request struct {
ID string
// ... more fields as needed
}
// enqueueRequest writes request to Redis queue.
// Implementation TBD by application.
func enqueueRequest(r *Request) error {
return nil
}
// dequeueResponse reads a response from a Redis queue.
// Implementation TBD by application.
func dequeueResponse() (*Response, error) {
return nil, nil
}
使用 sync.Map 从 API 请求处理程序注册等待的 Go 通道。键是请求的唯一 id,值是 a chan *Response
。
var responseWaiters sync.Map
在单个 goroutine 中运行 queuePump 以将结果从 Redis 队列中取出并发送到适当的通道。在服务 HTTP 请求之前启动 goroutine。
func queuePump() {
for {
response, err := dequeueResponse()
if err != nil {
// handle error
}
v, ok := responseWaiters.Load(response.ID)
if ok {
c := v.(chan *Response)
c <- response
// Remove cahnel from map to ensure that pump never sends
// twice to same channel. The pump will black forever if
// this happens.
responseWaiters.Delete(response.ID)
}
}
}
API 端点为请求分配一个唯一的 id,向队列泵注册一个通道,将请求排入队列并等待响应。
func apiEndpoint(w http.ResponseWriter, r *http.Request) {
id := generateUniqueID()
c := make(chan *Response, 1) // capacity 1 ensures that queue pump will not block
responseWaiters.Store(id, c)
defer responseWaiters.Delete(id)
req := &Request{
ID: id,
// fill in other fields as needed
}
if err := enqueueRequest(req); err != nil {
// handle error
}
select {
case resp := <-c:
// process response
fmt.Println(resp)
case <-time.After(10 * time.Second):
// handle timeout error
}
}