7

我想对 REST webservices 进行很多 url requets。通常在 75-90k 之间。但是,我需要限制与 Web 服务的并发连接数。

我开始以以下方式玩 grequests,但很快就开始咀嚼打开的套接字。

concurrent_limit = 30
urllist = buildUrls()
hdrs = {'Host' : 'hostserver'}
g_requests = (grequests.get(url, headers=hdrs) for url in urls)
g_responses = grequests.map(g_requests, size=concurrent_limit)

当它运行一分钟左右时,我遇到了“达到的最大套接字数”错误。据我所知,grequests 中的每个 requests.get 调用都使用它自己的会话,这意味着为每个请求打开一个新套接字。

我在github上找到了一条注释,指的是如何使 grequests 使用单个会话。但这似乎有效地将所有请求限制在一个共享池中。这似乎违背了异步 http 请求的目的。

s = requests.session()
rs = [grequests.get(url, session=s) for url in urls]
grequests.map(rs)

是否可以以创建多个会话的方式使用 grequests 或 gevent.Pool?

换句话说:如何使用队列或连接池来发出许多并发 http 请求?

4

3 回答 3

7

我最终没有使用 grequests 来解决我的问题。我仍然希望这可能是可能的。

我使用了线程:

class MyAwesomeThread(Thread):
    """
    Threading wrapper to handle counting and processing of tasks
    """
    def __init__(self, session, q):
        self.q = q
        self.count = 0
        self.session = session
        self.response = None
        Thread.__init__(self)

    def run(self): 
        """TASK RUN BY THREADING"""
        while True:
            url, host = self.q.get()
            httpHeaders = {'Host' : host}
            self.response = session.get(url, headers=httpHeaders)
            # handle response here
            self.count+= 1
            self.q.task_done()
        return

q=Queue()
threads = []
for i in range(CONCURRENT):
    session = requests.session()
    t=MyAwesomeThread(session,q)
    t.daemon=True # allows us to send an interrupt 
    threads.append(t)


## build urls and add them to the Queue
for url in buildurls():
    q.put_nowait((url,host))

## start the threads
for t in threads:
    t.start()
于 2013-12-12T21:43:23.757 回答
2

rs 是一个 AsyncRequest 列表。每个 AsyncRequest 都有自己的会话。

rs = [grequests.get(url) for url in urls]
grequests.map(rs)
for ar in rs:
    print(ar.session.cookies)
于 2013-11-28T06:34:22.530 回答
2

像这样的东西:

NUM_SESSIONS = 50
sessions = [requests.Session() for i in range(NUM_SESSIONS)]
reqs = []
i = 0
for url in urls:
    reqs.append(grequests.get(url, session=sessions[i % NUM_SESSIONS]
    i+=1
responses = grequests.map(reqs, size=NUM_SESSIONS*5)

这应该将请求分散到 50 个不同的会话中。

于 2015-09-29T09:04:49.827 回答