3

我试图通过从保存我所有数据的常规数据结构切换到使用 redis 和集群来扩展我的 socket.io 应用程序。但是,我遇到了一些问题,因为在当前实现的某个时刻,我将套接字对象与其他属性一起存储在此数据结构data[socket.id].socket = socket中,因为在我的应用程序中,有时我需要data[someId].socket.disconnect()手动断开套接字。

我知道我不能将对象直接存储到redis中,所以我尝试使用JSON.stringify(socket)没有成功,因为socket它是循环的。是否有另一种方法可以仅使用 断开套接字id?这样我就可以存储id这样的东西data[socket.id].id = socket.id,也许可以称之为类似的data[someId].id.disconnect()东西。所以基本上我正在寻找一种方法来断开套接字而无需访问实际的套接字对象(我确实可以访问该io对象)。

谢谢大家的帮助。

4

4 回答 4

4

似乎这已经完成但没有记录在任何地方......io.sockets.clients(someId)无论在哪个实例上调用它都获取套接字对象,因此唯一需要的是使用io.sockets.clients(someId).disconnect()并实际断开客户端,无论它连接的实例如何至。我实际上不需要将它们存储在我自己的阵列上。

于 2012-11-22T19:45:42.107 回答
1

在 nodejs、 pm2socket.io-redis前面使用nginx

NGINX.conf

    server {
        server_name                         www.yoursite.io;
        listen                              443 ssl http2;
        listen                              [::]:443 ssl http2;

        location / {
            proxy_set_header                X-Real-IP $remote_addr;
            proxy_set_header                X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header                Host $http_host;
            proxy_set_header                X-NginX-Proxy false;
            proxy_redirect                  off;
            proxy_http_version              1.1;
            proxy_set_header                Upgrade $http_upgrade;
            proxy_set_header                Connection "upgrade";
            proxy_pass                      http://[::1]:4000;
        }
   }

PM2 运行集群模式,四个实例...

pm2 start app.js -i 4

应用程序.js

console.clear()
require('dotenv').config()
const express = require('express'),
  app = express(),
  Redis = require('ioredis')
if(process.env.debug === 'true')
  app.use(require('morgan')(':method :url :status :res[content-length] - :response-time ms'))
app.locals = Object.assign(app.locals, {
  sock_transports: ['websocket', 'xhr-polling'],
  sock_timeout: process.env.sock_timeout,
  title: process.env.title,
  meta_desc: process.env.meta_desc,
  app_url: ['https://', process.env.app_subdomain, '.', process.env.app_domain].join('')
})
app.set('functions', require('./lib/functions')(app))
app.set('view engine', 'hbs')
app.set('view cache', false)
app.engine('hbs', require('./lib/hbs')(require('express-handlebars')).engine)
app.use(express.json({
  type: [
    'json'
  ]
}), express.urlencoded({
  extended: true
}))
const redis = new Redis({
  path: process.env.redis_socket,
  db: 1,
  enableReadyCheck: true
})
console.time('Redis')
redis.on('ready', () => {
  console.timeEnd('Redis')
  app.set('redis', redis)
})
redis.on('error', err => {
  console.log('Redis: ' + app.get('colors').redBright(err))
  exitHandler()
})
function loadSessionMiddleware() {
  const session = require('express-session'),
    RedisSession = require('connect-redis')(session),
    client = new Redis({
      path: process.env.redis_socket,
      db: 5
    }),
    ua = require('useragent')
  ua(true)
  app.set('useragent', ua)
  app.set('session_vars', {
    secret: process.env.session_secret,
    name: process.env.session_name,
    store: new RedisSession({
      client
    }),
    rolling: true,
    saveUninitialized: true,
    unset: 'destroy',
    resave: true,
    proxy: true,
    logErrors: process.env.debug === 'true',
    cookie: {
      path: '/',
      domain: '.' + process.env.app_domain,
      secure: true,
      sameSite: true,
      httpOnly: true,
      expires: false,
      maxAge: 60000 * process.env.session_exp_mins,
    }
  })
  app.set('session', session(app.get('session_vars')))
  app.use(
    app.get('session'),
    require('./middleware')(app)
  )
  loadControllers()
}
function loadControllers() {
  require('fs').readdirSync('./controllers').filter(file => {
    return file.slice(-3) === '.js'
  }).forEach(file => {
    require('./controllers/' + file)(app)
  })
  app.get('*', (req, res) => {
    app.get('functions').show404(req, res)
  })
  initServer()
}
function initServer() {
  console.time('Server')
  const server = require('http').createServer(app)
  server.on('error', err => {
    console.err('express err: ' + err)
    app.get('functions').stringify(err)
  })
  server.listen(process.env.app_port)
  app.set('server', server)
  require('./websocket').listen(app, websocket => {
    console.timeEnd('Server')
    app.set('websocket', websocket)
    // www-data
    process.setuid(process.env.app_uid)
  })
}
console.time('Database')
require('./model').load(app, db => {
  console.timeEnd('Database')
  app.set('model', db)
  loadSessionMiddleware()
})
function exitHandler() {
  if(app.get('server'))
    app.get('server').close()
  if(app.get('redis'))
    app.get('redis').quit()
  if(app.get('mail'))
    app.get('mail').close()
  process.exit(0)
}
process.on('SIGINT SIGUSR1 SIGUSR2', () => {
  exitHandler()
})
process.stdin.resume()

