I'm interested in pulling some URLs out of my database so that they get crawled first on the next re-crawl.
I wrote a custom middleware for this, but because of a bug with JOBDIR I couldn't get what I wanted done with a custom middleware (see the thread here).
So I decided to write my own custom scheduler, since that is where Scrapy fetches all of its requests.
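As far as I can tell, this is the interface the engine drives (a rough sketch paraphrased from my reading of scrapy.core.scheduler.Scheduler; the pass bodies are placeholders, and subclassing the stock Scheduler means I only have to override the parts I care about):

class SchedulerInterface(object):
    # Sketch of the methods the engine calls on a scheduler; not our
    # code, just my understanding of the contract.
    def open(self, spider):              # called when the spider opens
        pass

    def close(self, reason):             # called when the spider closes
        pass

    def enqueue_request(self, request):  # engine pushes new requests in here
        pass

    def next_request(self):              # engine pulls the next request here
        pass

    def has_pending_requests(self):
        return len(self) > 0

    def __len__(self):
        pass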
Here is my custom queue:
from scrapy import log
from scrapy.http import Request


class GhostQ(object):
    """A queue of Requests for ghost objects from our last crawl.

    - Can only be popped.
    - Basically a generator with a length property.
    - Does not include the ghost objects currently in the scheduler.

    A "ghost" object is an object persisted into our db during crawling
    that has not yet been visited. They exist because we need to persist
    relationships, and the only way we can do so is by persisting a
    blank object first.

    Entities we have to check:
    - Look: image_url is null
    - Clothing: name, brand and type are null
    - Model: name is null
    """

    def __init__(self, glooks_cursor, gclothing_cursor, gmodels_cursor,
                 priority=5, yield_per=100):
        self._length = None
        self._generator = None
        self.PRIORITY = priority
        self.YIELD_PER = yield_per
        self.glooks_cursor = glooks_cursor
        self.gclothing_cursor = gclothing_cursor
        self.gmodels_cursor = gmodels_cursor

    def _init_length(self):
        total = (self.glooks_cursor.count() + self.gmodels_cursor.count()
                 + self.gclothing_cursor.count())
        self._length = total
        log.msg("GhostQ has %d objects" % self._length)

    def __len__(self):
        if self._length is None:
            self._init_length()
        return self._length

    def _init_generator(self):
        """Using .all() here would retrieve everything at once; yield_per
        streams the rows in batches instead.

        TODO (somehow this is not breaking??!?):
        - yield_per should be breaking because we are also committing.
        - Perhaps we need our own session here, i.e.
          new_session = scoped_session(sessionmaker(
              autocommit=False, autoflush=True, bind=engine))
        """
        for look in self.glooks_cursor.yield_per(self.YIELD_PER):
            yield Request(look.url, priority=self.PRIORITY)
        for clothing in self.gclothing_cursor.yield_per(self.YIELD_PER):
            yield Request(clothing.look.url, priority=self.PRIORITY)
        for model in self.gmodels_cursor.yield_per(self.YIELD_PER):
            yield Request(model.url, priority=self.PRIORITY)

    def pop(self):
        if self._generator is None:
            self._generator = self._init_generator()
        try:
            request = next(self._generator)
            if self._length is None:
                self._init_length()
            self._length -= 1
            return request
        except StopIteration:
            return None
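A quick way to exercise the queue outside Scrapy (this is a hypothetical smoke test, not part of the project; session, Look, Clothing and Model are our SQLAlchemy session and models, queried the same way as in the scheduler below):

# Hypothetical smoke test: pop a few requests straight off the queue,
# bypassing Scrapy entirely.
q = GhostQ(
    session.query(Look).filter(Look.image_url == None),
    session.query(Clothing).filter(and_(
        Clothing.name == None, Clothing.brand == None,
        Clothing.type == None)),
    session.query(Model).filter(Model.name == None))

print len(q)  # total ghost count across the three cursors
for _ in range(5):
    req = q.pop()
    if req is None:
        break  # queue exhausted
    print req.url, req.priority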
Here is my custom scheduler:
from scrapy import log
from scrapy.core.scheduler import Scheduler
from sqlalchemy import and_

# session, Look, Clothing and Model are our project's SQLAlchemy
# session and models.


class GiordanoScheduler(Scheduler):
    """Custom scheduler with an additional queue for ghost objects.

    Queries the database for "ghost" objects and schedules Scrapy to
    crawl these first.
    """

    def open(self, spider):
        self.ghostq = GhostQ(
            session.query(Look).filter(Look.image_url == None),
            session.query(Clothing).filter(and_(
                Clothing.name == None, Clothing.brand == None,
                Clothing.type == None)),
            session.query(Model).filter(Model.name == None))
        return Scheduler.open(self, spider)

    def __len__(self):
        if self.dqs:
            return len(self.ghostq) + len(self.dqs) + len(self.mqs)
        else:
            return len(self.ghostq) + len(self.mqs)

    def next_request(self):
        request = self.ghostq.pop()
        if request:
            log.msg("Popping from ghostq: " + request.url, level=log.DEBUG)
            self.stats.inc_value('scheduler/dequeued/ghost', spider=self.spider)
        else:
            request = self.mqs.pop()
            if request:
                log.msg("Popping from mqs: " + request.url, level=log.DEBUG)
                self.stats.inc_value('scheduler/dequeued/memory', spider=self.spider)
            else:
                request = self._dqpop()
                if request:
                    # only log once we know _dqpop() actually returned a
                    # request, otherwise request.url raises on None
                    log.msg("Popping from dq: " + request.url, level=log.DEBUG)
                    self.stats.inc_value('scheduler/dequeued/disk', spider=self.spider)
        if request:
            self.stats.inc_value('scheduler/dequeued', spider=self.spider)
        return request
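For completeness, I swap the scheduler in via the SCHEDULER setting; the module path below is just a guess at where it might live in a project like this:

# settings.py
# NOTE: 'giordano.scheduler' is a hypothetical module path; point this
# at wherever GiordanoScheduler is actually defined.
SCHEDULER = 'giordano.scheduler.GiordanoScheduler'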
When I run this scheduler, I don't see anything getting scraped at all...
Here is the log.
However, if I disable this line: request = self.ghostq.pop(), I see things being scraped again.
This is a really weird bug and I can't figure out why. I initially suspected the dupefilter was the culprit, but then I realized the dupefilter only filters requests, not objects.
This is what it looks like if I disable that line: log
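For what it's worth, this is why I think the dupefilter is innocent: it is only consulted when a request is pushed in through enqueue_request, and the ghost requests are handed straight to the engine from next_request, so they never touch it. Roughly, paraphrasing the stock Scheduler.enqueue_request from memory (not our code):

# Paraphrase of the stock Scheduler.enqueue_request: the dupefilter
# only runs on the way *in*, and GhostQ requests never come in this way.
def enqueue_request(self, request):
    if not request.dont_filter and self.df.request_seen(request):
        self.df.log(request, self.spider)
        return  # dropped here; ghost requests bypass this path entirely
    # ...otherwise the request is pushed onto the disk or memory queue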