5

我使用 Scrapy(基于 Twisted)进行抓取,并使用 Postgres 作为数据库。

在数据库连接似乎被占满之后,我的脚本就卡住了。我用查询 `SELECT * FROM pg_stat_activity;` 确认了这一点,并了解到这是因为 Postgres 本身不提供连接池。

我了解到了 txpostgres 和 PgBouncer,遗憾的是 PgBouncer 对我来说不可行。我还能做些什么来避免这个问题?

到目前为止,我使用的是以下管道(pipeline):

import psycopg2
from twisted.enterprise import adbapi
import logging
from datetime import datetime
import scrapy
from scrapy.exceptions import DropItem


class PostgreSQLPipeline(object):
    """Scrapy item pipeline that writes items to PostgreSQL via Twisted adbapi.

    Each item is processed inside one ``runInteraction`` call on a shared
    connection pool, so the SELECT/UPDATE/INSERT statements for one item run
    in a single transaction. The pool is shut down in ``close_spider`` so its
    connections are released when the spider finishes.
    """

    def __init__(self, dbpool):
        """Store the connection pool and create the logger.

        Args:
            dbpool: object providing ``runInteraction`` and ``close``;
                ``from_settings`` passes an ``adbapi.ConnectionPool``.
        """
        self.logger = logging.getLogger(__name__)
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        """Build the pipeline from the project settings.

        Reads ``POSTGRESQL_HOST``, ``POSTGRESQL_DATABASE``,
        ``POSTGRESQL_USER`` and ``POSTGRESQL_PASSWORD`` and passes them as
        connection keyword arguments to a psycopg2-backed connection pool.

        Args:
            settings: mapping-like settings object indexed by setting name.

        Returns:
            A new pipeline instance bound to the created pool.
        """
        dbargs = dict(
            host=settings['POSTGRESQL_HOST'],
            database=settings['POSTGRESQL_DATABASE'],
            user=settings['POSTGRESQL_USER'],
            password=settings['POSTGRESQL_PASSWORD'],
        )
        dbpool = adbapi.ConnectionPool('psycopg2', **dbargs)
        return cls(dbpool)

    def close_spider(self, spider):
        """Shut down the connection pool when the spider is closed.

        Without this the pipeline never releases the pool itself, so its
        connections stay open for as long as the process keeps running.
        """
        self.dbpool.close()

    def process_item(self, item, spider):
        """Schedule the database write for ``item``.

        Returns:
            The Deferred from ``runInteraction``; it fires with ``item``
            whether the database interaction succeeded or failed.
        """
        d = self.dbpool.runInteraction(self._insert_item, item, spider)
        d.addErrback(self._handle_error, item, spider)
        # The errback only logs, so always hand the item on to the next stage.
        d.addBoth(lambda _: item)

        return d

    def _insert_item(self, txn, item, spider):
        """Perform an insert or update for ``item`` using cursor ``txn``.

        If a row with the item's ``expose_id`` already exists in ``expose``,
        its ``last_seen`` is set to the current UTC time and ``offline`` is
        reset to 0. Otherwise a new ``expose`` row is inserted, followed by
        one ``image`` row per entry in ``item['images']``.

        Args:
            txn: cursor-like object providing ``execute`` and ``fetchone``.
            item: mapping with ``expose_id``, ``title`` and ``images`` (each
                image being a mapping with a ``path`` key).
            spider: unused here; passed through by ``process_item``.
        """
        txn.execute(
            """
              SELECT EXISTS(
                SELECT 1
                FROM expose
                WHERE expose_id = %s
              )
            """,
            (item['expose_id'],),
        )
        already_stored = txn.fetchone()[0]

        if already_stored:
            self.logger.info("Item already in db: %r", item)

            # Second-resolution UTC timestamp, e.g. "2020-01-31 12:00:00".
            now = datetime.utcnow().replace(microsecond=0).isoformat(' ')
            txn.execute(
                """
                    UPDATE expose
                    SET last_seen=%s, offline=0
                    WHERE expose_id=%s
                """,
                (now, item['expose_id']),
            )
            return

        self.logger.info("Item stored in db: %r", item)
        txn.execute(
            """
              INSERT INTO expose (
                expose_id,
                title
              ) VALUES (%s, %s)
            """,
            (item['expose_id'], item['title']),
        )

        # Write image info to the db; each row refers to its expose row by
        # expose_id (the original note says this column is constrained to
        # expose.expose_id -- the constraint itself is not visible here).
        for image in item['images']:
            txn.execute(
                """
                  INSERT INTO image (
                    expose_id,
                    name
                  ) VALUES (%s, %s)
                """,
                (
                    item['expose_id'],
                    # Store the file name without the "full/" prefix.
                    image['path'].replace('full/', ''),
                ),
            )

    def _handle_error(self, failure, item, spider):
        """Log a failed database interaction; the failure is not re-raised.

        Args:
            failure: failure object providing ``getTraceback()``.
            item: the item whose database interaction failed.
            spider: unused here; passed through by ``process_item``.
        """
        # getTraceback() returns the traceback as a string, so it can be used
        # as a log argument (printTraceback() writes to stderr, returns None).
        self.logger.error(
            "Database interaction failed for item %r:\n%s",
            item,
            failure.getTraceback(),
        )
4

0 回答 0