1

Ids我有一个类型的数组,int64这是我正在尝试发布的 Nsq 消息。

nsqMsg := st{
    Action  :     "insert",
    Ids     :     Ids
    GID     :     Gids
}

msg, err := json.Marshal(nsqMsg)
if err != nil {
    log.Println(err)
    return err
}
err = nsqProducer.Publish(TOPIC-NAME, msg)
if err != nil {
    log.Println(err)
    return err
}

在我的消费者中,我一个接一个地获取每个 ID,并根据我的 ID 从我的数据存储中获取信息。

因此,在获取时,如果我的 CreateObject 方法返回错误,则可能会出现这种情况,因此我通过重新排队 msg(给出错误)来处理这种情况,因此可以重试。

for i := 0; i < len(data.Ids); i++ {

    Object, err := X.CreateObject(data.Ids[i)
        if err != nil {
            requeueMsgData = append(requeueMsgData, data.Ids[i])
            continue
        }
        DataList = append(DataList, Object)
    }

    if len(requeueMsgData) > 0 {
        msg, err := json.Marshal(requeueMsgData)
        if err != nil {
            log.Println(err)
            return err
        }
        message.Body = msg
        message.Requeue(60 * time.Second)
        log.Println("error while creating Object", err)
        return n
}

那么,这是正确的做法吗?他们在这种情况下有什么缺点吗?再发布一次更好吗?

4

2 回答 2

1

一些队列(如 Kafka)支持确认,其中出队的项目不会从队列中删除,直到消费者实际确认成功接收项目。

这种模式的优点是,如果消费者在消费后但在确认之前死亡,该物品将自动重新排队。您的模型的缺点是在这种情况下该项目可能会丢失。

确认模型的风险是物品现在可能被双重消费。消费者尝试消费具有副作用(如增加计数器或改变数据库)但不确认,因此重试可能不会产生所需的结果。(请注意,通读 nsq 文档,即使您不将数据重新排入队列,也不能保证会发生重试,因此您的代码可能无论如何都必须对此进行防御)。

如果您想更深入地了解这一点,您应该研究“Exactly Once”与“At Most Once”处理的主题。

通读 nsq 文档,似乎不支持确认,因此如果您有义务使用 nsq,这可能是您拥有的最佳选择。

于 2018-09-08T19:10:46.757 回答
1

根据 dolan 所说的,您可能会遇到以下几种情况:

  • 主消息心跳/租约超时,您再次收到所有 ID(来自原始消息)。NSQ 提供“至少一次”语义。
  • 任何单个消息的重新排队都会超时并且永远不会完成(回退到主 IDS)

因为 nsq 可以(并且大多数 def 将:p)多次传递消息CreateObjects可以/应该是幂等的,以便处理这种情况。

此外,重新传递是一个重要的安全机制,在所有单独的 id 或确认创建或成功重新排队之前,不应该找到原始消息,这确保没有数据丢失。


IMO 您处理它的方式看起来非常好,但 IMO 最重要的考虑因素是在收到重复消息的环境中处理正确性/数据完整性。


另一种选择可能是对 Requeue 进行批处理,以便它尝试生成一条失败 id 的输出消息,这可以在任何给定时间减少队列中的消息数:

考虑一条具有 3 个 id 的消息:

消息 ID:[id1,id2,id3]

id1 创建成功,id2 和 id3 失败:

该程序可以尝试所有操作并发出一个重新排队消息,id2,id3。

但也需要权衡取舍。

于 2018-09-08T19:27:49.853 回答