背景
生产者生成一些数据并按顺序发送到 Kafka,例如:
{uuid:123 状态:1}
{uuid:123 状态:3}
状态 1 表示开始
状态 3 表示成功
我使用sarama.NewConsumerGroup(xx, xx, config).Consume(xx, xx, myhandler)来使用代码:
func (h myhandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
key := fmt.Sprintf("%q-%d-%d", msg.Topic, msg.Partition, msg.Offset)
_, err := rdb.RedisClient.Get(h.ctx, key).Result()
if err == redis.Nil {
msgQueue <- msg.Value
sess.MarkMessage(msg, "")
rdb.RedisClient.Set(h.ctx, key, none, 12*time.Hour)
} else if err != nil {
log.Errorln("RedisClient get key error : ", err)
return err
} else {
continue
}
}
return nil
}
msgQueue := make(chan interface{}, 1000)
然后我将 msgQueue 中的值解码为一个结构并将一条记录插入到 mysql 中。
问题
通常,最终数据状态是 '3',但我发现有时它是 '1'
而且我发现通道msgQueue中的消息顺序不固定。
那么如何确保 data 的最终状态为3?
怎么修
我提供的方法还不够好,无法查看如何对其进行优化。
conn := &gorm.DB{}
data := &Log{}
if data.Status != 1 {
conn = conn.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "uuid"}},
DoUpdates: clause.AssignmentColumns([]string{"status"}),
})
}
conn.Create(data)
return conn.Error
mysql 对uuid有一个唯一的约束索引。
当数据顺序为{uuid: 123 status: 1} , {uuid: 123 status: 3}时,是对的。
当数据顺序为{uuid: 123 status: 3} , {uuid: 123 status: 1}时,最终状态也是正确的,但会返回错误Error 1062: Duplicate entry '123' for key 'unique_index_uuid'。
它不漂亮。那么我该如何优化或有其他方法可以做到这一点?