多年来,我一直在使用firebase-queue
调用作业多年,但最近它开始因事务错误竞争资源而失败,并且存在严重的扩展问题。
所以我迁移firebase-queue
到云功能,最初它似乎很快并且非常可扩展。我编写了一个由 onWrite on 连接的简单脚本queue/task/{taskId}
。但似乎有时 onWrite 永远不会随机触发。我根本看不到日志中触发的特定作业。
我们正在使用 blaze 计划,函数返回 promise,timelimit 设置为最大值,99.9% 的作业耗时小于 5000ms。
我们每天有大约 5 万次调用,大约 100 个工作没有被触发。这是断断续续的,对此一无所知。一件奇怪的事情是我在日志查看器上看不到任何错误,但在调用图上,我可以看到一些加载错误和连接错误,但仍然不知道它们是什么。任何解决该问题的见解将不胜感激。
const functions = require(`firebase-functions`);
const admin = require(`firebase-admin`);
const _ = require(`lodash`);
const specs = require(`./specs.json`);
const queueSpecs = require(`./queue-specs.js`);
const SERVER_TIMESTAMP = { ".sv": `timestamp` };
admin.initializeApp();
const workers = {};
queueSpecs.forEach((spec) => {
let specString = spec;
if (typeof spec === `object`) {
specString = spec.id;
}
workers[specString] = require(`./lib/workers/${specString}`);
});
const taskHandler = (change, context) => {
const data = change.after.val();
const ref = change.after.ref;
const { taskID } = context.params;
if (!data) {
return null;
}
const worker = workers[data._state];
if (!worker) {
console.error(`Worker not found for task ${taskID} - ${data._state}`);
return null;
}
const specID = data._state.replace(`/`, `_`);
const spec = specs[specID];
if (!spec) {
console.error(`Spec not found for task ${taskID} - ${specID}`);
return null;
}
console.log(
`Running ${taskID} - ${data._state} - ${JSON.stringify(data, null, 2)}`
);
return new Promise((resolve, reject) => {
const taskResolve = newTask => {
const updates = _.clone(newTask);
let newState = _.get(newTask, `_new_state`, ``);
newState = newState || spec.finished_state;
if (!updates || !newState) {
console.log(`Exiting task - ${taskID}`);
return ref.set(null).then(resolve, reject);
}
updates._state = newState || spec.finished_state;
updates._state_changed = SERVER_TIMESTAMP;
updates._owner = null;
updates._progress = null;
updates._error_details = null;
console.log(`Resolving`);
return ref.update(updates).then(resolve, reject);
};
const taskReject = error => {
const updates = {};
let errorString;
if (_.isError(error)) {
errorString = error.message;
} else if (_.isString(error)) {
errorString = error;
} else if (!_.isUndefined(error) && !_.isNull(error)) {
errorString = error.toString();
}
if (updates._state === `error`) {
console.log(`Exiting task on reject - ${taskID}`);
return ref.set(null).then(resolve, reject);
}
updates._state = spec.error_state || `error`;
updates._state_changed = SERVER_TIMESTAMP;
updates._owner = null;
updates._progress = null;
updates._error_details = {
previous_state: spec.in_progress_state,
error: errorString,
errorStack: _.get(error, `stack`, null),
attempts: 1
};
console.log(`Rejecting`);
return ref.update(updates).then(resolve, reject);
};
const taskProgress = () => {
// eslint-disable-line
// progress is never used, thus just resolving
const updates = {};
console.log(`Progress ????`);
return ref.update(updates).then(resolve, reject);
};
worker(data, taskProgress, taskResolve, taskReject);
});
};
exports.taskRunner = functions.database
.ref(`/queue/tasks/{taskID}`)
.onWrite(taskHandler);