92

用过的

NodeJS,Socket.io

问题

想象有 2 个用户U1U2,通过 Socket.io 连接到一个应用程序。算法如下:

  1. U1完全失去互联网连接(例如关闭互联网)
  2. U2向U1发送消息。
  3. U1尚未收到消息,因为 Internet 已关闭
  4. 服务器通过心跳超时检测U1断线
  5. U1重新连接到 socket.io
  6. U1永远不会收到来自U2的消息- 我猜它在第 4 步丢失了。

可能的解释

我想我明白为什么会这样:

  • 在第 4 步上,服务器也终止了套接字实例和发送到U1的消息队列
  • 此外,在第 5 步U1服务器创建新连接(它不被重用),所以即使消息仍在排队,之前的连接无论如何都会丢失。

需要帮忙

如何防止这种数据丢失?我必须使用心跳,因为我不会让人们永远挂在应用程序中。此外,我仍然必须提供重新连接的可能性,因为当我部署新版本的应用程序时,我希望零停机时间。

PS 我称之为“消息”的东西不仅仅是我可以存储在数据库中的文本消息,而是有价值的系统消息,必须保证交付,否则 UI 会搞砸。

谢谢!


加法1

我已经有一个用户帐户系统。而且,我的应用程序已经很复杂了。添加离线/在线状态无济于事,因为我已经有了这种东西。问题是不同的。

查看第 2 步。在这一步中,我们在技术上不能说 U1 是否离线,他只是失去连接让我们说 2 秒钟,可能是因为网络不好。所以 U2 向他发送了一条消息,但 U1 没有收到它,因为他的互联网仍然关闭(第 3 步)。需要第 4 步来检测离线用户,假设超时为 60 秒。最终,再过 10 秒,U1 的互联网连接就建立起来了,他重新连接到了 socket.io。但是来自 U2 的消息在空间中丢失了,因为服务器 U1 因超时而断开连接。

这就是问题所在,我不想 100% 交货。


解决方案

  1. 在 {} 用户中收集发射(发射名称和数据),由随机发射 ID 标识。发送发射
  2. 在客户端确认发射(使用 emitID 将发射发送回服务器)
  3. 如果确认 - 从由 emitID 标识的 {} 中删除对象
  4. 如果用户重新连接 - 为该用户检查 {} 并循环遍历它,为 {} 中的每个对象执行第 1 步
  5. 必要时为用户断开连接或/和连接刷新 {}
// Server
const pendingEmits = {};

socket.on('reconnection', () => resendAllPendingLimits);
socket.on('confirm', (emitID) => { delete(pendingEmits[emitID]); });

// Client
socket.on('something', () => {
    socket.emit('confirm', emitID);
});

解决方案2(有点)

2020 年 2 月 1 日添加。

虽然这并不是 Websockets 的真正解决方案,但有人可能仍然觉得它很方便。我们从 Websockets 迁移到 SSE + Ajax。SSE 允许您从客户端连接以保持持久的 TCP 连接并实时接收来自服务器的消息。要将消息从客户端发送到服务器 - 只需使用 Ajax。有延迟和开销等缺点,但 SSE 保证可靠性,因为它是 TCP 连接。

由于我们使用 Express,因此我们将此库用于 SSE https://github.com/dpskvn/express-sse,但您可以选择适合您的库。

IE 和大多数 Edge 版本不支持 SSE,因此您需要一个 polyfill:https ://github.com/Yaffle/EventSource 。

4

8 回答 8

118

其他人在其他答案和评论中暗示了这一点,但根本问题是 Socket.IO 只是一种交付机制,您不能单独依赖它来实现可靠交付。唯一能确定消息已成功传递给客户端的人是客户端本身。对于这种系统,我建议做出以下断言:

  1. 消息不直接发送给客户端;相反,它们被发送到服务器并存储在某种数据存储中。
  2. 客户端在重新连接时负责询问“我错过了什么”,并将查询数据存储中存储的消息以更新其状态。
  3. 如果在接收客户端连接时向服务器发送消息,则该消息将实时发送到客户端。

当然,根据您的应用程序的需要,您可以调整其中的一部分——例如,您可以使用 Redis 列表或消息排序集,如果您知道客户端已启动,则将其清除迄今为止。


这里有几个例子:

快乐之路

  • U1 和 U2 都连接到系统。
  • U2 向服务器发送一条 U1 应该接收的消息。
  • 服务器将消息存储在某种持久存储中,用某种时间戳或顺序 ID 为 U1 标记它。
  • 服务器通过 Socket.IO 向 U1 发送消息。
  • U1 的客户端确认(可能通过 Socket.IO 回调)它收到了消息。
  • 服务器从数据存储中删除持久消息。

