21

ESOCKETTIMEDOUT我正在使用 Node.js - 异步和请求模块来抓取 100+ 数百万个网站,ETIMEDOUT但几分钟后我一直遇到错误。

重新启动脚本后它再次起作用。这似乎不是连接限制问题,因为我仍然可以做 resolve4、resolveNs、resolveMx 并且也curl没有延迟。

你看到代码有什么问题吗?或任何建议?我想将 async.queue() 并发量提高到至少 1000。谢谢。

var request = require('request'),
    async = require('async'),
    mysql = require('mysql'),
    dns = require('dns'),
    url = require('url'),
    cheerio = require('cheerio'),
    iconv = require('iconv-lite'),
    charset = require('charset'),
    config = require('./spy.config'),
    pool = mysql.createPool(config.db);

iconv.skipDecodeWarning = true;

var queue = async.queue(function (task, cb) {
    dns.resolve4('www.' + task.domain, function (err, addresses) {
        if (err) {
            //
            // Do something
            //
            setImmediate(function () {
                cb()
            });
        } else {
            request({
                url: 'http://www.' + task.domain,
                method: 'GET',
                encoding:       'binary',
                followRedirect: true,
                pool:           false,
                pool:           { maxSockets: 1000 },
                timeout:        15000 // 15 sec
            }, function (error, response, body) {

                //console.info(task);

                if (!error) {
                  // If ok, do something

                } else {
                    // If not ok, do these

                    console.log(error);

                    // It keeps erroring here after few minutes, resolve4, resolveNs, resolveMx still work here.

                    // { [Error: ETIMEDOUT] code: 'ETIMEDOUT' }
                    // { [Error: ESOCKETTIMEDOUT] code: 'ESOCKETTIMEDOUT' }

                    var ns = [],
                        ip = [],
                        mx = [];
                    async.parallel([
                        function (callback) {
                            // Resolves the domain's name server records
                            dns.resolveNs(task.domain, function (err, addresses) {
                                if (!err) {
                                    ns = addresses;
                                }
                                callback();
                            });
                        }, function (callback) {
                            // Resolves the domain's IPV4 addresses
                            dns.resolve4(task.domain, function (err, addresses) {
                                if (!err) {
                                    ip = addresses;
                                }
                                callback();
                            });
                        }, function (callback) {
                            // Resolves the domain's MX records
                            dns.resolveMx(task.domain, function (err, addresses) {
                                if (!err) {
                                    addresses.forEach(function (a) {
                                        mx.push(a.exchange);
                                    });
                                }
                                callback();
                            });
                        }
                    ], function (err) {
                        if (err) return next(err);

                        // do something
                    });

                }
                setImmediate(function () {
                    cb()
                });
            });
        }
    });
}, 200);

// When the queue is emptied we want to check if we're done
queue.drain = function () {
    setImmediate(function () {
        checkDone()
    });
};
function consoleLog(msg) {
    //console.info(msg);
}
function checkDone() {
    if (queue.length() == 0) {
        setImmediate(function () {
            crawlQueue()
        });
    } else {
        console.log("checkDone() not zero");
    }
}

function query(sql) {
    pool.getConnection(function (err, connection) {
        if (!err) {
            //console.log(sql);
            connection.query(sql, function (err, results) {
                connection.release();
            });
        }
    });
}

function crawlQueue() {
    pool.getConnection(function (err, connection) {
        if (!err) {
            var sql = "SELECT * FROM domain last_update < (UNIX_TIMESTAMP() - 2592000) LIMIT 500";
            connection.query(sql, function (err, results) {
                if (!err) {
                    if (results.length) {
                        for (var i = 0, len = results.length; i < len; ++i) {
                            queue.push({"id": results[i]['id'], "domain": results[i]['domain'] });
                        }
                    } else {
                        process.exit();
                    }
                    connection.release();
                } else {
                    connection.release();
                    setImmediate(function () {
                        crawlQueue()
                    });
                }
            });
        } else {
            setImmediate(function () {
                crawlQueue()
            });
        }
    });
}
setImmediate(function () {
    crawlQueue()
});

