0
import requests, pymysql, pymysqlpool
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial
from tenacity import retry, TryAgain, stop_after_attempt

pool = PooledDB(creator = pymysql,
                maxconnections = 0,
                autocommit = True,
                host = 'localhost',
                user = 'someuser',
                passwd = 'somepwd',
                database = 'database')

db = pool.connection()
cur = db.cursor()
dict = {key1: url1,
key2: url2}

@retry(stop=stop_after_attempt(10))
def get_id_list_from_a_website(session, dict_item):
    key, url = dict_item
    sess =  requests.session()

    id_list_return = []
    ...
    scrape website to get id list here
    ...
    if not id_list_return:
        raise TryAgain()
    return id_list_return

def insert_sql(id_after_check, list_of_variable_after_check)
    id = id_after_check
    list_of_variable = list_of_variable_after_check
    if id == 'exist':
        print('key_exist')
    else:
        try:
            sql1 = insert list_of_variable to id
            cur.execute(sql1)
            cur.close()
            db.commit()
            print('successfully insert')
        except:
            print('except')

def get_some_data(session, executor):
    dict_item = dict.items()
    id_list_return = list(executor.map(partial(get_list, session), (dict_item)))
    raw_id_list_return = []

    for i in range(0, len(dict.keys())):
    raw_id_list_return = raw_id_list_return + id_list_return[i]

    futures = {executor.submit(partial(check_stuff, session), stuff_id): stuff_id for stuff_id in raw_list_return}
    for future in as_completed(futures):
        futures[future]
        stuff_id_data = future.result()
        stuff_id = stuff_data_from_check[0]
        stuff_variables = stuff_data_from_check[1:]
        if stuff_variables == 'id_existed'
            print('stuff_id_exist')
        else:
            print('go_insert')
            go_insert = executor.submit(partial(insert_sql, stuff_id), stuff_variables)
            db.close()

@retry(stop=stop_after_attempt(10))
def check_stuff(session, stuff_id):
    stuff_id = stuff_id_from_raw_list
    sql2 = """SELECT * FROM `tbl1` WHERE stuff_id = '"""+str(stuff_id)+"""';"""
    cur.execute(sql2)
    row_count = cur.fetchall()

    
    if row_count == int(0):
       
        sql3= """UPDATE `tbl1` SET latest_search_date='%s' WHERE stuff_id='%s';""" % (search_date, stuff_id)
        cur.execute(sql3)
        cur.close()
        stuff_id_and_variables = [stuff_id, 'stuff_id existed']

    else:
        scape data here and return 'stuff_id_and_variable' 
        if not stuff_id_and_variable:
            raise TryAgain()

    return stuff_id_and_variables

N_THREADS=50
with requests.Session() as session:
    with ThreadPoolExecutor(max_workers=N_THREADS) as executor:
        get_some_data(session, executor)

问题是:当我检查数据库中的 id(执行 sql2)时,很少有 sql2 可以正常运行并获得“stuff_id 存在”。但是,经过一些查询,即使数据库中的 stuff_id 已经存在,sql2 也无法正常运行,然后“check_stuff()”再次抓取数据。所以,你可以想象函数 'insert_sql()' 将被引发并导致异常......虽然我可以将 'Insert into' 更改为 'Replace into' 以完成数据插入过程,但我想毫无例外地获取代码并且在重复抓取过程上花费更少的时间。谢谢。

4

0 回答 0