6

我目前正在使用非阻塞 SocketChannel (Java 1.6) 作为 Redis 服务器的客户端。Redis 直接通过套接字接受纯文本命令,由 CRLF 终止并响应类似,一个简单的示例:

发送:'PING\r\n'

RECV: '+乒乓球\r\n'

Redis 还可以返回大量回复(取决于您要求的内容),其中包含许多以 \r\n 结尾的数据部分,所有这些都作为单个响应的一部分。

我正在使用标准的while(socket.read() > 0) {//append bytes}循环从套接字读取字节并将它们重新组装成客户端的回复。

注意:我没有使用选择器,只是连接到服务器的多个客户端 SocketChannel,等待服务发送/接收命令。

我感到困惑的是 SocketChannel.read() 方法在非阻塞模式下的合同具体来说,如何知道服务器何时完成发送并且我有整个消息

我有一些方法可以防止返回过快并让服务器有机会回复,但我坚持的一件事是:

  1. read()是否有可能返回字节,然后在后续调用中不返回字节,但在另一个后续调用中再次返回一些字节?

基本上,如果我收到至少 1 个字节并最终read()返回 0,我是否可以相信服务器已完成对我的响应,然后我知道我已经完成了,或者服务器可能只是忙并且可能会喷出一些如果我等待并继续尝试更多字节?

如果即使在 read() 返回 0 字节后(在之前的成功读取之后)它仍然可以继续发送字节,那么我不知道如何判断服务器何时完成与我的对话,实际上我很困惑 java.io.*样式通信甚至会知道服务器何时“完成”。

正如你们所知,除非连接已死,否则 read 永远不会返回 -1 并且这些是标准的长期数据库连接,所以我不会在每个请求时关闭和打开它们。

我知道一个流行的回答(至少对于这些 NIO 问题)是看看 Grizzly、MINA 或 Netty——如果可能的话,我真的很想在采用一些 3rd 方依赖项之前了解这一切是如何在原始状态下工作的。

谢谢你。

奖金问题:

我最初认为阻塞 SocketChannel 将是解决此问题的方法,因为在我处理他们的命令并给他们回复之前,我真的不希望调用者做任何事情。

如果这最终是一个更好的方法,我有点困惑看到 SocketChannel.read() 阻塞,只要没有足够的字节来填充给定的缓冲区......没有逐字节读取所有内容我无法弄清楚这个默认行为实际上是如何使用的......我永远不知道从服务器返回的回复的确切大小,所以我对 SocketChannel.read() 的调用总是阻塞直到超时(此时我终于看到内容位于缓冲区中)。

我不清楚使用阻塞方法的正确方法,因为它总是挂断读取。

4

4 回答 4

4

请查看您的 Redis 规范以获取此答案。

.read()在一次调用中返回 0 个字节,在后续调用中返回 1 个或更多字节并不违反调用规则。这是完全合法的。如果有任何事情导致交付延迟,无论是由于网络延迟还是 Redis 服务器速度慢,都可能发生这种情况。

您寻求的答案与以下问题的答案相同:“如果我手动连接到 Redis 服务器并发送命令,我怎么知道它何时完成向我发送响应以便我可以发送另一个命令?”

答案必须在 Redis 规范中找到。如果服务器在执行完您的命令后没有发送全局令牌,那么这可以在逐个命令的基础上实现。如果 Redis 规范不允许这样做,那么这是 Redis 规范中的错误。他们应该告诉您如何判断他们何时发送了所有数据。这就是 shell 有命令提示符的原因。Redis 应该有一个等价物。

如果 Redis 在他们的规范中没有这个,那么我建议加入某种计时器功能。对处理套接字的线程进行编码,以在指定时间段(如 5 秒)内未收到任何数据后发出命令已完成的信号。选择比在服务器上执行最长命令所需的时间要长得多的时间段。

于 2011-02-07T21:06:48.377 回答
3

如果即使在 read() 返回 0 字节后(在之前的成功读取之后)它仍然可以继续发送字节,那么我不知道如何判断服务器何时完成与我的对话,实际上我很困惑 java.io.*样式通信甚至会知道服务器何时“完成”。

阅读并遵守协议:

http://redis.io/topics/protocol

该规范描述了可能的回复类型以及如何识别它们。有些是行终止的,而多行响应包括前缀计数。

回复

Redis 会以不同类型的回复来回复命令。可以从服务器发送的第一个字节检查回复的类型:

  • 对于单行回复,回复的第一个字节将是“+”
  • 带有错误消息的回复的第一个字节将是“-”
  • 使用整数时,回复的第一个字节将是“:”
  • 对于批量回复,回复的第一个字节将是“$”
  • 对于多批量回复,回复的第一个字节将是“*”

单行回复

单行回复采用单行字符串的形式,以“+”开头,以“\r\n”结尾。...

...

多批回复

LRANGE 之类的命令需要返回多个值(列表的每个元素都是一个值,而 LRANGE 需要返回多个元素)。这是使用多个批量写入来完成的,前面有一个初始行,指示接下来将进行多少批量写入


read() 是否有可能返回字节,然后在后续调用中不返回任何字节,但在另一个后续调用中再次返回一些字节?基本上,如果我收到至少 1 个字节并且最终 read() 返回 0,那么我是否可以相信服务器已经完成响应,然后我知道我已经完成了,或者服务器可能只是忙并且可能会喷出一些如果我等待并继续尝试更多字节?

是的,这是可能的。这不仅仅是因为服务器很忙,而且网络拥塞和路由中断会导致数据“暂停”。数据是一个流,可以在流中的任何位置“暂停”,而与应用程序协议无关。

继续将流读入缓冲区。查看第一个字符以确定预期的响应类型。每次成功读取后检查缓冲区,直到缓冲区包含符合规范的完整消息。


我最初认为阻塞 SocketChannel 将是解决此问题的方法,因为在我处理他们的命令并给他们回复之前,我真的不希望调用者做任何事情。

我想你是正确的。根据我对规范的快速浏览,阻塞读取不适用于此协议。由于它看起来是基于行的,因此BufferedReader可能会有所帮助,但您仍然需要知道如何识别响应何时完成。

于 2011-02-07T21:30:25.837 回答
2

我正在使用标准的 while(socket.read() > 0) {//append bytes} 循环

这不是 NIO 的标准技术。您必须将读取结果存储在变量中,并对其进行测试:

  1. -1,表示EOS,表示你应该关闭通道
  2. 零,表示没有数据可读取,这意味着您应该返回 select() 循环,并且
  3. 一个正值,意味着您已经读取了那么多字节,然后您应该在继续之前从 ByteBuffer (get()/compact()) 中提取和删除这些字节。
于 2011-04-15T06:42:46.437 回答
2

已经很久了,但是。. .

我目前正在使用非阻塞 SocketChannel

需要明确的是,SocketChannels 默认是阻塞的;为了使它们非阻塞,必须显式调用SocketChannel#configureBlocking(false)

我会假设你这样做了

我没有使用选择器

哇;那就是问题所在; 如果您要使用非阻塞通道,那么您应该始终使用选择器(至少对于读取);否则,您会遇到您所描述的混乱,即。read(ByteBuffer) == 0没有任何意义(嗯,这意味着此时 tcp 接收缓冲区中没有字节

这类似于检查您的邮箱并且它是空的;这是否意味着这封信永远不会到达?从未发送过?

我感到困惑的是 SocketChannel.read() 方法在非阻塞模式下的合同,具体来说,如何知道服务器何时完成发送并且我有整个消息。

有一个契约 -> 如果一个 Selector 选择了一个 Channel 进行读取操作,那么下一次调用SocketChannel#read(ByteBuffer)保证返回 > 0(假设 ByteBuffer arg 中有空间)

这就是您使用选择器的原因,因为它可以在一次选择中调用“选择”1Ks 的 SocketChannels,这些 SocketChannels 有准备读取的字节

现在在默认阻塞模式下使用 SocketChannels 没有任何问题;并根据您的描述(一两个客户),可能没有理由更简单;但如果您想使用非阻塞通道,请使用选择器

于 2015-09-07T00:16:54.183 回答