离线路径

  • U1 失去互联网连接。
  • U2 向服务器发送一条 U1 应该接收的消息。
  • 服务器将消息存储在某种持久存储中,用某种时间戳或顺序 ID 为 U1 标记它。
  • 服务器通过 Socket.IO 向 U1 发送消息。
  • U1 的客户端确认收货,因为他们处于离线状态。
  • 也许 U2 会向 U1 发送更多消息;它们都以相同的方式存储在数据存储中。
  • 当 U1 重新连接时,它会询问服务器“我看到的最后一条消息是 X / 我有状态 X,我错过了什么。”
  • 服务器根据 U1 的请求向 U1 发送它从数据存储中遗漏的所有消息
  • U1 的客户端确认收到,服务器从数据存储中删除这些消息。

如果您绝对想要有保证的交付,那么重要的是设计您的系统,使连接实际上并不重要,实时交付只是一个奖励;这几乎总是涉及某种数据存储。正如 user568109 在评论中提到的那样,有一些消息传递系统可以抽象出所述消息的存储和传递,并且可能值得研究这种预构建的解决方案。(您可能仍然需要自己编写 Socket.IO 集成。)

如果您对将消息存储在数据库中不感兴趣,则可以将它们存储在本地数组中;服务器尝试向 U1 发送消息,并将其存储在“待处理消息”列表中,直到 U1 的客户端确认它收到它。如果客户端离线,那么当它回来时,它可以告诉服务器“嘿,我已断开连接,请将我错过的任何内容发送给我”,然后服务器可以遍历这些消息。

幸运的是,Socket.IO 提供了一种机制,允许客户端“响应”类似于原生 JS 回调的消息。这是一些伪代码:

// server
pendingMessagesForSocket = [];

