这可能不是发布此内容的最佳位置,但我认为无论如何都值得发布。
每次针对 db 连接定义/创建模型时,我都会调用model.syncIndexes()
,这可以确保索引是最新的并且与模式保持同步,但是由于它已在线突出显示(示例),这可能会在分布式架构中产生问题,多个服务器同时尝试相同的操作。如果使用类似库之类的东西在同一台机器上的多个内核上生成主/从实例,这一点尤其重要cluster
,因为当整个服务器启动时,它们通常会在彼此靠近的地方启动。
参考上面的“codebarbarian”文章,当他们说:
Mongoose 不会为您调用 syncIndexes(),您需要自己负责调用 syncIndexes()。这有几个原因,最值得注意的是 syncIndexes() 不执行任何类型的分布式锁定。如果您有多个在启动时调用 syncIndexes() 的服务器,您可能会因为尝试删除不再存在的索引而出错。
所以我要做的是创建一个函数,它使用 redis 和 redis redlock 来获得一段名义时间的租约,以防止多个工作人员(实际上是多个服务器中的多个工作人员)同时尝试相同的同步操作。
它也绕过了整个事情,除非它是试图执行操作的“主人”,我认为将这项工作委托给任何工人都没有任何实际意义。
const cluster = require('cluster');
const {logger} = require("$/src/logger");
const {
redlock,
LockError
} = require("$/src/services/redis");
const mongoose = require('mongoose');
// Check is mongoose model,
// ref: https://stackoverflow.com/a/56815793/1834057
const isMongoModel = (obj) => {
return obj.hasOwnProperty('schema') && obj.schema instanceof mongoose.Schema;
}
const syncIndexesWithRedlock = (model,duration=60000) => new Promise(resolve => {
// Ensure the cluster is master
if(!cluster.isMaster)
return resolve(false)
// Now attempt to gain redlock and sync indexes
try {
// Typecheck
if(!model || !isMongoModel(model))
throw new Error('model argument is required and must be a mongoose model');
if(isNaN(duration) || duration <= 0)
throw new Error('duration argument is required, and must be positive numeric')
// Extract name
let name = model.collection.collectionName;
// Define the redlock resource
let resource = `syncIndexes/${name}`;
// Coerce Duration to Integer
// Not sure if this is strictly required, but wtf.
// Will ensure the duration is at least 1ms, given that duration <= 0 throws error above
let redlockLeaseDuration = Math.ceil(duration);
// Attempt to gain lock and sync indexes
redlock.lock(resource,redlockLeaseDuration)
.then(() => {
// Sync Indexes
model.syncIndexes();
// Success
resolve(true);
})
.catch(err => {
// Report Lock Error
if(err instanceof LockError){
logger.error(`Redlock LockError -- ${err.message}`);
// Report Other Errors
}else{
logger.error(err.message);
}
// Fail, Either LockError error or some other error
return resolve(false);
})
// General Fail for whatever reason
}catch(err){
logger.error(err.message);
return resolve(false);
}
});
我不会设置 Redis 连接,这是其他线程的主题,但上面这段代码的重点是展示如何syncIndexes()
可靠地使用并防止一个线程删除索引而另一个线程尝试删除相同索引的问题,或尝试同时修改索引的其他分布式问题。