我使用 Scrapy(基于 Twisted 构建),并以 Postgres 作为数据库。
当数据库连接似乎被占满之后,我的脚本就卡住了。我用这条查询确认了这一点:SELECT * FROM pg_stat_activity;
并且读到这是因为 Postgres 本身没有内置连接池而引起的。
我了解到了 txpostgres 和 PgBouncer,遗憾的是 PgBouncer 对我来说不是一个可行的选项,我还能做些什么来避免这个问题?
到目前为止,我使用以下管道:
import psycopg2
from twisted.enterprise import adbapi
import logging
from datetime import datetime
import scrapy
from scrapy.exceptions import DropItem
class PostgreSQLPipeline(object):
    """Scrapy item pipeline that stores exposes and their images in PostgreSQL.

    Every item is written through a Twisted ``adbapi.ConnectionPool`` via
    ``runInteraction``, so one item corresponds to one database transaction.
    The pool is closed again when the spider closes so that its connections
    are handed back to the server instead of lingering until process exit.
    """

    def __init__(self, dbpool):
        """Keep the connection pool and set up a module-named logger.

        Args:
            dbpool: Object exposing ``runInteraction`` and ``close`` (in
                practice a ``twisted.enterprise.adbapi.ConnectionPool``).
        """
        self.logger = logging.getLogger(__name__)
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        """Build the pipeline from the ``POSTGRESQL_*`` settings.

        Args:
            settings: Mapping providing ``POSTGRESQL_HOST``,
                ``POSTGRESQL_DATABASE``, ``POSTGRESQL_USER`` and
                ``POSTGRESQL_PASSWORD``.

        Returns:
            A pipeline instance backed by a new psycopg2 connection pool.
        """
        dbargs = dict(
            host=settings['POSTGRESQL_HOST'],
            database=settings['POSTGRESQL_DATABASE'],
            user=settings['POSTGRESQL_USER'],
            password=settings['POSTGRESQL_PASSWORD'],
        )
        dbpool = adbapi.ConnectionPool('psycopg2', **dbargs)
        return cls(dbpool)

    def close_spider(self, spider):
        """Shut the connection pool down once the spider has finished.

        Without this hook the pooled connections are never released
        explicitly and stay visible in ``pg_stat_activity``.
        """
        self.dbpool.close()

    def process_item(self, item, spider):
        """Schedule the database write for *item*.

        Returns:
            A Deferred that fires with *item* whether or not the write
            succeeded; failures are logged by ``_handle_error`` and do not
            drop the item.
        """
        d = self.dbpool.runInteraction(self._insert_item, item, spider)
        d.addErrback(self._handle_error, item, spider)
        # Deliberately best-effort: always pass the item on to later stages.
        d.addBoth(lambda _: item)
        return d

    def _insert_item(self, txn, item, spider):
        """Insert a new expose (plus its images) or refresh an existing one.

        Args:
            txn: Cursor-like transaction object supplied by
                ``runInteraction`` (needs ``execute`` and ``fetchone``).
            item: Mapping with ``expose_id`` and ``title``; ``images`` is an
                optional list of mappings that each carry a ``path``.
            spider: The spider that produced the item (unused).
        """
        from datetime import timezone

        # Naive UTC timestamp at second precision, e.g. "2024-01-31 12:00:00".
        # Same text the deprecated datetime.utcnow() used to produce.
        now = (
            datetime.now(timezone.utc)
            .replace(microsecond=0, tzinfo=None)
            .isoformat(' ')
        )
        expose_id = item['expose_id']

        txn.execute(
            """
            SELECT EXISTS(
                SELECT 1
                FROM expose
                WHERE expose_id = %s
            )
            """,
            (expose_id,),
        )
        already_stored = txn.fetchone()[0]

        if already_stored:
            self.logger.info("Item already in db: %r", item)
            txn.execute(
                """
                UPDATE expose
                SET last_seen=%s, offline=0
                WHERE expose_id=%s
                """,
                (now, expose_id),
            )
            return

        self.logger.info("Item stored in db: %r", item)
        txn.execute(
            """
            INSERT INTO expose (
                expose_id,
                title
            ) VALUES (%s, %s)
            """,
            (expose_id, item['title']),
        )

        # Write image info to db; rows reference expose.expose_id.
        # NOTE(review): images are written only for newly inserted exposes;
        # the original indentation was lost, confirm this is the intent.
        # A missing or empty 'images' entry is treated as "no images" rather
        # than rolling back the expose insert with a KeyError.
        for image in item.get('images') or ():
            txn.execute(
                """
                INSERT INTO image (
                    expose_id,
                    name
                ) VALUES (%s, %s)
                """,
                (expose_id, image['path'].replace('full/', '')),
            )

    def _handle_error(self, failure, item, spider):
        """Log a failed database interaction without re-raising it.

        Args:
            failure: Failure object describing the error; its
                ``getTraceback()`` text is written to the log.
            item: The item whose write failed.
            spider: The spider that produced the item (unused).
        """
        # getTraceback() returns the traceback as a string. printTraceback()
        # returns None and writes to stderr, which broke the log call.
        self.logger.error(
            "Database error while storing item %r:\n%s",
            item,
            failure.getTraceback(),
        )