
I'm working on a scraper that uses Tor; a simplified version of it is in this example project: https://github.com/khpeek/scraper-compose. The project has the following (simplified) structure:

.
├── docker-compose.yml
├── privoxy
│   ├── config
│   └── Dockerfile
├── scraper
│   ├── Dockerfile
│   ├── requirements.txt
│   ├── tutorial
│   │   ├── scrapy.cfg
│   │   └── tutorial
│   │       ├── extensions.py
│   │       ├── __init__.py
│   │       ├── items.py
│   │       ├── middlewares.py
│   │       ├── pipelines.py
│   │       ├── settings.py
│   │       ├── spiders
│   │       │   ├── __init__.py
│   │       │   └── quotes_spider.py
│   │       └── tor_controller.py
│   └── wait-for
│       └── wait-for
└── tor
    ├── Dockerfile
    └── torrc

The spider, defined in quotes_spider.py, is a very simple one based on the Scrapy tutorial:

import scrapy
from tutorial.items import QuoteItem

class QuotesSpider(scrapy.Spider):
    name = "quotes"
    start_urls = ['http://quotes.toscrape.com/page/{n}/'.format(n=n) for n in range(1, 3)]

    custom_settings = {
                       'TOR_RENEW_IDENTITY_ENABLED': True,
                       'TOR_ITEMS_TO_SCRAPE_PER_IDENTITY': 5
                       }

    download_delay = 2    # Wait 2 seconds (actually a random time between 1 and 3 seconds) between downloading pages


    def parse(self, response):
        for quote in response.css('div.quote'):
            item = QuoteItem()
            item['text'] = quote.css('span.text::text').extract_first()
            item['author'] = quote.css('small.author::text').extract_first()
            item['tags'] = quote.css('div.tags a.tag::text').extract()
            yield item

In settings.py, I've activated a Scrapy extension with the lines

EXTENSIONS = {
   'tutorial.extensions.TorRenewIdentity': 1,
}

where extensions.py is

import logging
import random
from scrapy import signals
from scrapy.exceptions import NotConfigured

import tutorial.tor_controller as tor_controller

logger = logging.getLogger(__name__)

class TorRenewIdentity(object):

    def __init__(self, crawler, item_count):
        self.crawler = crawler
        self.item_count = self.randomize(item_count)    # Randomize the item count to confound traffic analysis
        self._item_count = item_count                   # Also remember the given item count for future randomizations
        self.items_scraped = 0

        # Connect the extension object to signals
        self.crawler.signals.connect(self.item_scraped, signal=signals.item_scraped)

    @staticmethod
    def randomize(item_count, min_factor=0.5, max_factor=1.5):
        '''Randomize the number of items scraped before changing identity. (A similar technique is applied to Scrapy's DOWNLOAD_DELAY setting).'''
        randomized_item_count = random.randint(int(min_factor*item_count), int(max_factor*item_count))
        logger.info("The crawler will scrape the following (randomized) number of items before changing identity (again): {}".format(randomized_item_count))
        return randomized_item_count

    @classmethod
    def from_crawler(cls, crawler):
        if not crawler.settings.getbool('TOR_RENEW_IDENTITY_ENABLED'):
            raise NotConfigured

        item_count = crawler.settings.getint('TOR_ITEMS_TO_SCRAPE_PER_IDENTITY', 50)

        return cls(crawler=crawler, item_count=item_count)          # Instantiate the extension object

    def item_scraped(self, item, spider):
        '''When item_count items are scraped, pause the engine and change IP address.'''
        self.items_scraped += 1
        if self.items_scraped == self.item_count:
            logger.info("Scraped {item_count} items. Pausing engine while changing identity...".format(item_count=self.item_count))

            self.crawler.engine.pause()

            tor_controller.change_identity()                        # Change IP address (cf. https://stem.torproject.org/faq.html#how-do-i-request-a-new-identity-from-tor)
            self.items_scraped = 0                                  # Reset the counter
            self.item_count = self.randomize(self._item_count)      # Generate a new random number of items to scrape before changing identity again

            self.crawler.engine.unpause()

and tor_controller.py is

import logging
import sys
import socket
import time
import requests
import stem
import stem.control

# Tor settings
TOR_ADDRESS = socket.gethostbyname("tor")           # The Docker-Compose service in which this code is running should be linked to the "tor" service.
TOR_CONTROL_PORT = 9051         # This is configured in /etc/tor/torrc by the line "ControlPort 9051" (or by launching Tor with "tor -controlport 9051")
TOR_PASSWORD = "foo"            # The Tor password is written in the docker-compose.yml file. (It is passed as a build argument to the 'tor' service).

# Privoxy settings
PRIVOXY_ADDRESS = "privoxy"     # This assumes this code is running in a Docker-Compose service linked to the "privoxy" service
PRIVOXY_PORT = 8118             # This is determined by the "listen-address" in Privoxy's "config" file
HTTP_PROXY = 'http://{address}:{port}'.format(address=PRIVOXY_ADDRESS, port=PRIVOXY_PORT)

logger = logging.getLogger(__name__)


class TorController(object):
    def __init__(self):
        self.controller = stem.control.Controller.from_port(address=TOR_ADDRESS, port=TOR_CONTROL_PORT)
        self.controller.authenticate(password=TOR_PASSWORD)
        self.session = requests.Session()
        self.session.proxies = {'http': HTTP_PROXY}

    def request_ip_change(self):
        self.controller.signal(stem.Signal.NEWNYM)

    def get_ip(self):
        '''Check what the current IP address is (as seen by IPEcho).'''
        return self.session.get('http://ipecho.net/plain').text

    def change_ip(self):
        '''Signal a change of IP address and wait for confirmation from IPEcho.net'''
        current_ip = self.get_ip()
        logger.debug("Initializing change of identity from the current IP address, {current_ip}".format(current_ip=current_ip))
        self.request_ip_change()
        while True:
            new_ip = self.get_ip()
            if new_ip == current_ip:
                logger.debug("The IP address is still the same. Waiting for 1 second before checking again...")
                time.sleep(1)
            else:
                break
        logger.debug("The IP address has been changed from {old_ip} to {new_ip}".format(old_ip=current_ip, new_ip=new_ip))
        return new_ip

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.controller.close()


def change_identity():
    with TorController() as tor_controller:
        tor_controller.change_ip()
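For reference, the control-port settings that tor_controller.py relies on would come from the tor/torrc file. A minimal sketch of such a file (the hashed password below is a placeholder, not the project's actual value; generate one with tor --hash-password foo, and note that the control port must listen on a non-loopback address for another container to reach it):

ControlPort 0.0.0.0:9051
HashedControlPassword 16:<output of "tor --hash-password foo">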

If I start a crawl with docker-compose build followed by docker-compose up, the extension basically works: according to the logs, it successfully changes the IP address and keeps scraping.

What annoys me, however, is that while the engine is paused I see error messages such as

scraper_1  | 2017-05-12 16:35:06 [stem] INFO: Error while receiving a control message (SocketClosed): empty socket content

followed by

scraper_1  | 2017-05-12 16:35:06 [stem] INFO: Error while receiving a control message (SocketClosed): received exception "peek of closed file"

What is causing these errors? Since they have INFO level, can I perhaps just ignore them? (I've looked at some of Stem's source code at https://gitweb.torproject.org/stem.git/, but so far I haven't been able to work out what is going on.)


3 Answers


You can get Stem's Logger instance and turn it off, like this:

from stem.util.log import get_logger

logger = get_logger()
logger.propagate = False
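This works because Stem emits its messages through its own logger, obtained via stem.util.log.get_logger(), rather than through your application's loggers; setting propagate = False keeps its records from bubbling up to the root logger's handlers, so the SocketClosed messages disappear while Scrapy's own logging is unaffected. A natural place for these two lines is near the top of tor_controller.py, before any Controller is created.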
answered 2020-03-24T18:06:05.323

I don't know whether you ever reached a conclusion on your question.

I was actually getting the same log message as you. My Scrapy project was behaving well, and the IP rotation through Tor and Privoxy was working too. I just kept getting the log INFO: [stem] Error while receiving a control message (SocketClosed): empty socket content, which bugged me.

I spent some time digging around to find out what causes it and whether I could ignore it (after all, it's just an info message, not an error message).

The bottom line is that I don't know what causes it, but I felt it was safe enough to ignore.

As the log says, the socket content (in fact Stem's control_file, which holds information about the socket connection) is empty. When the control_file is empty, the socket connection is closed (per the Python socket documentation). I'm not sure what causes the control_file to be empty and the socket connection to close. However, even if the socket connection really does close, it apparently gets opened again successfully, since my Scrapy crawl and IP rotation kept working fine. While I couldn't find the real cause, I can only guess at a few: (1) the Tor network is unstable, or (2) when your code runs controller.signal(Signal.NEWNYM), the socket is closed momentarily and opened again, or some other reason I can't think of right now.
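If hypothesis (2) plays a role, one way to be gentler on the control connection is to ask Tor whether it will actually honor a NEWNYM before sending one; Stem's Controller exposes is_newnym_available() and get_newnym_wait() for exactly this. A minimal sketch, reusing the connection details from the question's tor_controller.py:

import time
import stem
import stem.control

def request_new_identity(address="tor", port=9051, password="foo"):
    '''Send NEWNYM only once Tor is willing to accept it (Tor rate-limits the signal).'''
    with stem.control.Controller.from_port(address=address, port=port) as controller:
        controller.authenticate(password=password)
        if not controller.is_newnym_available():
            time.sleep(controller.get_newnym_wait())    # seconds until NEWNYM will be honored
        controller.signal(stem.Signal.NEWNYM)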

answered 2017-09-25T07:51:02.883

I was in a similar situation and realized that the log messages had no effect on IP rotation or crawling. Still, the log messages are annoying, and they can be silenced with a decorator:

import logging

def silence(func):
    '''Disable the "stem" logger for the duration of the decorated call.'''
    def wrapper(*args, **kwargs):
        logger = logging.getLogger('stem')
        logger.disabled = True
        try:
            ret = func(*args, **kwargs)
        finally:
            # Logger has no "enabled" attribute; clearing "disabled" is what re-enables it
            logger.disabled = False
        return ret
    return wrapper
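Applied to the question's code, it could for instance wrap change_identity() (the placement is just an illustration; any function that generates control-port traffic works):

@silence
def change_identity():
    with TorController() as tor_controller:
        tor_controller.change_ip()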
answered 2018-07-01T01:34:02.870