0

背景

生产者生成一些数据并按顺序发送到 Kafka,例如:

{uuid:123 状态:1}

{uuid:123 状态:3}

状态 1 表示开始

状态 3 表示成功

我使用sarama.NewConsumerGroup(xx, xx, config).Consume(xx, xx, myhandler)来使用代码:

func (h myhandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {

        key := fmt.Sprintf("%q-%d-%d", msg.Topic, msg.Partition, msg.Offset)
        _, err := rdb.RedisClient.Get(h.ctx, key).Result()
        if err == redis.Nil {
            msgQueue <- msg.Value
            sess.MarkMessage(msg, "")
            rdb.RedisClient.Set(h.ctx, key, none, 12*time.Hour)
        } else if err != nil {
            log.Errorln("RedisClient get key error : ", err)
            return err
        } else {
            continue
        }

    }
    return nil
}

msgQueue := make(chan interface{}, 1000)

然后我将 msgQueue 中的值解码为一个结构并将一条记录插入到 mysql 中。

问题

通常,最终数据状态是 '3',但我发现有时它是 '1'

而且我发现通道msgQueue中的消息顺序不固定。

那么如何确保 data 的最终状态为3

怎么修

我提供的方法还不够好,无法查看如何对其进行优化。

    conn := &gorm.DB{}
    data := &Log{}
    if data.Status != 1 {
        conn = conn.Clauses(clause.OnConflict{
            Columns:   []clause.Column{{Name: "uuid"}},
            DoUpdates: clause.AssignmentColumns([]string{"status"}),
        })
    }
    conn.Create(data)
    return conn.Error

mysql 对uuid有一个唯一的约束索引。

当数据顺序为{uuid: 123 status: 1} , {uuid: 123 status: 3}时,是对的。

当数据顺序为{uuid: 123 status: 3} , {uuid: 123 status: 1}时,最终状态也是正确的,但会返回错误Error 1062: Duplicate entry '123' for key 'unique_index_uuid'

它不漂亮。那么我该如何优化或有其他方法可以做到这一点?

4

1 回答 1

1

这取决于主题分区。Kafka 不提供主题内的排序保证,仅在分区内提供。

换句话说,如果您将消息 A,然后消息 B发送到分区 0,那么顺序将是:首先是 A,然后是 B。但是如果它们最终位于不同的分区上,则可能会发生 B 被写入其分区的情况,在将 A 写入其之前。

这是来自 Confluent 网站的引述:

Kafka 仅提供分区内记录的总顺序,而不是主题中不同分区之间的总顺序。对于大多数应用程序而言,按分区排序与按键分区数据的能力相结合就足够了。但是,如果您需要对记录进行总排序,这可以通过只有一个分区的主题来实现,尽管这意味着每个消费者组只有一个消费者进程。

关联

于 2021-10-25T11:53:57.200 回答