5

我们正在从 AMQP 迁移到 Google 的 Pubsub。

文档建议拉可能是我们的最佳选择,因为我们正在使用计算引擎并且无法打开我们的工作人员以通过推送服务接收。

它还表示,根据使用情况,拉动可能会产生额外的成本:

如果使用轮询,如果您频繁打开连接并立即关闭它们,可能会导致高网络使用率。

我们在 go 中创建了一个测试订阅者,它在循环中运行,如下所示:

func main() {
    jsonKey, err := ioutil.ReadFile("pubsub-key.json")
    if err != nil {
        log.Fatal(err)
    }
    conf, err := google.JWTConfigFromJSON(
        jsonKey,
        pubsub.ScopeCloudPlatform,
        pubsub.ScopePubSub,
    )
    if err != nil {
        log.Fatal(err)
    }
    ctx := cloud.NewContext("xxx", conf.Client(oauth2.NoContext))

    msgIDs, err := pubsub.Publish(ctx, "topic1", &pubsub.Message{
        Data: []byte("hello world"),
    })

    if err != nil {
        log.Println(err)
    }

    log.Printf("Published a message with a message id: %s\n", msgIDs[0])

    for {
        msgs, err := pubsub.Pull(ctx, "subscription1", 1)
        if err != nil {
            log.Println(err)
        }

        if len(msgs) > 0 {
            log.Printf("New message arrived: %v, len: %d\n", msgs[0].ID, len(msgs))
            if err := pubsub.Ack(ctx, "subscription1", msgs[0].AckID); err != nil {
                log.Fatal(err)
            }
            log.Println("Acknowledged message")
            log.Printf("Message: %s", msgs[0].Data)
        }
    }
}

我的问题是这是否是正确/推荐的方式来获取消息。

我们全天每秒收到大约 100 条消息。我不确定在无限循环中运行它是否会让我们破产,并且找不到任何其他像样的 go 示例。

4

1 回答 1

4

一般来说,在 Cloud Pub/Sub 中拉取订阅者的关键是确保您始终有至少几个未完成的拉取请求,并将 max_messages 设置为适用于以下情况的值:

  • 您发布消息的速度,
  • 这些消息的大小,以及
  • 您的订阅者可以处理消息的消息速率。

一旦拉取请求返回,您应该发出另一个请求。这意味着异步处理和确认在拉取响应中返回给您的消息(或异步启动新的拉取请求)。如果您发现吞吐量或延迟不是您所期望的,首先要做的是添加更多并发拉取请求。

如果您的发布率极低,则“如果使用轮询,如果您频繁打开连接并立即关闭它们,可能会导致高网络使用率”的说法适用。想象一下,您一天只发布两三条消息,但您不断地通过拉取请求进行轮询。这些拉取请求中的每一个都会产生发出请求的成本,但是除了您实际有消息的少数几次之外,您不会得到任何要处理的消息,因此“每条消息的成本”相当高。如果您以相当稳定的速度发布并且您的拉取请求返回的消息数量非零,那么网络使用率和成本将与消息率一致。

于 2016-04-11T17:52:17.973 回答