0

我正在编写一个迁移(migration)脚本,按用户更新消息的位置(position)。具体来说,我先从数据库中查询出所有用户,再针对每个用户取出其消息,并为这些消息更新位置。我使用序列(sequence)来递增消息的位置。

版本 1:请参阅下面的代码

"use strict";
const async = require('async');

exports.name = "add position for message";
/**
 * Migration "up" step (version 1): for every User, creates a per-user
 * sequence and issues one UPDATE per Message to set `position` from it.
 *
 * NOTE(review): the returned promise does not wait for that work.
 * `Array.prototype.forEach` returns `undefined` and ignores whatever its
 * callback returns, so `return users.forEach(...)` fulfils the outer promise
 * as soon as the queries have been *started*. That is presumably why only a
 * few messages end up updated — confirm how the migration runner treats the
 * returned promise.
 *
 * @param {object} db - Database handle; only `select().from().all()` and
 *   `query()` are used here.
 * @returns {Promise} Promise chained from the User query; it fulfils with
 *   `undefined`.
 */
exports.up = function (db) {
  return db.select('uuid').from('User').all().then(function(users) {
    // forEach discards the promises returned by its callback.
    return users.forEach(function(user) {
      console.log('each user', user.uuid);
      // Fire-and-forget: this promise is neither returned nor chained, so the
      // queries below do not wait for the sequence to be created.
      db.query(`CREATE SEQUENCE positionMessage${user.uuid} TYPE ORDERED`); // SQL CREATE SEQUENCE

      return db.query(`select uuid from Message where in('has_message').uuid = :userUuid`, {
        params: {
          userUuid: user.uuid
        }
      })
      .then(function(messages) {
        console.log('messages', messages.length); // result => get full messages with 2500 messages.
        // Same pattern as above: the per-message UPDATE promises are dropped.
        return messages.forEach(function(message) {
          console.log('message uuid', message.uuid);
          return db.query(`UPDATE Message SET position = sequence('positionMessage${user.uuid}').next() where uuid = :uuid`, { 
            params: {
              uuid: message.uuid
            }
          })
          .then(function(err) {
            // Despite its name, `err` is the fulfilment value of the UPDATE
            // query; rejections are not handled anywhere in this chain.
            console.log(err);
          })
        })
      })
    })
  })
};

/**
 * Migration "down" step: starts a DROP SEQUENCE for each user's sequence and
 * runs `UPDATE Message REMOVE position`.
 *
 * NOTE(review): the User query chain below is not returned, and the
 * DROP SEQUENCE promises are not collected, so the caller cannot observe
 * their completion or failure.
 *
 * @param {object} db - Database handle; only `select().from().all()` and
 *   `query()` are used here.
 * @returns {Promise} The promise of the `UPDATE Message REMOVE position`
 *   query only.
 */
exports.down = function (db) {
  // Started but not awaited: runs concurrently with the UPDATE below.
  db.select('uuid').from('User').all().then(function(users) {
    users.forEach(function(user) {
      db.query(`DROP SEQUENCE positionMessage${user.uuid}`); // SQL DROP SEQUENCE
    })
  })
  return db.query(`UPDATE Message REMOVE position`)
};

==> 结果:迁移显示已完成,但只有一部分消息被更新,大约 5 到 50 条记录。(数据库中的消息总数:2500)

我还尝试过使用 async 库的 waterfall、each 等方法,请参阅下面的版本 2 代码:

"use strict";
const async = require('async');

exports.name = "add position for message";
/**
 * Migration "up" step (version 2): the same work as version 1, expressed with
 * `async.waterfall` and `async.each` callbacks.
 *
 * NOTE(review): a final callback is passed to `async.waterfall`, so its
 * return value is presumably not a promise the migration runner can wait on —
 * confirm against the installed `async` version.
 *
 * @param {object} db - Database handle; only `select().from().all()` and
 *   `query()` are used here.
 * @returns {*} Whatever `async.waterfall` returns.
 */
