2

我正在使用自己的管道将报废的项目存储到 PostgreSQL 数据库中,几天前我进行了扩展,现在将数据存储到 3 个数据库中。所以,我想创建插入数据的管道,每 100 个项目调用一次,或者将项目插入 100 x 100。

我想让它在数据库服务器上快速且不那么令人头疼的原因。

4

5 回答 5

7

解决方案与 Anandhakumar 的答案没有什么不同我在设置文件中创建了一个全局列表,并为其设置了 setter 和 getter 方法

# This buffer for the bluk insertion
global products_buffer

products_buffer = []

# Append product to the list
def add_to_products_buffer(product):
  global products_buffer
  products_buffer.append(product)

# Get the length of the product
def get_products_buffer_len():
  global products_buffer
  return len(products_buffer)

# Get the products list
def get_products_buffer():
  global products_buffer
  return products_buffer

# Empty the list
def empty_products_buffer():
  global products_buffer
  products_buffer[:] = []

然后我将它导入管道

from project.settings import products_buffer,add_to_products_buffer,get_products_buffer_len,empty_products_buffer,get_products_buffer

每次调用管道时,我都会将项目附加到列表中,并检查列表的长度是否为 100 我在列表上循环以准备许多插入请求,但最重要的魔法是将它们全部提交在一行中,不要在循环中提交,否则您将一无所获,并且将它们全部插入需要很长时间。

def process_item(self, item, spider):  
    # Adding the item to the list
    add_to_products_buffer(item)
    # Check if the length is 100
    if get_products_buffer_len() == 100:
        # Get The list to loop on it
        products_list  = get_products_buffer()
        for item in products_list:
            # The insert query
            self.cursor.execute('insert query')
        try:
            # Commit to DB the insertions quires 
            self.conn.commit()
            # Emty the list
            empty_products_buffer()
        except Exception, e:
            # Except the error

executemany如果您不想循环,也可以使用。

于 2015-03-30T07:53:59.080 回答
0

我不知道scrapy,如果它内置了任何类型的队列功能,但也许你可以将你的查询从scrapy推送到一个标准的python队列上,然后让一个消费者监控队列,只要有一个上面有 100 个项目,全部执行,这确实可以由 psycopg2 完成(参见psycopg2: insert multiple rows with one query)。

你可以做类似的事情

queryQueue = Queue()
def queryConsumer(){
    while True:
    if queryQueue.qsize()==100:
        queries=[queryQueue.get() for i in range(100)]            
        #execute the 100 queries
}
t = Thread(target=queryConsumer)
t.daemon = True
t.start()

从您的scrapy方法中,您可以调用

queryQueue.put(myquery)

将项目推入队列。

于 2015-03-30T04:43:07.770 回答
0

我的建议是,

在你的蜘蛛本身中,你可以做到这一点,根本不需要在管道中编写。

使用 psycopg2 在蜘蛛中创建一个 postgres 数据库连接。

psycopg2.connect(database="testdb", user="postgres", password="cohondob", host="127.0.0.1", port="5432")
connection.cursor()

在 parse 函数中创建一个元组并将报废的项目附加到该列表中。

如果达到 100 则插入到 db 中,然后将其提交到 db 中。

例如:

            x = ({"name":"q", "lname":"55"},
            {"name":"e", "lname":"hh"},
            {"name":"ee", "lname":"hh"})
cur.executemany("""INSERT INTO bar(name,lname) VALUES (%(name)s, %(lname)s)""", x)
于 2015-03-30T07:15:49.393 回答
0

这不是一个 scrapy 选项,而是一个用于批量插入行的 psycopg2 选项。这个问题有你可以使用的选项: Psycopg2, Postgresql, Python: Fastest way to bulk-insert

于 2015-03-16T13:35:56.323 回答
0

我只是写了一个小scrapy扩展来将抓取的项目保存到数据库中。scrapy-sqlitem

它非常易于使用。

pip install scrapy_sqlitem

使用 SqlAlchemy 表定义 Scrapy 项

from scrapy_sqlitem import SqlItem

class MyItem(SqlItem):
    sqlmodel = Table('mytable', metadata
        Column('id', Integer, primary_key=True),
        Column('name', String, nullable=False))

编写你的蜘蛛并从 SqlSpider 继承

from scrapy_sqlitem import SqlSpider

class MySpider(SqlSpider):
   name = 'myspider'

   start_urls = ('http://dmoz.org',)

   def parse(self, response):
        selector = Selector(response)
        item = MyItem()
        item['name'] = selector.xpath('//title[1]/text()').extract_first()
        yield item

将 DATABASE_URI 和 chunksize 设置添加到 settings.py。

DATABASE_URI = "postgresql:///mydb"

DEFAULT_CHUNKSIZE = 100

CHUNKSIZE_BY_TABLE = {'mytable': 100, 'othertable': 250}

创建表格,您就完成了!

http://doc.scrapy.org/en/1.0/topics/item-pipeline.html#activating-an-item-pipeline-component

http://docs.sqlalchemy.org/en/rel_1_1/core/tutorial.html#define-and-create-tables

于 2015-08-16T20:02:50.393 回答