我使用多处理模型开发了一个爬虫。
它使用 multiprocessing.Queue 来存储需要爬取的 url-infos ,需要解析的页面内容等等;使用 multiprocessing.Event 来控制子进程;使用 multiprocessing.Manager.dict 来存储爬取的 url 的哈希值;每个多处理。 Manager.dict 实例使用 multiprocessing.Lock 来控制访问。
这三种类型的参数都在所有子进程和父进程之间共享,并且所有参数都组织在一个类中,我使用类的实例将共享参数从父进程传递到子进程。就像:
MGR = SyncManager()
class Global_Params():
Queue_URL = multiprocessing.Queue()
URL_RESULY = MGR.dict()
URL_RESULY_Mutex = multiprocessing.Lock()
STOP_EVENT = multiprocessing.Event()
global_params = Global_Params()
在我自己的超时机制中,我使用 process.terminate 来停止长时间无法自行停止的进程!
在我的测试用例中,有 2500 多个目标站点(有些没有服务,有些很大)。逐个站点抓取目标站点文件中的站点。
刚开始爬虫可以正常工作,但经过很长时间(有时是 8 小时,有时是 2 小时,有时是 15 小时),爬虫爬取了超过 100 个(不确定)站点,我会得到错误信息:"Errno 32 断管"
我尝试了以下方法来定位和解决问题:
定位爬虫中断的站点A,然后使用爬虫单独爬取该站点,爬虫运行良好。即使我从包含站点 A 的所有目标站点文件中获取了一个片段(例如 20 个站点),爬虫也运行良好!
将“-X /tmp/pymp-* 240 /tmp”添加到 /etc/cron.daily/tmpwatch
当 Broken 发生时,文件 /tmp/pymp-* 仍然存在
使用 multiprocessing.managers.SyncManager 替换 multiprocessing.Manager 并忽略除 SIGKILL 和 SIGTERM 之外的大多数信号
对于每个目标站点,我清除了大多数共享参数(队列、字典和事件),如果发生错误,则创建一个新实例:
while global_params.Queue_url.qsize()>0:
try:
global_params.Queue_url.get(block=False)
except Exception,e:
print_info(str(e))
print_info("Clear Queue_url error!")
time.sleep(1)
global_params.Queue_url = Queue()
pass
下面是 Traceback 信息,print_info 函数是自己定义的打印和存储调试信息:
[Errno 32] Broken pipe
Traceback (most recent call last):
File "Spider.py", line 613, in <module>
main(args)
File "Spider.py", line 565, in main
spider.start()
File "Spider.py", line 367, in start
print_info("STATIC_RESULT size:%d" % len(global_params.STATIC_RESULT))
File "<string>", line 2, in __len__
File "/usr/local/python2.7.3/lib/python2.7/multiprocessing/managers.py", line 769, in _callmethod
kind, result = conn.recv()
EOFError
不明白为什么,有人知道原因吗?