0

我已经使用 Kafkajs 创建了一个 Apache Kafka 客户端,并且我正在尝试从 Kafka 中的某个主题中读取消息。如果我 console.log(message) 工作正常。但是我想每当在主题中产生/写入新消息时向客户端发送消息,消费者正在收听来自生产者的消息,同时保持连接处于活动状态。

 // function, which is being called whenever it's specified route is being requested
 async readMessage(req, res, next, consumer) {
    const resMessage = {};

    res.writeHead(200, {'Content-Type': 'text/plain'});

    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {  
            res.write(message.value.toString());
        },
    });

    // res.send(resMessage);
}

但是在我将数据发送到 express.js 服务器后, res.write() 不会将数据发送到客户端(我使用 Postman 作为我的 Node.js 客户端)。如何在调用 res.end() 之前刷新 res.write() 中写入的数据?

4

1 回答 1

1

res.write()确实会在您调用它时立即发送数据(Express 或 nodejs 没有缓冲)。我已经在调试器中对其进行了跟踪,并看到它毫不拖延地发送数据。它可能会被服务于 Nagle 算法的操作系统短暂缓冲,但这只是一个非常短的延迟(毫秒)。因此,数据正在发送到客户端。您的问题很可能出在接收数据的 http 客户端中。

大多数 http 客户端都是为了期望请求/响应而构建的。发送请求,等待整个响应,然后通知调用者。因此,如果您想在 http 流到达时获取常规数据,您将需要一个非传统的 http 客户端,它会在数据到达时通知您。您可能还需要发明某种协议,允许客户端知道完整的片段何时到达,因为数据包可以以随机方式分块或分组。

可能比使用 http 更容易的是使用一个实际的基于消息的协议,例如 webSockets 或 socket.io,它是为你正在尝试做的事情而明确设计的。客户端将与服务器建立连接,然后服务器可以随时向客户端发送消息。webSocket 或 socket.io 是基于消息的,因此它们已经为您完成了描述消息、将其打包、交付、解包并通知收件人消息已到达的所有工作。这正是 webSocket/socket.io 的设计目的。

对于单向消息发送(从服务器到客户端),您还可以使用服务器发送事件,它是 http 的扩展。

于 2020-02-24T22:02:55.007 回答