1

我正在尝试创建一个框架,在该框架中我将通过 REST API 接收请求并等待另一个服务(通过 gRPC 工作)来轮询并执行请求。这是必需的,因为“其他”服务非常深入地嵌入到网络中,我不能直接调用它。同时,我想将其他服务的输出缓冲回请求源。

有什么想法可以在 2 个不同的异步 API 请求之间共享这些数据吗?使用文件系统是一种方式......但我在想我可以通过渠道或其他方式做得更好......?

4

2 回答 2

1

下面是一种伪代码:

func RestHandler(payload string) (string, error){
    respChan := make(chan string)
    workId := placeWorkInQueue(payload)
    // Start polling in the background
    go pollForResult(respChan, workId)
    // wait for result in the channel
    var result string
    select {
    case result = <-respChan:
        // implement your timeout logic as a another case: here
    }
    return result, nil
}

// This is poller for just the workId given to it.
func pollForResult(respChan chan string, workId string) {
    // Do the polling for workId result 
    /// Write response to respChan when available 
    // You may have to implement a timeout to give up polling.

}

func placeWorkInQueue(s string) string {
    // Place the job in queue and return a unique workId
    return "unique-id"
}
于 2020-03-28T17:51:22.813 回答
0

双向使用 Redis 队列。API 端点将请求和唯一 ID 写入队列,将具有唯一 ID 的 Go 通道注册为中央读取器的键,并在 Go 通道上等待。

队列阅读器从 Redis 队列中获取带有 id 的响应,并将响应发送到适当的 Go 通道。

// Response represents the response type from the
// remote service.
type Response struct {
    ID string
    // ... more fields as needed
}

// Request represents request to remote serivce.
type Request struct {
    ID string
    // ... more fields as needed
}

// enqueueRequest writes request to Redis queue.
// Implementation TBD by application.
func enqueueRequest(r *Request) error {
    return nil
}

// dequeueResponse reads a response from a Redis queue.
// Implementation TBD by application.
func dequeueResponse() (*Response, error) {
    return nil, nil
}

使用 sync.Map 从 API 请求处理程序注册等待的 Go 通道。键是请求的唯一 id,值是 a chan *Response

  var responseWaiters sync.Map

在单个 goroutine 中运行 queuePump 以将结果从 Redis 队列中取出并发送到适当的通道。在服务 HTTP 请求之前启动 goroutine。

func queuePump() {
    for {
        response, err := dequeueResponse()
        if err != nil {
            // handle error
        }
        v, ok := responseWaiters.Load(response.ID)
        if ok {
            c := v.(chan *Response)
            c <- response

            // Remove cahnel from map to ensure that pump never sends 
            // twice to same channel. The pump will black forever if
            // this happens. 
            responseWaiters.Delete(response.ID)
        }
    }
}

API 端点为请求分配一个唯一的 id,向队列泵注册一个通道,将请求排入队列并等待响应。

func apiEndpoint(w http.ResponseWriter, r *http.Request) {
    id := generateUniqueID()

    c := make(chan *Response, 1) // capacity 1 ensures that queue pump will not block
    responseWaiters.Store(id, c)
    defer responseWaiters.Delete(id)

    req := &Request{
        ID: id,
        // fill in other fields as needed
    }

    if err := enqueueRequest(req); err != nil {
        // handle error
    }

    select {
    case resp := <-c:
        // process response
        fmt.Println(resp)
    case <-time.After(10 * time.Second):
        // handle timeout error
    }
}
于 2020-03-28T18:18:08.243 回答