function sendMessage(message) {
  pendingMessagesForSocket.push(message);
  socket.emit('message', message, function() {
    pendingMessagesForSocket.remove(message);
  }
};

socket.on('reconnection', function(lastKnownMessage) {
  // you may want to make sure you resend them in order, or one at a time, etc.
  for (message in pendingMessagesForSocket since lastKnownMessage) {
    socket.emit('message', message, function() {
      pendingMessagesForSocket.remove(message);
    }
  }
});

// client
socket.on('connection', function() {
  if (previouslyConnected) {
    socket.emit('reconnection', lastKnownMessage);
  } else {
    // first connection; any further connections means we disconnected
    previouslyConnected = true;
  }
});

socket.on('message', function(data, callback) {
  // Do something with `data`
  lastKnownMessage = data;
  callback(); // confirm we received the message
});

这与上一个建议非常相似,只是没有持久数据存储。


您可能还对事件溯源的概念感兴趣。

于 2013-12-28T16:46:31.707 回答
4

米歇尔的回答非常中肯,但还有一些其他重要的事情需要考虑。要问自己的主要问题是:“我的应用程序中的用户和套接字之间有区别吗?” 另一种询问方式是“每个登录用户一次可以有多个套接字连接吗?”

在网络世界中,单个用户可能总是有多个套接字连接,除非您专门放置了一些东西来防止这种情况发生。最简单的示例是,如果用户打开了同一页面的两个选项卡。在这些情况下,您不必只向人类用户发送一次消息/事件......您需要将其发送到该用户的每个套接字实例,以便每个选项卡都可以运行它的回调来更新 ui 状态。也许这对某些应用程序来说不是问题,但我的直觉说这对大多数应用程序来说都是如此。如果您对此感到担忧,请继续阅读......

要解决这个问题(假设您使用数据库作为持久存储),您需要 3 个表。

  1. 用户 - 与真人一对一
  2. 客户端 - 代表一个“选项卡”,可以与套接字服务器建立单个连接。(任何“用户”可能有多个)
  3. 消息 - 需要发送给客户端的消息(不是需要发送给用户或套接字的消息)

如果您的应用不需要,则 users 表是可选的,但 OP 说他们有。

需要正确定义的另一件事是“什么是套接字连接?”、“何时创建套接字连接?”、“何时重用套接字连接?”。Michelle 的伪代码使套接字连接看起来可以重用。使用 Socket.IO,它们不能被重用。我已经看到了很多混乱的根源。在现实生活中,米歇尔的例子确实有意义。但我不得不想象这些情况很少见。真正发生的是当套接字连接丢失时,该连接、ID 等将永远不会被重用。因此,任何专门为该套接字标记的消息永远不会传递给任何人,因为当最初连接的客户端重新连接时,他们会获得一个全新的连接和新的 ID。这意味着它'

因此,对于基于 Web 的示例,这里将是我推荐的一组步骤:

  • 当用户加载有可能创建套接字连接的客户端(通常是单个网页)时,将一行添加到链接到其用户 ID 的客户端数据库。
  • 当用户确实连接到套接字服务器时,将客户端 ID 与连接请求一起传递给服务器。
  • 服务器应验证用户是否被允许连接并且客户端表中的客户端行可用于连接并相应地允许/拒绝。
  • 使用 Socket.IO 生成的套接字 ID 更新客户端行。
  • 发送连接到客户端 ID 的消息表中的任何项目。初始连接不会有任何连接,但如果这是来自尝试重新连接的客户端,则可能会有一些连接。
  • 每当需要向该套接字发送消息时,请在消息表中添加一行,该行链接到您生成的客户端 ID(而不是套接字 ID)。
  • 尝试发出消息并通过确认收听客户端。
  • 收到确认后,从消息表中删除该项目。
  • 您可能希望在客户端创建一些逻辑来丢弃从服务器发送的重复消息,因为正如一些人指出的那样,这在技术上是一种可能性。
  • 然后当客户端与套接字服务器断开连接时(故意或通过错误),不要删除客户端行,最多清除套接字 ID。这是因为同一客户端可能会尝试重新连接。
  • 当客户端尝试重新连接时,发送与原始连接尝试相同的客户端 ID。服务器会将其视为初始连接。
  • 当客户端被销毁(用户关闭选项卡或导航离开)时,即删除客户端行和该客户端的所有消息。这一步可能有点棘手。

因为最后一步很棘手(至少以前是这样,我已经很长时间没有做过类似的事情了),并且因为在某些情况下,例如掉电,客户端会在不清理客户端行的情况下断开连接并且从不尝试重新连接同一个客户端行 - 您可能希望有一些定期运行的东西来清理任何过时的客户端和消息行。或者,您可以永久存储所有客户端和消息,并适当地标记它们的状态。

所以要明确一点,如果一个用户打开了两个选项卡,您将向消息表中添加两条相同的消息,每个消息都标记为不同的客户端,因为您的服务器需要知道每个客户端是否收到它们,而不仅仅是每个用户。

于 2019-05-29T18:34:37.720 回答
3

正如已经在另一个答案中所写的那样,我也相信您应该将实时视为奖励:系统也应该能够在没有实时的情况下工作。

我正在为一家大公司(ios、android、web 前端和 .net core + postGres 后端)开发企业聊天,并且在开发了一种让 websocket 重新建立连接(通过套接字 uuid)并获得未传递消息的方法之后(存储在队列中)我知道有一个更好的解决方案:通过 rest API 重新同步。

基本上我最终只使用了实时 websocket,在每条实时消息(用户在线、打字机、聊天消息等)上都有一个整数标签,用于监控丢失的消息。

当客户端获得一个不是整体的 id (+1) 时,它会理解它不同步,因此它会丢弃所有套接字消息并通过 REST api 要求其所有观察者重新同步。

这样,我们可以在离线期间处理应用程序状态的许多变化,而无需在重新连接时连续解析大量 websocket 消息,并且我们肯定会被同步(因为最后一个同步日期仅由 REST api 设置,而不是来自套接字)。

唯一棘手的部分是从调用 REST api 到服务器回复的那一刻监控实时消息,因为从数据库读取的内容需要时间才能返回到客户端,同时可能会发生变化,因此需要缓存它们并考虑到。

我们将在几个月后投入生产,我希望届时能重新入睡 :)

于 2019-11-14T22:19:29.523 回答
1

您似乎已经拥有用户帐户系统。您知道哪个帐户在线/离线,您可以处理连接/断开连接事件:

所以解决方案是,为每个用户在数据库中添加在线/离线和离线消息:

chatApp.onLogin(function (user) {
   user.readOfflineMessage(function (msgs) {
       user.sendOfflineMessage(msgs, function (err) {
           if (!err) user.clearOfflineMessage();
       });
   })
});

chatApp.onMessage(function (fromUser, toUser, msg) {
   if (user.isOnline()) {
      toUser.sendMessage(msg, function (err) {
          // alert CAN NOT SEND, RETRY?
      });
   } else {
      toUser.addToOfflineQueue(msg);
   }
})
于 2013-12-19T17:21:43.187 回答
0

看这里:处理浏览器重新加载 socket.io

我认为您可以使用我提出的解决方案。如果您正确修改它,它应该可以按您的意愿工作。

于 2013-12-27T13:01:47.843 回答
0

我认为您想要的是为每个用户提供一个可重用的套接字,例如:

客户:

socket.on("msg", function(){
    socket.send("msg-conf");
});

服务器:

// Add this socket property to all users, with your existing user system
user.socket = {
    messages:[],
    io:null
}
user.send = function(msg){ // Call this method to send a message
    if(this.socket.io){ // this.io will be set to null when dissconnected
        // Wait For Confirmation that message was sent.
        var hasconf = false;
        this.socket.io.on("msg-conf", function(data){
            // Expect the client to emit "msg-conf"
            hasconf = true;
        });
        // send the message
        this.socket.io.send("msg", msg); // if connected, call socket.io's send method
        setTimeout(function(){
            if(!hasconf){
                this.socket = null; // If the client did not respond, mark them as offline.
                this.socket.messages.push(msg); // Add it to the queue
            }
        }, 60 * 1000); // Make sure this is the same as your timeout.

    } else {
        this.socket.messages.push(msg); // Otherwise, it's offline. Add it to the message queue
    }
}
user.flush = function(){ // Call this when user comes back online
    for(var msg in this.socket.messages){ // For every message in the queue, send it.
        this.send(msg);
    }
}
// Make Sure this runs whenever the user gets logged in/comes online
user.onconnect = function(socket){
    this.socket.io = socket; // Set the socket.io socket
    this.flush(); // Send all messages that are waiting
}
// Make sure this is called when the user disconnects/logs out
user.disconnect = function(){
    self.socket.io = null; // Set the socket to null, so any messages are queued not send.
}

然后在断开连接之间保留套接字队列。

确保将每个用户socket属性保存到数据库中,并使方法成为用户原型的一部分。数据库无关紧要,只需保存它,但您一直在保存您的用户。

这将通过在将消息标记为已发送之前要求客户端确认来避免 Additon 1 中提到的问题。如果你真的想要,你可以给每条消息一个 id 并让客户端将消息 id 发送到msg-conf,然后检查它。

在此示例中,user是模板用户,所有用户都从该模板用户中复制,或类似于用户原型。

注意:这尚未经过测试。

于 2013-12-27T20:45:37.483 回答
-2

最近一直在看这些东西,并认为不同的路径可能会更好。

尝试查看 Azure 服务总线、问题和主题来处理离线状态。消息等待用户回来,然后他们收到消息。

运行队列是一项成本,但对于基本队列而言,每百万次操作需要 0.05 美元,因此开发成本将更多来自编写队列系统所需的工作时间。 https://azure.microsoft.com/en-us/pricing/details/service-bus/

azure bus 有 PHP、C#、Xarmin、Anjular、Java Script 等的库和示例。

所以服务器发送消息并且不需要担心跟踪它们。客户端也可以使用消息发回,因为如果需要,可以处理负载平衡。

于 2017-06-13T08:50:22.677 回答
-2

试试这个发出聊天列表

io.on('connect', onConnect);

function onConnect(socket){

  // sending to the client
  socket.emit('hello', 'can you hear me?', 1, 2, 'abc');

  // sending to all clients except sender
  socket.broadcast.emit('broadcast', 'hello friends!');

  // sending to all clients in 'game' room except sender
  socket.to('game').emit('nice game', "let's play a game");

  // sending to all clients in 'game1' and/or in 'game2' room, except sender
  socket.to('game1').to('game2').emit('nice game', "let's play a game (too)");

  // sending to all clients in 'game' room, including sender
  io.in('game').emit('big-announcement', 'the game will start soon');

  // sending to all clients in namespace 'myNamespace', including sender
  io.of('myNamespace').emit('bigger-announcement', 'the tournament will start soon');

  // sending to individual socketid (private message)
  socket.to(<socketid>).emit('hey', 'I just met you');

  // sending with acknowledgement
  socket.emit('question', 'do you think so?', function (answer) {});

  // sending without compression
  socket.compress(false).emit('uncompressed', "that's rough");

  // sending a message that might be dropped if the client is not ready to receive messages
  socket.volatile.emit('maybe', 'do you really need it?');

  // sending to all clients on this node (when using multiple nodes)
  io.local.emit('hi', 'my lovely babies');

};

于 2019-02-25T14:38:30.150 回答