1

我只是愚蠢,但我无法弄清楚这一点。我有一个快速服务器,我使用的是默认提供的集群模块(在 6.3.1 上)。我正在使用 apache benchmark 2.4 进行测试:ab -n 50000 -c 10 localhost:3000/api/v1/genders

这就是这种情况,当有人更改配置并发送 akill -SIGHUP <master_pid>时,拦截它,杀死旧工人并重新生成新工人而不关闭服务器。

所以,我有两个版本的代码,第一个

我循环遍历一组工作人员,disconnect每个工作人员和 on disconnectkill工作人员,在kill一个新进程被分叉上。但问题是在某个时间点,没有与服务器的连接,所以我收到一个连接被拒绝错误(61)并且ab死了。

此外,我读到它worker.disconnect并不关心client连接。

在第二个过程中:

我生成了新的叉子,然后一个一个地杀死了旧的叉子,而我没有杀死disconnect,我通过worker.send('shutdown')然后在我做的工人中process.on('message' ...),在这里,我在回调中做 aserver.close和 a process.exit.. 但是现在ab存在说连接由对等方重置。我不明白的是 server.close 不应该处理关闭之前发生的传入连接吗?

所以,

解决这个问题的确切方法是什么,我在哪里做错了?

这里的两个代码,按顺序(相关位):

第一的

#!/usr/bin/env node
'use strict'

const cluster = require('cluster')
const numCPUs = require('os').cpus().length
const debug = require('debug')('api:www')

let clusterStore = {
  __store: [],
  insert: function(id) {
    this.__store.push(id)
    return this
  },
  get: function(id) {
    return this.__store.filter(s => s !== id)
  },
  update: function(prevId, newId) {
    let index = this.__store.indexOf(prevId);

    if(index > -1) delete this.__store[index]
    this.__store.push(newId);

    return this
  }
}

if (cluster.isMaster) {
  // fork workers
  debug('master process (%s) ', process.pid);

  for (let i = 0; i < numCPUs; i++) {
    let fork = cluster.fork()
    clusterStore.insert(fork.id)
  }

  // if a worker dies, log it to the console and start another worker
  cluster.on('exit', (worker, code, signal) => {
    debug('worker (%s) died', worker.process.pid)

    let fork = cluster.fork()
    clusterStore.update(worker.id, fork.id)
  })

  // log when a worker starts listening
  cluster.on('listening', (worker, address) => {
    debug('worker (%s) started', worker.process.pid)
  })

  process.on('SIGHUP', function() {
    debug("respawning forks");

    clusterStore.get().forEach(function(id) {
      if(cluster.workers[id]) process.nextTick(() => { cluster.workers[id].disconnect() })
    })
  })

} else {
  worker()
}

function worker () {
  const app = require('../app')
  const http = require('http')

  const port = normalizePort(process.env.PORT || '3000')
  app.set('port', port)

  const server = http.createServer(app)

  server.on('close', (err, data) => {
    debug("Disconnecting server")

    if(err) debug("Error occurs on server close (%s)", err.message)

    debug("Disconnected server")
    process.exit()
  })

  cluster.worker.on('disconnect', () => {
    debug("Disconnected child process (%s)", cluster.worker.process.pid)
    cluster.worker.kill()
  });

  server.listen(port)
  server.on('error', onError)
  server.on('listening', onListening)
}

第二:(更改位)

if (cluster.isMaster) {
  // fork workers
  debug('master process (%s) ', process.pid);

  for (let i = 0; i < numCPUs; i++) {
    let fork = cluster.fork()
    clusterStore.insert(fork.id)
  }

  // if a worker dies, log it to the console and start another worker
  cluster.on('exit', (worker, code, signal) => {
    debug('worker (%s) died', worker.process.pid)

    if(worker.exitedAfterDisconnect !== true) {
      let fork = cluster.fork()
      clusterStore.update(worker.id, fork.id)
    }
  })

  // log when a worker starts listening
  cluster.on('listening', (worker, address) => {
    debug('worker (%s) started', worker.process.pid)
  })

  process.on('SIGHUP', function() {
    debug("respawning forks");

    let oldClusters = clusterStore.get().slice()

    oldClusters.forEach(function(id) {
      let worker = cluster.workers[id]
      let timer;

      if(worker) {
        let fork = cluster.fork()
        clusterStore.update(worker.id, fork.id)
      }
    })

    oldClusters.forEach(id => {
      cluster.workers[id].send('shutdown')
    })
  })

} else {
  worker()
}

function worker () {
  const app = require('../app')
  const http = require('http')
  const port = normalizePort(process.env.PORT || '3000')
  app.set('port', port)

  const server = http.createServer(app)

  process.on('message', (msg) => {
    if(msg ==='shutdown') {
      server.close(err => {
        if(err) debug("Error encountered while closing server (%s)", err.message)
        process.exit()
      })
    }
  })

  server.listen(port)
  server.on('error', onError)
  server.on('listening', onListening)
}
4

0 回答 0