What tool or set of tools would you use to horizontally scale Scrapyd, dynamically adding new machines to a Scrapyd cluster, with N instances per machine if needed? It is not essential that all instances share a common job queue, but that would be great.
Scrapy Cluster looks promising, but I want a Scrapyd-based solution, so I'm open to other alternatives and suggestions.
I wrote my own load balancer script using Scrapyd's API and the python-scrapyd-api wrapper:
from random import shuffle

from scrapyd_api.wrapper import ScrapydAPI

import settings  # project-local config: SERVERS_URLS, DEFAULT_PROJECT, ACCEPTABLE_PENDING


class JobLoadBalancer(object):

    @classmethod
    def get_less_occupied(
            cls,
            servers_urls=settings.SERVERS_URLS,
            project=settings.DEFAULT_PROJECT,
            acceptable=settings.ACCEPTABLE_PENDING):
        free_runner = {'num_jobs': 9999, 'client': None}
        # Work on a copy, then shuffle so that ties are broken randomly
        # instead of always favouring the first server in settings
        servers_urls = list(servers_urls)
        shuffle(servers_urls)
        for url in servers_urls:
            scrapyd = ScrapydAPI(target=url)
            jobs = scrapyd.list_jobs(project)
            num_jobs = len(jobs['pending'])
            if free_runner['num_jobs'] > num_jobs:
                free_runner['num_jobs'] = num_jobs
                free_runner['client'] = scrapyd
            # Optimization: stop probing once a server with an acceptable
            # number of pending jobs has been found
            if free_runner['client'] and free_runner['num_jobs'] <= acceptable:
                break
        return free_runner['client']
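Scheduling a crawl then goes through the balancer instead of a fixed server. A minimal usage sketch (the project and spider names here are placeholders):

# Pick the Scrapyd instance with the fewest pending jobs and schedule on it.
# 'my_project' and 'my_spider' are placeholder names for this sketch.
client = JobLoadBalancer.get_less_occupied()
if client is not None:
    # schedule() returns the Scrapyd job id for the new crawl
    job_id = client.schedule(project='my_project', spider='my_spider')
    print('Scheduled job %s on %s' % (job_id, client.target))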
Unit tests:
from unittest import TestCase  # assuming a plain unittest.TestCase base

from scrapyd_api.wrapper import ScrapydAPI

import settings


class TestFactory(TestCase):

    def setUp(self):
        super(TestFactory, self).setUp()
        # Make sure these servers are running
        settings.SERVERS_URLS = [
            'http://localhost:6800',
            'http://localhost:6900'
        ]
        self.project = 'dummy'
        self.spider = 'dummy_spider'
        self.acceptable = 0

    def test_get_less_occupied(self):
        # Add dummy jobs to the first server so that the balancer
        # chooses the second one
        scrapyd = ScrapydAPI(target=settings.SERVERS_URLS[0])
        scrapyd.schedule(project=self.project, spider=self.spider)
        scrapyd.schedule(project=self.project, spider=self.spider)
        second_server_url = settings.SERVERS_URLS[1]
        scrapyd = JobLoadBalancer.get_less_occupied(
            servers_urls=settings.SERVERS_URLS,
            project=self.project,
            acceptable=self.acceptable)
        self.assertEqual(scrapyd.target, second_server_url)
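The setUp above assumes two Scrapyd daemons are already listening on ports 6800 and 6900. A quick pre-test sanity check along these lines can fail fast if one of them is down (a sketch using the same wrapper; list_projects() performs a real HTTP call, so a dead daemon raises a connection error):

from scrapyd_api.wrapper import ScrapydAPI

# Probe each Scrapyd instance before running the test suite
for url in ('http://localhost:6800', 'http://localhost:6900'):
    projects = ScrapydAPI(target=url).list_projects()
    print('%s is up, projects: %s' % (url, projects))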
This code targets an older version of Scrapyd, as it was written more than a year ago.