13

我在这里有一个非常简单的问题。我需要同时与很多主机通信,但我并不需要任何同步,因为每个请求都是自给自足的。

因此,我选择使用异步套接字,而不是垃圾邮件线程。现在我确实有一个小问题:

异步的东西就像一个魅力,但是当我连接到 100 个主机时,我得到 100 个超时(超时 = 10 秒)然后我等待 1000 秒,只是为了找出我所有的连接都失败了。

有什么办法也可以获得非阻塞套接字连接?我的套接字已设置为非阻塞,但对 connect() 的调用仍处于阻塞状态。

减少超时不是一个可接受的解决方案。

我在 Python 中执行此操作,但我想在这种情况下编程语言并不重要。

我真的需要使用线程吗?

4

6 回答 6

8

使用select模块。这允许您在多个非阻塞套接字上等待 I/O 完成。这是有关选择的更多信息。从链接到页面:

在 C 中,编码select相当复杂。在 Python 中,它是小菜一碟,但它与 C 版本足够接近,如果您了解 Python 中的 select,那么在 C 中使用它不会有什么问题。

ready_to_read, ready_to_write, in_error = select.select(
                  potential_readers, 
                  potential_writers, 
                  potential_errs, 
                  timeout)

您传递select了三个列表:第一个包含您可能想尝试读取的所有套接字;第二个是您可能要尝试写入的所有套接字,最后一个(通常留空)您要检查错误的那些。您应该注意,一个套接字可以进入多个列表。呼叫被阻塞,select但您可以给它一个超时。这通常是一件明智的事情 - 给它一个很好的长时间超时(比如一分钟),除非你有充分的理由不这样做。

作为回报,您将获得三个列表。它们具有实际上可读、可写和错误的套接字。这些列表中的每一个都是您传入的相应列表的子集(可能为空)。如果您将一个套接字放入多个输入列表中,则它只会(最多)在一个输出列表中。

如果一个套接字在输出可读列表中,您可以确定recv该套接字上的 a 将返回某些内容。可写列表的想法相同。你会send 有所作为。也许不是你想要的,但有总比没有好。(实际上,任何相当健康的套接字都将返回为可写的——这只是意味着出站网络缓冲区空间可用。)

如果您有“服务器”套接字,请将其放入 potential_readers 列表中。如果它出现在可读列表中,那么您的接受(几乎可以肯定)会起作用。如果您创建了一个新套接字来连接到其他人,请将其放入 potential_writers 列表中。如果它出现在可写列表中,则您很有可能已连接。

于 2009-07-30T11:21:52.717 回答
7

不幸的是,没有显示错误的示例代码,所以很难看出这个块来自哪里。

他做了类似的事情:

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setblocking(0)
s.connect(("www.nonexistingname.org", 80))

socket 模块内部使用 getaddrinfo ,这是一个阻塞操作,尤其是当主机名不存在时。一个符合标准的 dns 客户端将等待一段时间来查看该名称是否真的不存在,或者是否只涉及一些慢速 dns 服务器。

解决方案是仅连接到 ip-addresses 或使用允许非阻塞请求的 dns 客户端,例如pydns

于 2009-07-31T10:40:47.880 回答
5

您还需要并行化连接,因为设置超时时套接字会阻塞。或者,您不能设置超时,并使用选择模块。

您可以使用异步模块中的调度程序类来执行此操作。看一下基本的http 客户端示例。该类的多个实例在连接时不会相互阻塞。您可以像使用线程一样轻松地做到这一点,我认为跟踪套接字超时更容易,但由于您已经在使用异步方法,您最好保持在同一轨道上。

例如,以下适用于我所有的 linux 系统

import asyncore, socket

class client(asyncore.dispatcher):
    def __init__(self, host):
        self.host = host
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((host, 22))

    def handle_connect(self):
        print 'Connected to', self.host

    def handle_close(self):
        self.close()

    def handle_write(self):
        self.send('')

    def handle_read(self):
        print ' ', self.recv(1024)

clients = []
for i in range(50, 100):
    clients.append(client('cluster%d' % i))

asyncore.loop()

在 cluster50 - cluster100 中,有许多机器无响应或不存在。这立即开始打印:

Connected to cluster50
  SSH-2.0-OpenSSH_4.3

Connected to cluster51
  SSH-2.0-OpenSSH_4.3

Connected to cluster52
  SSH-2.0-OpenSSH_4.3

Connected to cluster60
  SSH-2.0-OpenSSH_4.3

Connected to cluster61
  SSH-2.0-OpenSSH_4.3

...

然而,这并没有考虑到 getaddrinfo,它必须阻塞。如果您在解决 dns 查询时遇到问题,一切都必须等待。您可能需要自己单独收集 dns 查询,并在异步循环中使用 ip 地址

如果您想要比 asyncore 更大的工具包,请查看Twisted Matrix。入门有点繁重,但它是您可以获得的用于 python 的最佳网络编程工具包。

于 2009-07-30T13:47:23.793 回答
4

使用扭曲的 .

它是一个用 Python 编写的异步网络引擎,支持多种协议,您可以添加自己的协议。它可用于开发客户端和服务器。它不会阻止连接。

于 2009-07-30T13:49:15.523 回答
1

socket.connect与非阻塞套接字一起使用BlockingIOError时,最初会得到一个。请参阅TCP 连接错误 115 Operation in Progress 原因是什么?原因的解释。

解决方案是捕获并忽略异常或使用socket.connect_ex,而不是socket.connect因为该方法不会引发异常。特别注意 Python 文档中描述的最后一句话:

socket.connect_ex(address)

类似connect(address),但返回一个错误指示符,而不是为 C 级 connect() 调用返回的错误引发异常(其他问题,例如“找不到主机”,仍然可以引发异常)。如果操作成功,则错误指示符为 0,否则为 errno 变量的值。这对于支持例如异步连接很有用。

来源:https ://docs.python.org/3/library/socket.html#socket.socket.connect_ex

如果你想继续使用socket.connect,你可以捕捉并忽略负责任的EINPROGRESS错误:

>>> import socket
>>> 
>>> # bad
>>> s = socket.socket()
>>> s.setblocking(False)
>>> s.connect(("127.0.0.1", 8080))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
BlockingIOError: [Errno 115] Operation now in progress
>>> 
>>> # good
>>> s = socket.socket()
>>> s.setblocking(False)
>>> try:
...     s.connect(("127.0.0.1", 8080))
... except OSError as exc:
...     if exc.errno != 115:  # EINPROGRESS
...         raise
... 
>>> 
于 2021-10-14T21:24:20.103 回答
0

你看过异步模块吗?可能正是你需要的。

于 2009-07-30T11:15:18.767 回答