我们正在使用 celery 从不同的旅行社获取航班数据,每个请求大约需要 20-30 秒(大多数旅行社需要请求序列 - 授权、发送请求、轮询结果)。
正常的 celery 任务如下所示:
from eventlet.green import urllib2, time
def get_results(attr, **kwargs):
search, provider, minprice = attr
data = XXX # prepared data
host = urljoin(MAIN_URL, "RPCService/Flights_SearchStart")
req = urllib2.Request(host, data, {'Content-Type': 'text/xml'})
try:
response_stream = urllib2.urlopen(req)
except urllib2.URLError as e:
return [search, None]
response = response_stream.read()
rsp_host = urljoin(MAIN_URL, "RPCService/FlightSearchResults_Get")
rsp_req = urllib2.Request(rsp_host, response, {'Content-Type':
'text/xml'})
ready = False
sleeptime = 1
rsp_response = ''
while not ready:
time.sleep(sleeptime)
try:
rsp_response_stream = urllib2.urlopen(rsp_req)
except urllib2.URLError as e:
log.error('go2see: results fetch failed for %s IOError %s'%
(search.id, str(e)))
else:
rsp_response = rsp_response_stream.read()
try:
rsp = parseString(rsp_response)
except ExpatError as e:
return [search, None]
else:
ready = rsp.getElementsByTagName('SearchResultEx')
[0].getElementsByTagName('IsReady')[0].firstChild.data
ready = (ready == 'true')
sleeptime += 1
if sleeptime > 10:
return [search, None]
hash = "%032x" % random.getrandbits(128)
open(RESULT_TMP_FOLDER+hash, 'w+').write(rsp_response)
# call to parser
parse_agent_results.apply_async(queue='parsers', args=[__name__,
search, provider, hash])
此任务在并发 300 的 eventlet 池中运行
prefetch_multiplier = 1
,broker_limit = 300
当从队列中获取约 100-200 个任务时 - CPU 使用率上升到 100%(使用整个 CPU 核心)并且从队列中获取任务时执行延迟。
您能否指出可能的问题 - 阻塞操作(eventletALARM DETECTOR
没有例外),错误的架构或其他任何问题。