websocket.js

var exports = {}
exports.listen = (app, cb) => {
  const websocket = require('socket.io')(app.get('server'), {
    transports: process.env.transports
  }),
  req = {}
  websocket.setMaxListeners(0)
  websocket.adapter(require('socket.io-redis')({
    path: process.env.redis_socket,
    key: 'socket_io',
    db: 2,
    enableOfflineQueue: true
  }))
  websocket.use((socket, next) => {
    app.get('session')(socket.request, socket.request.res || {}, next)
  })
  websocket.isAccountLocked = cb => {
    if(!req.session.user_id) {
      cb(false)
      return
    }
    if(isNaN(req.session.user_id)) {
      cb(false)
      return
    }
    app.get('model').users.get(req.session.user_id, user_rec => {
      if(!user_rec) {
        cb(false)
        return
      }
      if(user_rec.account_locked === 'yes') {
        websocket.showClient(client => {
          app.get('model').users.logout(req.session, () => {
            console.log(client + ' ' + app.get('colors').redBright('Account Locked'))
            cb(true)
          })
        })
        return
      }
      cb(false)
    })
  }
  websocket.showClient = cb => {
    var outp = []
    if(!req.session.user_id && !req.session.full_name)
      outp.push(req.session.ip)
    if(req.session.user_id) {
      outp.push('# ' + req.session.user_id)
      if(req.session.full_name)
        outp.push(' - ' + req.session.full_name)
    }
    cb(app.get('colors').black.bgWhite(outp.join('')))
  }
  websocket.on('connection', socket => {
    if(!socket.request.session)
      return
    req.session = socket.request.session
    socket.use((packet, next) => {
        websocket.isAccountLocked(locked => {
            if(locked)
                return
            var save_sess = false
            if(typeof(socket.handshake.headers['x-real-ip']) !== 'undefined') {
          if(socket.handshake.headers['x-real-ip'] !== req.session.ip) {
            req.session.ip = socket.handshake.headers['x-real-ip']
            save_sess = true
          }
        }
        var ua = app.get('useragent').parse(socket.handshake.headers['user-agent']).toString()
        if(ua !== req.session.useragent) {
          req.session.useragent = ua
          save_sess = true
        }
            websocket.of('/').adapter.remoteJoin(socket.id, req.session.id, err => {
                delete socket.rooms[socket.id]
                if(!save_sess) {
            next()
            return
          }
                req.session.save(() => {
                  next()
                })
            })
      })
    })
    socket.on('disconnecting', () => {
      websocket.of('/').adapter.remoteDisconnect(req.session.id, true, err => {
      })
    })
    socket.on('auth', sess_vars => {
      function setSess() {
        if(sess_vars.path)
          req.session.path = sess_vars.path
        if(sess_vars.search_query)
          req.session.search_query = sess_vars.search_query
        if(sess_vars.search_query_long)
          req.session.search_query_long = sess_vars.search_query_long
        if(sess_vars.dispensary_id)
          req.session.dispensary_id = sess_vars.dispensary_id
        if(sess_vars.city)
          req.session.city = sess_vars.city
        if(sess_vars.state)
          req.session.state = sess_vars.state
        if(sess_vars.zip)
          req.session.zip = sess_vars.zip
        if(sess_vars.country)
          req.session.country = sess_vars.country
        if(sess_vars.hash)
          req.session.hash = sess_vars.hash
        req.session.save(() => {
          websocket.to(req.session.id).emit('auth', sess)
          app.get('functions').showVisitor({
            session: sess
          }, {
            statusCode: 200
          })
        })
      }
      setSess()
    })
    socket.on('logout', () => {
      var sess_ip = req.session.ip,
        sess_id = req.session.id,
        sess_email = req.session.email
      app.get('model').users.logout(req.session, () => {
        websocket.showClient(client => {
          console.log(client + ' - Logged Out')
        })
      })
    })
  })
  cb(websocket)
}
module.exports = exports
于 2019-11-20T02:58:11.710 回答
0

我做类似的事情是这样的:

var connTable = {};
function onConnection(socket) {
  connTable[socket.id] = socket;

  socket.on("close", function(data, callback) {
    socket.disconnect();
    onDisconnect(socket.id);
  });


  socket.on('disconnect', function(){
    onDisconnect(socket.id);
  });
}

function manuallyDisconnect(socket_id) {
  connTable[socket_id].disconnect();
}

function onDisconnect() {
  //closing process, or something....
}
于 2012-11-21T17:35:38.630 回答
0

从 socket.io-redis 的文档中,您需要使用remoteDisconnect

io.of('/').adapter.remoteDisconnect(socketId, true, (err) => {
            if (err) {
              console.log('remoteDisconnect err:', err);
            }
        });
于 2020-03-12T22:53:26.737 回答