I am looking for some example code of a SQLite pipeline in Scrapy. I know there is no built-in support for it, but I'm sure it has been done. Only actual code can help me, as I only know enough Python and Scrapy to complete my very limited task, and I need the code as a starting point.
5 Answers
I did something like this:
#
# Author: Jay Vaughan
#
# Pipelines for processing items returned from a scrape.
# Don't forget to add the pipeline to the ITEM_PIPELINES setting.
# See: http://doc.scrapy.org/topics/item-pipeline.html
#
from scrapy import log
from pysqlite2 import dbapi2 as sqlite

# This pipeline takes the Item and stuffs it into scrapedata.db
class scrapeDatasqLitePipeline(object):

    def __init__(self):
        # Possible we should be doing this in spider_open instead, but okay
        self.connection = sqlite.connect('./scrapedata.db')
        self.cursor = self.connection.cursor()
        self.cursor.execute('CREATE TABLE IF NOT EXISTS myscrapedata ' \
                            '(id INTEGER PRIMARY KEY, url VARCHAR(80), desc VARCHAR(80))')

    # Take the item and put it in database - do not allow duplicates
    def process_item(self, item, spider):
        self.cursor.execute("select * from myscrapedata where url=?", (item['url'][0],))
        result = self.cursor.fetchone()
        if result:
            log.msg("Item already in database: %s" % item, level=log.DEBUG)
        else:
            self.cursor.execute(
                "insert into myscrapedata (url, desc) values (?, ?)",
                (item['url'][0], item['desc'][0]))
            self.connection.commit()
            log.msg("Item stored: %s" % item, level=log.DEBUG)
        return item

    def handle_error(self, e):
        log.err(e)
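As the comment at the top says, the pipeline still has to be registered in settings.py. A minimal sketch, assuming the project package is called scrapedata (adjust the dotted path to wherever your pipelines.py actually lives; very old Scrapy versions used a plain list here instead of a dict):

# settings.py (hypothetical project layout)
ITEM_PIPELINES = {
    'scrapedata.pipelines.scrapeDatasqLitePipeline': 300,
}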
Here is a sqlite pipeline built on sqlalchemy. With sqlalchemy you can easily switch to a different database later if needed.

In settings.py, add the database configuration:
# settings.py
# ...
DATABASE = {
    'drivername': 'sqlite',
    # 'host': 'localhost',
    # 'port': '5432',
    # 'username': 'YOUR_USERNAME',
    # 'password': 'YOUR_PASSWORD',
    'database': 'books.sqlite'
}
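Because the pipeline below builds its connection URL from this dict (via sqlalchemy.engine.url.URL), switching to another backend is mostly a matter of editing these settings. A hypothetical PostgreSQL variant, assuming the psycopg2 driver is installed:

# settings.py -- hypothetical PostgreSQL configuration
DATABASE = {
    'drivername': 'postgresql+psycopg2',
    'host': 'localhost',
    'port': '5432',
    'username': 'YOUR_USERNAME',
    'password': 'YOUR_PASSWORD',
    'database': 'books'
}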
Then add the following to pipelines.py:
# pipelines.py
import logging

from scrapy import signals
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy import create_engine
from sqlalchemy.engine.url import URL
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool

logger = logging.getLogger(__name__)

DeclarativeBase = declarative_base()


class Book(DeclarativeBase):
    __tablename__ = "books"

    id = Column(Integer, primary_key=True)
    title = Column('title', String)
    author = Column('author', String)
    publisher = Column('publisher', String)
    url = Column('url', String)
    scrape_date = Column('scrape_date', DateTime)

    def __repr__(self):
        return "<Book({})>".format(self.url)


class SqlitePipeline(object):
    def __init__(self, settings):
        self.database = settings.get('DATABASE')
        self.sessions = {}

    @classmethod
    def from_crawler(cls, crawler):
        pipeline = cls(crawler.settings)
        crawler.signals.connect(pipeline.spider_opened, signals.spider_opened)
        crawler.signals.connect(pipeline.spider_closed, signals.spider_closed)
        return pipeline

    def create_engine(self):
        # NOTE: connect_args={'charset': 'utf8'} only applies to MySQL drivers,
        # so it is omitted here for sqlite.
        engine = create_engine(URL(**self.database), poolclass=NullPool)
        return engine

    def create_tables(self, engine):
        DeclarativeBase.metadata.create_all(engine, checkfirst=True)

    def create_session(self, engine):
        session = sessionmaker(bind=engine)()
        return session

    def spider_opened(self, spider):
        engine = self.create_engine()
        self.create_tables(engine)
        session = self.create_session(engine)
        self.sessions[spider] = session

    def spider_closed(self, spider):
        session = self.sessions.pop(spider)
        session.close()

    def process_item(self, item, spider):
        session = self.sessions[spider]
        book = Book(**item)
        link_exists = session.query(Book).filter_by(url=item['url']).first() is not None

        if link_exists:
            logger.info('Item {} is in db'.format(book))
            return item

        try:
            session.add(book)
            session.commit()
            logger.info('Item {} stored in db'.format(book))
        except:
            logger.info('Failed to add {} to db'.format(book))
            session.rollback()
            raise

        return item
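As with any pipeline, it also has to be enabled in settings.py. A minimal sketch, assuming the project package is called bookproject (adjust the path and order value to your own project):

# settings.py
ITEM_PIPELINES = {
    'bookproject.pipelines.SqlitePipeline': 300,
}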
Your items.py should look like this:
# items.py
import scrapy


class BookItem(scrapy.Item):
    title = scrapy.Field()
    author = scrapy.Field()
    publisher = scrapy.Field()
    url = scrapy.Field()
    scrape_date = scrapy.Field()
You might also consider moving the Book class into items.py.
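For completeness, here is a sketch of how a spider might feed this pipeline. The spider name, start URL, and CSS selectors below are hypothetical placeholders, assuming the bookproject package name from above:

# spiders/books.py -- hypothetical spider feeding the pipeline above
import datetime

import scrapy

from bookproject.items import BookItem  # adjust to your project package


class BooksSpider(scrapy.Spider):
    name = 'books'
    start_urls = ['http://books.toscrape.com/']  # example site

    def parse(self, response):
        for book in response.css('article.product_pod'):
            yield BookItem(
                title=book.css('h3 a::attr(title)').extract_first(),
                author='unknown',      # placeholder; the example site has no author field
                publisher='unknown',   # placeholder
                url=response.urljoin(book.css('h3 a::attr(href)').extract_first()),
                scrape_date=datetime.datetime.now(),
            )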
If you are comfortable with twisted's adbapi, you can take this mysql pipeline as a starting point: http://github.com/darkrho/scrapy-googledir-mysql/blob/master/googledir/pipelines.py

And use this line in __init__:
self.dbpool = adbapi.ConnectionPool("sqlite3", database="/path/sqlite.db")
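Adapted to sqlite, such a pipeline could look roughly like the sketch below. The table and item field names are made up; check_same_thread=False is passed because adbapi runs queries in a thread pool, and cp_max=1 keeps sqlite to a single connection:

# pipelines.py -- sketch of an adbapi-based sqlite pipeline (hypothetical names)
from twisted.enterprise import adbapi


class SQLiteStorePipeline(object):
    def __init__(self):
        self.dbpool = adbapi.ConnectionPool(
            "sqlite3",
            database="/path/sqlite.db",
            check_same_thread=False,
            cp_max=1,
        )

    def process_item(self, item, spider):
        # runInteraction runs the callable in a worker thread with a cursor argument
        query = self.dbpool.runInteraction(self._insert_item, item)
        query.addErrback(self._handle_error)
        return item

    def _insert_item(self, cursor, item):
        cursor.execute(
            "insert into myscrapedata (url, desc) values (?, ?)",
            (item['url'], item['desc']),
        )

    def _handle_error(self, failure):
        print(failure)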
For anyone trying to solve a similar problem, I just came across a nice SQLite Item Exporter: https://github.com/RockyZ/Scrapy-sqlite-item-exporter.

After including it in your project settings, you can use it with:

scrapy crawl <spider name> -o sqlite.db -t sqlite

It could also be adapted to be used as an item pipeline instead of an item exporter.
The following should be compatible with:
# Python 3.6.4
# Scrapy 1.5.1
# SQLite 3.21.0+
# APSW 3.9.2.post1
If you want to use APSW, just replace sqlite3 as noted in the comments.
If your items.py looks like this:
...
class NewAdsItem(Item):
    AdId  = Field()
    DateR = Field()
    DateA = Field()
    DateN = Field()
    DateE = Field()
    URL   = Field()  # AdURL
In settings.py:
ITEM_PIPELINES = {
    'adbot.pipelines.DbPipeline': 100,
}
SQLITE_FILE  = 'mad.db'
SQLITE_TABLE = 'ads'
In pipelines.py:
import os
import sqlite3  # part of the Python standard library
#import apsw    # pip install apsw

from scrapy import signals
from scrapy.conf import settings
from adbot.items import NewAdsItem  # Get items from "items.py"

con = None
ikeys = None

class DbPipeline(object):
    dbfile  = settings.get('SQLITE_FILE')   # './test.db'
    dbtable = settings.get('SQLITE_TABLE')

    def __init__(self):
        self.setupDBCon()
        self.createTables()

    def setupDBCon(self):
        #self.con = apsw.Connection(self.dbfile)    # apsw
        self.con = sqlite3.connect(self.dbfile)     # sqlite3
        self.cur = self.con.cursor()

    def createTables(self):
        self.dropDbTable()
        self.createDbTable()

    def dropDbTable(self):
        print("Dropping old table: %s" % self.dbtable)
        self.cur.execute("DROP TABLE IF EXISTS %s" % self.dbtable)

    def closeDB(self):
        self.con.close()

    def __del__(self):
        self.closeDB()

    def createDbTable(self):
        print("Creating new table: %s" % self.dbtable)
        #------------------------------
        # Construct the item strings:
        #------------------------------
        # NOTE: This does not work, because the items.py class re-orders the items!
        #self.ikeys = NewAdsItem.fields.keys()
        #print("Keys in createDbTable: \t%s" % ",".join(self.ikeys))
        #cols = " TEXT, ".join(self.ikeys) + " TEXT"
        #print("cols: \t%s:" % cols, flush=True)
        #------------------------------
        cols = "AdId TEXT, DateR TEXT, DateA TEXT, DateN TEXT, DateE TEXT, URL TEXT"

        # NOTE: Use "INSERT OR IGNORE", if you also use: "AdId TEXT NOT NULL UNIQUE"
        sql = "CREATE TABLE IF NOT EXISTS %s (id INTEGER PRIMARY KEY NOT NULL, %s)" % (self.dbtable, cols)
        #print(sql)
        self.cur.execute(sql)

    def process_item(self, item, spider):
        self.storeInDb(item)
        return item

    def storeInDb(self, item):
        # NOTE: Use "INSERT OR IGNORE", if you also use: "AdId TEXT NOT NULL UNIQUE"
        # "INSERT INTO ads ( AdId, DateR, AdURL ) VALUES (?, ?, ?)"
        sql = "INSERT INTO {0} ({1}) VALUES ({2})".format(self.dbtable, ','.join(item.keys()), ','.join(['?'] * len(item.keys())))
        # (item.get('AdId',''), item.get('DateR',''), item.get('AdURL',''), ...)
        itkeys = ','.join(item.keys())    # item keys as a comma-separated string
        itvals = ','.join(item.values())  # item values as a comma-separated string
        ivals  = tuple(item.values())     # item values as a tuple

        #print(sql)
        #print(" itkeys: \t%s" % itkeys, flush=True)
        #print(" itvals: \t%s" % itvals, flush=True)
        self.cur.execute(sql, ivals)  # WARNING: Does NOT handle '[]'s ==> use: '' in spider
        self.con.commit()             # used in sqlite3 ONLY! (Remove for APSW)
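The two "INSERT OR IGNORE" notes above can be combined into a de-duplicating variant. A small sketch of what those two methods would look like with a UNIQUE AdId (still using the hypothetical ads table; if you go this route you would also skip the dropDbTable() call, so the table survives between runs):

    def createDbTable(self):
        print("Creating new table: %s" % self.dbtable)
        # AdId is UNIQUE, so re-scraped ads will not create duplicate rows
        cols = ("AdId TEXT NOT NULL UNIQUE, DateR TEXT, DateA TEXT, "
                "DateN TEXT, DateE TEXT, URL TEXT")
        sql = "CREATE TABLE IF NOT EXISTS %s (id INTEGER PRIMARY KEY NOT NULL, %s)" % (self.dbtable, cols)
        self.cur.execute(sql)

    def storeInDb(self, item):
        # Rows whose AdId already exists are silently skipped
        sql = "INSERT OR IGNORE INTO {0} ({1}) VALUES ({2})".format(
            self.dbtable, ','.join(item.keys()), ','.join(['?'] * len(item.keys())))
        self.cur.execute(sql, tuple(item.values()))
        self.con.commit()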
You can then check your database from the command line with:
echo "select * from ads;" | sqlite3 -csv -header mad.db
WARNING:
Because of how the item keys are ordered when obtained via item.keys() versus directly via importing your item class (self.ikeys = NewAdsItem.fields.keys()), you will find that the first case is sorted according to the order of appearance (in items.py), while the second case comes out in alphabetical order. This is very sad, since it creates trouble when you try to create the database tables dynamically, before process_item() is executed.
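A quick way to see the difference the warning is talking about (the item values below are made up, and the exact ordering of fields.keys() may depend on your Python and Scrapy versions, so check it yourself):

# Hypothetical ordering check, run e.g. from a scrapy shell inside the project
from adbot.items import NewAdsItem

item = NewAdsItem()
item['AdId'] = '123'
item['URL'] = 'http://example.com/ad/123'
item['DateR'] = '2018-09-01'

print(list(item.keys()))                # only the populated fields, in assignment order
print(list(NewAdsItem.fields.keys()))   # all declared fields; ordering may differ (see warning)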