exports.up = function (db) {

  return async.waterfall([
    function getUser(callback) {
      // Only a fulfilment handler is attached: if the User query rejects,
      // `callback` is never invoked.
      return db.select('uuid').from('User').all().then(function(users) 
      {
         callback(null, users);
      })
  },
  function getMessageForUser(users, callback) {
    //getMessageForUser
    // The inner `callback` parameters below shadow this waterfall callback.
    return async.each(users, function (user, callback) {
      // run to async each users
      // Fire-and-forget: the CREATE SEQUENCE promise is not chained.
      db.query(`CREATE SEQUENCE positionMessage${user.uuid} TYPE ORDERED`); // SQL CREATE SEQUENCE
      return db.query(`select uuid from Message where in('has_message').uuid = :userUuid`, {
        params:{
          userUuid: user.uuid
        }
      })
      .then(function(messages) {
        return async.each(messages, function(message, callback) {
          // go to async each messages
          // NOTE(review): two differences from version 1 on the next line:
          //  - the sequence name is 'positionMessage', without the
          //    `${user.uuid}` suffix used by the CREATE SEQUENCE above;
          //  - `message.uuid` is interpolated into the SQL text unquoted
          //    instead of being bound through `params`.
          // If this query rejects, no handler calls `callback`, so neither
          // `async.each` nor the waterfall ever completes.
          return db.query(`UPDATE Message SET position = sequence('positionMessage').next() where uuid = ${message.uuid}`)
            .then( function (response) {
              // updated position for message
              callback();
            })
        }, function doneEachMessage(err) {
          //doneEachMessage
          // `callback` here is the per-user callback of the outer async.each.
          callback(err);
        })
      })
    }, function doneEachUser(err) {
      // doneEachUser
      // `callback` here is the waterfall callback of getMessageForUser.
      callback(err);
      })
    }
  ], function doneWaterFall(err) {
    // `err` is not inspected; failures are reported as a normal completion.
    console.log('doneWaterFall');
  })
};

/**
 * Migration "down" step: starts a DROP SEQUENCE for each user's sequence and
 * runs `UPDATE Message REMOVE position`.
 *
 * NOTE(review): the User query chain below is not returned, and the
 * DROP SEQUENCE promises are not collected, so the caller cannot observe
 * their completion or failure.
 *
 * @param {object} db - Database handle; only `select().from().all()` and
 *   `query()` are used here.
 * @returns {Promise} The promise of the `UPDATE Message REMOVE position`
 *   query only.
 */
exports.down = function (db) {
  // Started but not awaited: runs concurrently with the UPDATE below.
  db.select('uuid').from('User').all().then(function(users) {
    users.forEach(function(user) {
      db.query(`DROP SEQUENCE positionMessage${user.uuid}`); // SQL DROP SEQUENCE
    })
  })
  return db.query(`UPDATE Message REMOVE position`)
};

结果:=> 什么也没发生。

我使用的 orientjs 版本是 2.2.6

我不知道自己哪里做错了。有人能帮我解决这个问题吗?非常感谢!

4

1 回答 1

0

我通过使用 Promise 解决了这个问题。代码如下:

"use strict";
const async = require('async');

exports.name = "add position for message";
exports.up = function (db) {
  return new Promise(function (resolve, reject) {
    db.select('uuid').from('User').all().then(function(users) {
      async.each(users, function(user, callback) {
        console.log('VP - loop to update message for user has uuid: ', user.uuid);
        db.query(`CREATE SEQUENCE positionMessage${user.uuid} TYPE ORDERED`); // SQL CREATE SEQUENCE
        db.query(`select uuid, @rid as rid from Message where in('has_message').uuid = :userUuid`, {
          params: {
            userUuid: user.uuid
          }
        })
        .then(function(messages) {
          console.log('VP - count messages need to update position: ', messages.length);
          async.each(messages, function(message, callback) {
            db.query(`UPDATE Message SET position = sequence('positionMessage${user.uuid}').next() where uuid = :uuid`, { 
              params: {
                uuid: message.uuid
              }
            })
            .then(function(err) {
              console.log('VP - Executed update position for message have uuid: ', message.uuid);
              callback(err);
            })
          }, function doneEachMessage(err) {
            callback(err);
          })
        })
      }, function doneEachuser(err) {
        resolve('Success!'); // resolved the promise
      })
    })
  });
};

exports.down = function (db) {
  db.select('uuid').from('User').all().then(function(users) {
    users.forEach(function(user) {
      db.query(`DROP SEQUENCE positionMessage${user.uuid}`); // SQL DROP SEQUENCE
    })
  })
  db.query(`UPDATE Message REMOVE position`)
};

现在,脚本会等待所有查询执行完毕之后才结束迁移。

于 2017-10-08T10:29:52.223 回答