2

多年来,我一直使用 firebase-queue 来调度作业,但最近它开始因争用资源导致的事务错误而失败,并且存在严重的扩展问题。

所以我把 firebase-queue 迁移到了 Cloud Functions,起初它看起来很快,扩展性也很好。我编写了一个简单的脚本,通过 onWrite 监听 `queue/tasks/{taskID}`(见下方代码)。但 onWrite 似乎会随机地偶尔不触发:对某些作业,我在日志中根本看不到它被触发的记录。

我们使用的是 Blaze 计划,函数会返回 promise,超时时间(timeout)已设置为最大值,99.9% 的作业耗时不到 5000 毫秒。

我们每天大约有 5 万次调用,其中约 100 个作业没有被触发。这种情况是间歇性的,我完全不知道原因。奇怪的是,日志查看器中看不到任何错误,但在调用次数图表上能看到一些加载错误(load error)和连接错误(connection error),我仍然不清楚它们具体指什么。如能提供任何解决该问题的思路,将不胜感激。

const functions = require(`firebase-functions`);
const admin = require(`firebase-admin`);
const _ = require(`lodash`);
const specs = require(`./specs.json`);
const queueSpecs = require(`./queue-specs.js`);

// Firebase server-value placeholder (same shape as ServerValue.TIMESTAMP); the
// database substitutes its own time when a write containing it is applied.
const SERVER_TIMESTAMP = { ".sv": `timestamp` };

// Initialise the Admin SDK before any worker module is required below, in case
// a worker touches `admin` at load time (NOTE(review): confirm whether any do).
admin.initializeApp();

// Worker modules keyed by spec id. Each entry of queue-specs.js is either the id
// itself or an object carrying it in `id`; the module lives at
// ./lib/workers/<id>.
const workers = {};

queueSpecs.forEach((entry) => {
    const workerId = typeof entry === `object` ? entry.id : entry;
    workers[workerId] = require(`./lib/workers/${workerId}`);
});

/**
 * Turns whatever a worker rejected with (or threw) into the text stored in
 * `_error_details.error`. Returns `null` rather than `undefined` when there is
 * no error value, because Firebase refuses writes that contain `undefined`.
 *
 * @param {*} error - Value passed to the worker's `reject` callback or thrown by it.
 * @returns {string|null} Error text, or `null` when `error` is null/undefined.
 */
const toErrorString = error => {
    if (_.isError(error)) {
        return error.message;
    }
    if (_.isString(error)) {
        return error;
    }
    if (_.isUndefined(error) || _.isNull(error)) {
        return null;
    }
    return error.toString();
};

/**
 * Realtime Database `onWrite` handler that runs one firebase-queue style worker
 * step per write to a task.
 *
 * The worker is looked up by the task's `_state`, and its spec by that state
 * with the first `/` replaced by `_`. The worker is called as
 * `worker(data, progress, resolve, reject)`. `resolve` and `reject` write the
 * task's next state back to the same ref (or delete the task), and that write
 * fires this trigger again for the next step.
 *
 * @param {object} change - Realtime Database change; only `change.after`
 *   (`val()` and `ref`) is used.
 * @param {object} context - Event context; `context.params.taskID` names the task.
 * @returns {Promise<void>|null} A promise that settles once the worker's outcome
 *   has been written, or `null` when there is nothing to run (task deleted, no
 *   worker or no spec for its state).
 */
const taskHandler = (change, context) => {
    const data = change.after.val();
    const ref = change.after.ref;
    const { taskID } = context.params;

    // onWrite also fires on deletes, including this handler's own ref.set(null).
    if (!data) {
        return null;
    }

    const worker = workers[data._state];

    if (!worker) {
        console.error(`Worker not found for task ${taskID} - ${data._state}`);
        return null;
    }

    const specID = data._state.replace(`/`, `_`);
    const spec = specs[specID];

    if (!spec) {
        console.error(`Spec not found for task ${taskID} - ${specID}`);
        return null;
    }

    console.log(
        `Running ${taskID} - ${data._state} - ${JSON.stringify(data, null, 2)}`
    );

    return new Promise((resolve, reject) => {
        // Only the first resolve/reject (or thrown error) of a worker takes effect,
        // as in firebase-queue. Later calls are ignored so they cannot issue
        // conflicting writes for the same step.
        let settled = false;

        // Performs a database write and settles this invocation with its outcome.
        // Firebase validates payloads synchronously (for example, it throws on
        // `undefined` values). Running the write inside `.then` turns such a throw
        // into a rejection, rather than an exception inside the worker that would
        // leave this invocation pending until the function timeout.
        const settleWith = write =>
            Promise.resolve()
                .then(write)
                .then(resolve, reject);

        const taskResolve = newTask => {
            if (settled) {
                return Promise.resolve();
            }
            settled = true;

            const updates = _.clone(newTask);
            const newState =
                _.get(newTask, `_new_state`, ``) || spec.finished_state;

            // No payload, or no state to move to: the task is done, so remove it.
            if (!updates || !newState) {
                console.log(`Exiting task - ${taskID}`);
                return settleWith(() => ref.set(null));
            }

            updates._state = newState;
            updates._state_changed = SERVER_TIMESTAMP;
            updates._owner = null;
            updates._progress = null;
            updates._error_details = null;

            console.log(`Resolving`);
            return settleWith(() => ref.update(updates));
        };

        const taskReject = error => {
            if (settled) {
                return Promise.resolve();
            }
            settled = true;

            const errorState = spec.error_state || `error`;

            // Re-writing the state the task is already in would fire this trigger
            // again and re-run the same failing worker indefinitely, so drop the
            // task instead.
            if (errorState === data._state) {
                console.log(`Exiting task on reject - ${taskID}`);
                return settleWith(() => ref.set(null));
            }

            console.log(`Rejecting`);
            return settleWith(() =>
                ref.update({
                    _state: errorState,
                    _state_changed: SERVER_TIMESTAMP,
                    _owner: null,
                    _progress: null,
                    _error_details: {
                        // null defaults: Firebase rejects `undefined` in writes.
                        previous_state: _.get(spec, `in_progress_state`, null),
                        error: toErrorString(error),
                        errorStack: _.get(error, `stack`, null),
                        attempts: 1
                    }
                })
            );
        };

        // Progress is not persisted. It must NOT settle this promise: doing so lets
        // Cloud Functions treat the invocation as finished while the worker is
        // still running.
        const taskProgress = () => Promise.resolve();

        try {
            const result = worker(data, taskProgress, taskResolve, taskReject);

            // Async workers may fail by rejecting their returned promise instead
            // of calling `reject`. Route that to taskReject so the task is marked
            // as errored rather than left pending until the timeout.
            if (result && typeof result.then === `function`) {
                result.then(undefined, taskReject);
            }
        } catch (error) {
            // A synchronous throw from the worker is recorded as a task error too.
            taskReject(error);
        }
    });
};

// Invoke taskHandler on every write (create, update or delete) under
// /queue/tasks. The `{taskID}` wildcard is exposed to the handler as
// context.params.taskID.
const tasksRef = functions.database.ref(`/queue/tasks/{taskID}`);
exports.taskRunner = tasksRef.onWrite(taskHandler);
4

0 回答 0