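I solved this problem using Celery. Celery essentially spawns asynchronous workers, which is exactly what you need. Tasks can also take a "countdown" argument, so you can schedule each request for a specific delay.
The task is fairly simple: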
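import requests
# Old-style import; newer Celery releases use `from celery import shared_task` instead.
from celery.task import task


@task(max_retries=0, ignore_result=True)
def get_url(url, user_agent):
    # Replay a single logged request with its original User-Agent header.
    headers = {"User-Agent": user_agent}
    try:
        requests.get(url, headers=headers)
    except requests.ConnectionError:
        print("Couldn't fetch %s" % url)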
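I used Django's management command system to parse the logs, because I couldn't be bothered to learn how to talk to Celery from a plain Python environment. Your log format may differ, so adjust as needed. Part of management/commands/my_command.py (subpar quality; I was in a hurry):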
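import sys
import csv
import dateutil.parser  # the parser submodule has to be imported explicitly
from django.core.management.base import BaseCommand, CommandError
from django.utils import timezone
# Hypothetical module path: import get_url from wherever the task above is defined.
from myapp.tasks import get_url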
class Command(BaseCommand):
    def handle(self, *args, **options):
        now = timezone.now()
        URL_PREFIX = "http://my.site.com"
        # The log path is taken from the last command-line argument.
        pth = sys.argv[-1]
        fieldnames = [
            "ip",
            "dc1",
            "cache_result",
            "datetime_a",
            "datetime_b",
            "request_time",
            "upstream_time",
            "dc2",
            "path_raw",
            "status_code",
            "size_bytes",
            "header_url",
            "user_agent",
        ]
        # Only the last 1000 log entries are replayed.
        with open(pth, "r") as fp:
            reader = csv.DictReader(fp, fieldnames=fieldnames, delimiter=" ", quotechar='"')
            rows = list(reader)[-1000:]
        first_diff = None
        for row in rows:
            method, url, _ = row["path_raw"].split()
            if method.lower() not in ("get", "head"):
                continue
            # Rebuild "<date> HH:MM:SS <offset>" from the two bracketed timestamp fields.
            sent_raw = "%s %s:%s:%s" % tuple(row["datetime_a"].lstrip("[").split(":")) \
                + " " + row["datetime_b"].rstrip("]")
            sent = dateutil.parser.parse(sent_raw)
            # total_seconds() instead of .seconds, which ignores the days component.
            age = int((now - sent).total_seconds())
            if first_diff is None:
                first_diff = age + 1
            # Keep the original spacing between requests; the first one fires after 1 second.
            get_url.apply_async(
                (URL_PREFIX + url, row["user_agent"]),
                countdown=first_diff - age
            )
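To check the parsing and the countdown arithmetic without Django or Celery running, here is a minimal standalone sketch of the same logic. The sample log lines, the fixed NOW value and the plan_replay helper are made up for illustration, and the standard library's datetime.strptime stands in for dateutil, assuming timestamps like [10/Oct/2013:13:55:36 +0000].

```python
import csv
import io
from datetime import datetime, timezone

FIELDNAMES = [
    "ip", "dc1", "cache_result", "datetime_a", "datetime_b",
    "request_time", "upstream_time", "dc2", "path_raw",
    "status_code", "size_bytes", "header_url", "user_agent",
]

# Invented sample data in the space-delimited, double-quoted layout
# that the management command above expects (13 fields per line).
SAMPLE_LOG = (
    '1.2.3.4 - HIT [10/Oct/2013:13:55:36 +0000] 0.005 0.004 - '
    '"GET /a HTTP/1.1" 200 512 "-" "agent-one"\n'
    '1.2.3.5 - MISS [10/Oct/2013:13:55:41 +0000] 0.010 0.009 - '
    '"POST /b HTTP/1.1" 200 128 "-" "agent-two"\n'
    '1.2.3.6 - HIT [10/Oct/2013:13:55:50 +0000] 0.002 0.001 - '
    '"HEAD /c HTTP/1.1" 200 0 "-" "agent-three"\n'
)

# Fixed "current time" so the result is reproducible (an assumption for the demo).
NOW = datetime(2013, 10, 10, 14, 0, 0, tzinfo=timezone.utc)


def plan_replay(log_text, now):
    """Return (countdown, method, url, user_agent) for each GET/HEAD entry."""
    reader = csv.DictReader(
        io.StringIO(log_text), fieldnames=FIELDNAMES, delimiter=" ", quotechar='"'
    )
    first_diff = None
    plan = []
    for row in reader:
        method, url, _ = row["path_raw"].split()
        if method.lower() not in ("get", "head"):
            continue
        # Join the two timestamp fields and drop the surrounding brackets.
        sent_raw = row["datetime_a"].lstrip("[") + " " + row["datetime_b"].rstrip("]")
        sent = datetime.strptime(sent_raw, "%d/%b/%Y:%H:%M:%S %z")
        age = int((now - sent).total_seconds())
        if first_diff is None:
            first_diff = age + 1
        # The first request gets countdown 1; later ones keep their original offset.
        plan.append((first_diff - age, method, url, row["user_agent"]))
    return plan


for entry in plan_replay(SAMPLE_LOG, NOW):
    print(entry)
```

With this sample, the GET request is planned 1 second out and the HEAD request 15 seconds out, matching the 14-second gap between them in the log; the POST entry is skipped.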
Start Celery:
    manage.py celery worker -B --loglevel=info
Then run the management command:
    manage.py my_command /path/to/file.log