0

我正在使用 Scrapy 库来抓取一个销售汽车的网站。

我正在使用 Python 和 IBM Cloud 函数以及 Scrapy 来实现这一点。这个想法是每天使用 IBM Cloud 操作抓取站点,并将每辆车添加到vehiclesPostgres 数据库中的表中。那部分工作正常。

表结构vehicles如下:

在此处输入图像描述

第一步是将除数据列(即汽车的详细信息以及需要在第二步中添加的那些)之外的所有内容添加到vehicles表中。这很好用。

第二步是每天检查vehicles表中添加的每一辆车是否仍然存在于网站上(可以删除或出售)。在这一步中,我将每个循环车辆添加到daily_run_vehicle表中。的结构daily_run_vehicle如下:

在此处输入图像描述

如果车辆存在,我会抓取详细信息并更新vehiclesdata列并将表中的列设置handled为 TRUE daily_run_vehicle。如果它被出售或删除,那么我会增加表retries中的列daily_run_vehicle

第二步应该每天运行。

首先,我遍历表vehicles中的handleddaily_run_vehicle不为 TRUE 或 ifhandled为 False 但数量retries为 5 或更多的所有车辆。对于每次迭代,我都会在daily_run_vehicle表中添加一条新记录。

动作是prepare-get-vehicles,代码如下:

import json
import requests
from common.db import add_logs, get_vehicle_references
from common.db import capture_error
from common.common import APIHOST, NAMESPACE, USER_PASS


def execute_reference(reference, reference_url):
    action = "prepare-get-vehicle"
    url = APIHOST + "/api/v1/namespaces/" + NAMESPACE + "/actions/" + action
    response = requests.post(url,
                             data=json.dumps({"reference": reference, 'reference_url': reference_url}),
                             params={"blocking": "false"},
                             auth=(USER_PASS[0], USER_PASS[1]),
                             headers={"Content-Type": "application/json"})
    print(response.json())


def main(params):
    try:
        for reference in get_vehicle_references():
            execute_reference(reference[0], reference[1])

        return {"Success": "prepare-get-vehicles action executed successfully."}
    except Exception as e:
        capture_error(str(e))
        return {"Failure": "prepare-get-vehicles action NOT executed successfully."}

get_vehicle_references功能如下:

def get_vehicle_references():
    conn = db_connection()
    cur = conn.cursor()
    try:
        s = "SELECT reference, reference_url FROM vehicles v WHERE (NOT EXISTS (select reference from daily_run_vehicle WHERE (handled = %s or (handled = %s and retries >= %s)) AND reference = v.reference))"
        cur.execute(s, (True, False, 5))
        return cur.fetchall()
    except Exception as e:
        capture_error(str(e))
    conn.close()

prepare-get-vehicle除了向表中添加新记录外,操作什么也不daily_run_vehicle做,如下所示:

def main(params):
    try:
        insert_daily_run_vehicle(params.get("reference", None), params.get("reference_url", None))
        return {"Success.": "The DB filler (daily_run_vehicle) is successfully executed."}
    except Exception as e:
        capture_error(str(e))
        return {"Failure": "The DB filler (daily_run_vehicle) action NOT executed successfully."}

但问题是该vehicles表有超过 30 万条记录,并且每天都变得越来越大。比实际中的 for 循环在prepare-get-vehiclesIBM Cloud 上执行要花费很多时间。有 600 秒超时,但 for 循环需要更多时间。

有什么建议可以解决我的问题以及如何循环遍历具有超过 300k 记录的表并为每条记录添加新行daily_run_table

提前致谢。

4

2 回答 2

0

如果车辆的 id 没有改变,你可以这样做:

INSERT INTO vehicle (id, reference, ...etc...) VALUES (1, 'ref', ...etc...) ON CONFLICT DO NOTHING;

插入而不在现有行上循环。您甚至可以更新冲突http://www.postgresqltutorial.com/postgresql-upsert/

于 2019-01-05T12:46:48.323 回答
0

对于处理大型表数据库,您可以分批读取表行,对于每个批处理以编程方式运行一个新操作,每个操作可以并行运行,每个操作最多需要 600 秒。

例如,如果您有一个 300k 的表,请按特定顺序循环遍历表行,例如每批 100k。然后,您可以以编程方式并行调用 3 个操作,每个操作处理每批 100k。

于 2019-01-05T18:21:10.337 回答