import requests, pymysql, pymysqlpool
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial
from tenacity import retry, TryAgain, stop_after_attempt
pool = PooledDB(creator = pymysql,
maxconnections = 0,
autocommit = True,
host = 'localhost',
user = 'someuser',
passwd = 'somepwd',
database = 'database')
db = pool.connection()
cur = db.cursor()
dict = {key1: url1,
key2: url2}
@retry(stop=stop_after_attempt(10))
def get_id_list_from_a_website(session, dict_item):
key, url = dict_item
sess = requests.session()
id_list_return = []
...
scrape website to get id list here
...
if not id_list_return:
raise TryAgain()
return id_list_return
def insert_sql(id_after_check, list_of_variable_after_check)
id = id_after_check
list_of_variable = list_of_variable_after_check
if id == 'exist':
print('key_exist')
else:
try:
sql1 = insert list_of_variable to id
cur.execute(sql1)
cur.close()
db.commit()
print('successfully insert')
except:
print('except')
def get_some_data(session, executor):
dict_item = dict.items()
id_list_return = list(executor.map(partial(get_list, session), (dict_item)))
raw_id_list_return = []
for i in range(0, len(dict.keys())):
raw_id_list_return = raw_id_list_return + id_list_return[i]
futures = {executor.submit(partial(check_stuff, session), stuff_id): stuff_id for stuff_id in raw_list_return}
for future in as_completed(futures):
futures[future]
stuff_id_data = future.result()
stuff_id = stuff_data_from_check[0]
stuff_variables = stuff_data_from_check[1:]
if stuff_variables == 'id_existed'
print('stuff_id_exist')
else:
print('go_insert')
go_insert = executor.submit(partial(insert_sql, stuff_id), stuff_variables)
db.close()
@retry(stop=stop_after_attempt(10))
def check_stuff(session, stuff_id):
stuff_id = stuff_id_from_raw_list
sql2 = """SELECT * FROM `tbl1` WHERE stuff_id = '"""+str(stuff_id)+"""';"""
cur.execute(sql2)
row_count = cur.fetchall()
if row_count == int(0):
sql3= """UPDATE `tbl1` SET latest_search_date='%s' WHERE stuff_id='%s';""" % (search_date, stuff_id)
cur.execute(sql3)
cur.close()
stuff_id_and_variables = [stuff_id, 'stuff_id existed']
else:
scape data here and return 'stuff_id_and_variable'
if not stuff_id_and_variable:
raise TryAgain()
return stuff_id_and_variables
N_THREADS=50
with requests.Session() as session:
with ThreadPoolExecutor(max_workers=N_THREADS) as executor:
get_some_data(session, executor)
问题是:当我检查数据库中的 id(执行 sql2)时,很少有 sql2 可以正常运行并获得“stuff_id 存在”。但是,经过一些查询,即使数据库中的 stuff_id 已经存在,sql2 也无法正常运行,然后“check_stuff()”再次抓取数据。所以,你可以想象函数 'insert_sql()' 将被引发并导致异常......虽然我可以将 'Insert into' 更改为 'Replace into' 以完成数据插入过程,但我想毫无例外地获取代码并且在重复抓取过程上花费更少的时间。谢谢。