5

我目前正在使用此库对我设置的 kafka 服务器进行压力测试:https ://github.com/dsully/pykafka

import kafka
import time

def test_kafka_server(n=1):
    for i in range(0,n):
        producer = kafka.producer.Producer('test',host='10.137.8.192')
        message = kafka.message.Message(str(time.time()))
        producer.send(message)
        producer.disconnect()

def main():
    test_kafka_server(100000)

if __name__ == '__main__':
    main()

最终发生的事情是我最终使我自己的本地机器超载。

我收到错误 10055,根据谷歌的说法,这意味着“Windows 已用完 TCP/IP 套接字缓冲区,因为同时打开了太多连接。” 根据 netstat, producer.disconnect() 不是关闭套接字,而是将其置于TIME_WAIT状态。

ipython 调试器指向这一行:

C:\Python27\lib\socket.pyc in meth(name, self, *args)
    222     proto = property(lambda self: self._sock.proto, doc="the socket protocol")
    223 
--> 224 def meth(name,self,*args):
    225     return getattr(self._sock,name)(*args)
    226 

作为罪魁祸首,但这似乎会使事情变得比我感到舒服的要低。

我已经搜索并发现这个Python 套接字没有正确关闭连接,建议这样做:

setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

所以,我在 io.py 文件中使用该选项重建了 pykafka 库:

  def connect(self):
    """ Connect to the Kafka server. """
    global socket
    self.socket = socket.socket()
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.socket.connect((self.host, self.port))

我仍然得到同样的错误。

我没有把 setsockopt 线放在正确的位置吗?还有什么我可以尝试的吗?

4

1 回答 1

6

您所描述的是套接字级别的正常 TCP 行为。当用户级程序关闭套接字时,内核不会立即释放套接字。它进入 TIME_WAIT 状态:

TIME-WAIT(服务器或客户端)表示等待足够的时间以确保远程 TCP 收到其连接终止请求的确认。[根据 RFC 793,连接可以在 TIME-WAIT 中停留最多四分钟,称为 MSL(最大分段寿命)。

所以套接字是关闭的。socket.SO_REUSEADDR 用于侦听器(服务器),不影响客户端连接。嗯,确实在绑定socket的时候用到了。

于 2012-11-08T12:22:51.223 回答