而且系统限制非常高。

    Limit                     Soft Limit           Hard Limit           Units
    Max cpu time              unlimited            unlimited            seconds
    Max file size             unlimited            unlimited            bytes
    Max data size             unlimited            unlimited            bytes
    Max stack size            8388608              unlimited            bytes
    Max core file size        0                    unlimited            bytes
    Max resident set          unlimited            unlimited            bytes
    Max processes             257645               257645               processes
    Max open files            500000               500000               files
    Max locked memory         65536                65536                bytes
    Max address space         unlimited            unlimited            bytes
    Max file locks            unlimited            unlimited            locks
    Max pending signals       257645               257645               signals
    Max msgqueue size         819200               819200               bytes
    Max nice priority         0                    0
    Max realtime priority     0                    0
    Max realtime timeout      unlimited            unlimited            us

系统控制

net.ipv4.ip_local_port_range = 10000    61000
4

4 回答 4

22

默认情况下,Node 有4 个 worker 来解析 DNS 查询。如果您的 DNS 查询需要很长时间,请求将在 DNS 阶段阻塞,并且症状正好是ESOCKETTIMEDOUTor ETIMEDOUT

尝试增加你的 uv 线程池大小:

export UV_THREADPOOL_SIZE=128
node ...

或在index.js(或您的入口点所在的任何地方):

#!/usr/bin/env node
process.env.UV_THREADPOOL_SIZE = 128;

function main() {
   ...
}

编辑:我还写了关于它的博客文章

于 2016-06-21T13:49:54.990 回答
10

10/31/2017 我们找到的最终解决方案是在代理中使用 keepAlive 选项。例如:

var pool = new https.Agent({ keepAlive: true });

function getJsonOptions(_url) {
    return {
        url: _url,
        method: 'GET',
        agent: pool,
        json: true
    };
}

Node 的默认池似乎默认为 keepAlive=false,这会导致在每个请求上创建一个新连接。当短时间内创建太多连接时,上述错误就会浮出水面。我的猜测是,服务路径上的一个或多个路由器会阻止连接请求,可能是怀疑拒绝服务攻击。无论如何,上面的代码示例完全解决了我们的问题。

7/16/2021 这个问题有一个更简单的解决方案:

    var http = require('http');
    http.globalAgent.keepAlive = true;

    var https = require('https');
    https.globalAgent.keepAlive = true;

我在代码中验证了全局代理的 keepAlive 设置为 false,尽管文档说默认值应该为 true。

于 2017-05-08T21:51:44.127 回答
2

TL:博士;request在循环之前只用你的设置配置一个包装器,然后使用forever: true设置。

const customRequest = request.defaults({ forever: true }); // wrapper
customRequest({ uri: uri });

详细解答;
在循环内执行请求时,我遇到了完全相同的问题。但是,以上都没有对我有用。

为了在一定时间后停止获取ETIMEDOUT和处理某些请求,请执行以下操作:ESOCKETTIMEDOUT

  1. 不要request为循环中的每个请求配置设置。相反,request只在循环前一次使用您的设置创建一个包装器。正如请求文件所述:

请注意,如果您在循环中发送多个请求并创建多个新池对象,则 maxSockets 将无法按预期工作。要解决此问题,请使用 request.defaults 与您的池选项或使用循环外的 maxSockets 属性创建池对象。

  1. 但是,即使在实现这一点并使用 配置池时{ maxSockets: Infinity },我仍然面临错误。解决我的问题的唯一配置forever: true. 这将使已建立的连接保持活动状态。

因此,最后我的代码是这样的:

const request = require('request');
// Custom wrapper
const customRequest = request.defaults({
   forever: true,
   timeout: 20000,
   encoding: null
})

loop(urls, (url) => { // for each of my urls
   customRequest({uri: url}, (err, res, body) => { 
      console.log("done"); 
   });
});

使用这种策略,我能够以每秒 25 个请求的速度执行大约 400K 请求,而在此过程中没有任何问题(Ubuntu 18.04 VM,4GB RAM,默认为 UV_THREADPOOL_SIZE)。

于 2020-07-30T19:17:32.060 回答
-1

在请求工具中(https://github.com/request/request

http连接keep-alive默认是关闭的。

您需要设置 option.forever = true 才能打开此功能。

于 2019-03-10T01:48:24.330 回答