10

我刚开始使用 node-postgres 和 postgres 一起玩 node.js。我尝试做的一件事是编写一个简短的 js 来填充我的数据库,使用一个包含大约 200,000 个条目的文件。

我注意到一段时间后(不到 10 秒),我开始收到“错误:连接终止”。我不确定这是否与我使用 node-postgres 的方式有关,或者是因为我向 postgres 发送垃圾邮件。

无论如何,这是一个显示此行为的简单代码:

var pg = require('pg');
var connectionString = "postgres://xxxx:xxxx@localhost/xxxx";

pg.connect(connectionString, function(err,client,done){
  if(err) {
    return console.error('could not connect to postgres', err);
  }

  client.query("DROP TABLE IF EXISTS testDB");
  client.query("CREATE TABLE IF NOT EXISTS testDB (id int, first int, second int)");
  done();

  for (i = 0; i < 1000000; i++){
    client.query("INSERT INTO testDB VALUES (" + i.toString() + "," + (1000000-i).toString() + "," + (-i).toString() + ")",   function(err,result){
      if (err) {
         return console.error('Error inserting query', err);
      }
      done();
    });
  }
});

它在大约 18,000-20,000 次查询后失败。这是使用client.query的错误方式吗?我尝试更改默认客户端编号,但似乎没有帮助。

client.connect() 似乎也没有帮助,但那是因为我有太多的客户,所以我绝对认为客户池是要走的路。

谢谢你的帮助!

4

2 回答 2

17

更新

此答案已被本文取代:数据导入,它代表了最新的方法。


为了复制您的场景,我使用了pg-promise库,并且我可以确认,无论您使用哪个库,正面尝试都不会奏效,重要的是方法。

下面是一种改进的方法,我们将插入划分为块,然后在事务中执行每个块,这就是负载平衡(又名节流):

function insertRecords(N) {
    return db.tx(function (ctx) {
        var queries = [];
        for (var i = 1; i <= N; i++) {
            queries.push(ctx.none('insert into test(name) values($1)', 'name-' + i));
        }
        return promise.all(queries);
    });
}
function insertAll(idx) {
    if (!idx) {
        idx = 0;
    }
    return insertRecords(100000)
        .then(function () {
            if (idx >= 9) {
                return promise.resolve('SUCCESS');
            } else {
                return insertAll(++idx);
            }
        }, function (reason) {
            return promise.reject(reason);
        });
}
insertAll()
    .then(function (data) {
        console.log(data);
    }, function (reason) {
        console.log(reason);
    })
    .done(function () {
        pgp.end();
    });

这在大约 4 分钟内产生了 1000,000 条记录,在前 3 笔交易之后显着放缓。我使用的是 Node JS 0.10.38(64 位),它消耗了大约 340MB 的内存。这样我们插入了 100,000 条记录,连续 10 次。

如果我们这样做,只是这次在 100 个事务中插入 10,000 条记录,同样的 1,000,000 条记录在 1 分 25 秒内添加,没有减速,Node JS 消耗大约 100MB 内存,这告诉我们这样的分区数据非常好主意。

无论您使用哪个库,方法都应该相同:

  1. 将您的插入分区/限制到多个事务中;
  2. 将单个事务中的插入列表保持在大约 10,000 条记录;
  3. 在同步链中执行所有交易。
  4. 在每个事务的 COMMIT 之后将连接释放回池。

如果您违反任何这些规则,您肯定会遇到麻烦。例如,如果您违反规则 3,您的 Node JS 进程可能会很快耗尽内存并引发错误。我的示例中的规则 4 由图书馆提供。

如果你遵循这种模式,你就不需要为连接池设置而烦恼。

更新 1

更高版本的pg-promise完美支持这样的场景,如下图:

function factory(index) {
    if (index < 1000000) {
        return this.query('insert into test(name) values($1)', 'name-' + index);
    }
}

db.tx(function () {
    return this.batch([
        this.none('drop table if exists test'),
        this.none('create table test(id serial, name text)'),
        this.sequence(factory), // key method
        this.one('select count(*) from test')
    ]);
})
    .then(function (data) {
        console.log("COUNT:", data[3].count);
    })
    .catch(function (error) {
        console.log("ERROR:", error);
    });

如果你不想包含任何额外的东西,比如创建表,那么它看起来更简单:

function factory(index) {
    if (index < 1000000) {
        return this.query('insert into test(name) values($1)', 'name-' + index);
    }
}

db.tx(function () {
    return this.sequence(factory);
})
    .then(function (data) {
        // success;
    })
    .catch(function (error) {
        // error;
    });

有关详细信息,请参阅同步事务

Bluebird例如,用作 promise 库时,在我的生产机器上插入 1,000,000 条记录需要 1 分 43 秒(未启用长堆栈跟踪)。

您只需factory根据 让您的方法返回请求index,直到您没有剩余,就这么简单。

最好的部分是,这不仅速度快,而且对您的 NodeJS 进程几乎没有负载。整个测试过程中内存测试过程保持在 60MB 以下,仅消耗 CPU 时间的 7-8%。

更新 2

从 1.7.2 版本开始,pg-promise轻松支持超大规模事务。请参阅同步事务一章。

例如,在我的家用 PC 上,我可以在 15 分钟内在单个事务中插入 10,000,000 条记录,使用 Windows 8.1 64 位。

对于测试,我将我的 PC 设置为生产模式,并使用Bluebird作为 Promise 库。在测试期间,整个 NodeJS 0.12.5 进程(64 位)的内存消耗没有超过 75MB,而我的 i7-4770 CPU 显示出一致的 15% 负载。

以同样的方式插入 100m 条记录只需要更多的耐心,但不需要更多的计算机资源。

与此同时,之前对 1m 刀片的测试从 1m43s 下降到 1m31s。

更新 3

以下注意事项可以产生巨大的影响:性能提升

更新 4

相关问题,有一个更好的实现示例: Massive inserts with pg-promise

更新 5

一个更好和更新的例子可以在这里找到:nodeJS inserting Data into PostgreSQL error

于 2015-03-28T23:26:15.467 回答
4

我猜你正在达到最大池大小。由于client.query是异步的,所以在返回之前使用所有可用的连接。

默认池大小为 10。在此处查看:https ://github.com/brianc/node-postgres/blob/master/lib/defaults.js#L27

您可以通过设置增加默认池大小pg.defaults.poolSize

pg.defaults.poolSize = 20;

更新:释放连接后执行另一个查询。

var pg = require('pg');
var connectionString = "postgres://xxxx:xxxx@localhost/xxxx";
var MAX_POOL_SIZE = 25;

pg.defaults.poolSize = MAX_POOL_SIZE;
pg.connect(connectionString, function(err,client,done){
  if(err) {
    return console.error('could not connect to postgres', err);
  }

  var release = function() {
    done();
    i++;
    if(i < 1000000)
      insertQ();
  };

  var insertQ = function() {
    client.query("INSERT INTO testDB VALUES (" + i.toString() + "," + (1000000-i).toString() + "," + (-i).toString() + ")",        function(err,result){
      if (err) {
         return console.error('Error inserting query', err);
      }
      release();
    });
  };

  client.query("DROP TABLE IF EXISTS testDB");
  client.query("CREATE TABLE IF NOT EXISTS testDB (id int, first int,    second int)");
  done();

  for (i = 0; i < MAX_POOL_SIZE; i++){
    insertQ();
  }
});

基本思想是,由于您将大量连接池大小相对较小的查询排入队列,因此您将达到最大池大小。在这里,我们仅在释放现有连接后才进行新查询。

于 2015-03-17T14:00:49.073 回答