我有一个 Python 脚本,它验证从数据库某些行中取出的数据,并将发现的错误记录到同一数据库的另一张表中。脚本逐行验证,把每一行标记为已验证,并根据验证结果设置 has_error = True/False。对每一行重复此过程。在此基础上,我想通过创建线程来给脚本提速:让每一行的验证由独立的线程完成,从而缩短验证一批行所需的时间。
令我惊讶的是,多线程脚本反而比非线程脚本耗时稍长。平均而言,验证 1502 行数据,非线程脚本需要 1.5 秒,而多线程脚本需要 2.27 秒。这个差距看似不大,但实际使用时我一次要处理约 200 万条记录,届时额外的时间开销会非常可观。况且我原本以为多线程程序应该完成得更快!:-)
在开始创建线程之前,两个脚本的耗时相同,都约为 0.01 秒。到这一步,SQLAlchemy 会话已经创建,所有待验证的数据及其关联(即外键等)也都已取出。但从这一步往后,非线程脚本完成得更快。下面是我的代码。
1.0 非线程脚本
# A lot of code above this point fetches the data that is passed on to the validator function.
# Both scripts take the same time up to this point, so that code is not shown.
# NOTE(review): indentation was lost in this listing and the enclosing per-file
# loop is not shown; presumably only the statements up to `no_of_records += 1`
# belong to the `for` body below -- confirm against the original script.
# Build one parameter tuple per detail row and collect them in data_list.
for lf_detail_id in load_file_detail_id:
params = lf_detail_id, load_file_id, entry_number[lf_detail_counter], \
data[lf_detail_counter], template_version[lf_counter], \
load_file_detail, error, dt_file, dt_columns
data_list.append(params)
lf_detail_counter += 1
no_of_records += 1
# Validate the whole batch with a single call on one Validate instance.
validator = Validate()
validator.validator(no_of_records, data_list)
record_counter += lf_detail_counter
# NOTE(review): data_list becomes None here although the loop above calls
# data_list.append(); presumably it is re-created as a list in the omitted
# per-file setup -- confirm.
data_list = None
no_of_records = 0
print("Validated '%s': seconds %s" %(filename[lf_counter], time.time()-file_start_time)) # print the time it took to validate this file
# Mark the load file as validated
is_done = load_file.set_validation(load_file_id, True)
if is_done == False:
raise Exception ("Can't update load_file's is_validated parameter: ", lf_detail_id)
# Reset the per-file counters
lf_detail_counter = 0
lf_counter += 1
# Commit the entire transaction.
session.commit()
print("NoThread:Finished validating %s file(s) with %s record(s) in %s seconds\n" %(lf_counter, record_counter, time.time()- process_start_time))
1.1 非线程脚本的验证函数
class Validate():
    """Validate a batch of load-file detail rows sequentially (no threads).

    For every row an ``ErrorLogDetail`` is added to the module-level
    ``session`` when a check fails, and the row is flagged through
    ``self.set_validation`` (the shared method listed in section 3.0, which
    must be available on the instance).
    """

    # Error flag of the row currently being validated.
    has_error = None

    def validator(self, loop_length, job):
        """Validate the first ``loop_length`` rows of ``job``.

        Args:
            loop_length: number of rows to process.
            job: indexable collection of 9-tuples
                ``(load_file_detail_id, load_file_id, entry_number, data,
                template_version, load_file_detail, error, dt_file,
                dt_columns)``.

        Raises:
            Exception: propagated from ``set_validation`` when a row cannot
                be flagged.
        """
        for row_counter in range(loop_length):
            (load_file_detail_id, load_file_id, entry_number, data,
             template_version, load_file_detail, error, dt_file,
             dt_columns) = job[row_counter]

            # Start every row with a clean flag. Previously a failed row left
            # has_error == True behind, so the next valid row was not marked
            # as validated (its pass only reset the flag).
            self.has_error = False

            if data.strip() == "":
                description = "error message 1"
            elif len(data) != int(dt_file.data_length):
                description = "error message 2"
            else:
                description = None

            if description is not None:
                self._log_error(error, load_file_detail_id, entry_number,
                                description)
                self.has_error = True
                # Row has an error and therefore cannot be loaded.
                self.set_validation(load_file_detail, load_file_detail_id,
                                    True, False)
                continue

            # Continue with extra validation (elided in the listing); those
            # checks are expected to set self.has_error when they fail.

            # If the record passed all validation, mark it as error-free.
            if not self.has_error:
                self.set_validation(load_file_detail, load_file_detail_id,
                                    False, True)

    def _log_error(self, error, load_file_detail_id, entry_number,
                   description):
        """Add one row-level ErrorLogDetail to the session."""
        error_detail = ErrorLogDetail()
        error_detail.errorlog = error
        error_detail.load_file_detail_id = load_file_detail_id
        error_detail.pos_row = entry_number
        error_detail.pos_col = None
        error_detail.column_name = None
        error_detail.value_provided = None
        error_detail.description = description
        session.add(error_detail)
2.0 线程脚本
# A lot of code above this point fetches the data that is passed on to the validator function.
# Both scripts take the same time up to this point, so that code is not shown.
# NOTE(review): indentation was lost in this listing and the enclosing per-file
# loop is not shown; presumably the statements up to `queue_size = 0` belong
# to the `for` body below -- confirm against the original script.
# Build one parameter tuple per detail row; hand rows to the worker threads
# every time THREAD_LIMIT rows have accumulated.
for lf_detail_id in load_file_detail_id:
params = lf_detail_id, load_file_id, entry_number[lf_detail_counter], \
data[lf_detail_counter], template_version[lf_counter], \
load_file_detail, error, dt_file, dt_columns
data_list.append(params)
lf_detail_counter += 1
queue_size += 1
# NOTE(review): only queue_size is reset after a batch, data_list keeps
# growing, while myqueuing() reads data[0] .. data[queue_size - 1]. It looks
# like every batch after the first re-validates the first rows -- confirm.
if queue_size == THREAD_LIMIT:
myqueuing(queue_size, data_list)
queue_size = 0
# Process the remaining rows that did not fill a complete batch
if queue_size > 0:
myqueuing(queue_size, data_list)
# Keep record of rows processed
record_counter += lf_detail_counter
print("Validated '%s': seconds- %s " %(filename[lf_counter], time.time()-file_start_time)) # print the time it took to validate this file
# Mark the load file as validated
is_done = load_file.set_validation(load_file_id, True)
if is_done == False:
raise Exception ("Can't update load_file's is_validated parameter: ", lf_detail_id)
# Commit the entire transaction.
session.commit()
# Reset the per-file counters
lf_detail_counter = 0
lf_counter += 1
# NOTE(review): data_list becomes None here although the loop above calls
# data_list.append(); presumably it is re-created in the omitted setup -- confirm.
data_list = None
queue_size = 0
print("HasThread:Finished loading %s file(s) with %s record(s) in %s seconds\n" %(lf_counter, record_counter, time.time()-process_start_time)) # print the total time taken
2.1 线程脚本的验证函数
THREAD_LIMIT = 50 # Batch size: the driver calls myqueuing() once this many rows are collected, and myqueuing() starts one thread per row
jobs = queue.Queue() # Shared job queue; created without a maxsize, so it is unbounded
singlelock = threading.Lock() # Held around print() calls so output from different threads does not interleave
def myqueuing(queuesize, data):
    """Queue the first ``queuesize`` rows of ``data`` and validate them.

    One ``workerbee`` thread is started per row; each thread takes a single
    job from the module-level ``jobs`` queue. The call returns once every
    queued job has been marked done.

    Args:
        queuesize: number of rows to enqueue (and threads to start).
        data: indexable collection of job tuples; ``data[0]`` up to
            ``data[queuesize - 1]`` are queued.
    """
    # Called once before any thread exists; the original author does this to
    # avoid a bug in time.strptime() when it is first used from threads.
    is_valid_date("20131212", True)

    # Spawn the threads; each one blocks on jobs.get() until a row arrives.
    for _ in range(queuesize):
        workerbee().start()

    # Put the rows in the queue, waiting at most 2 seconds for a free slot.
    for i in range(queuesize):
        try:
            jobs.put(data[i], block=True, timeout=2)
        except queue.Full:
            # Only a full queue is reported here. The previous bare `except`
            # also swallowed unrelated errors, and its handler crashed on
            # `singlelock.lock.release()` while still holding the lock.
            with singlelock:
                print ("The queue is full !")

    # Hold the lock while printing so output is not interleaved.
    with singlelock:
        print ("Waiting for threads to finish.")
    # Block until task_done() has been called for every queued job.
    jobs.join()
class workerbee(threading.Thread):
    """Worker thread that validates exactly one row taken from ``jobs``.

    NOTE(review): every worker uses the same module-level ``session``.
    SQLAlchemy documents ``Session`` as not safe for concurrent use by
    several threads -- confirm, and serialize access or use one session per
    thread.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.lock = threading.Lock()
        self.has_error = False

    def run(self):
        """Take one job from the queue, validate it and mark it as done."""
        # Wait up to 1 second for a job. If none arrives queue.Empty
        # propagates; nothing was taken, so task_done() must not be called.
        job = jobs.get(True, 1)
        try:
            self._validate(job)
        finally:
            # Always mark the job as done: otherwise a row that raises during
            # validation would leave jobs.join() waiting forever.
            jobs.task_done()

    def _validate(self, job):
        """Run the row checks for one job tuple and record the outcome."""
        (load_file_detail_id, load_file_id, entry_number, data,
         template_version, load_file_detail, error, dt_file,
         dt_columns) = job

        if data.strip() == "":
            description = "erro message1"
        elif len(data) != int(dt_file.data_length):
            description = "erro message2"
        else:
            description = None

        if description is not None:
            error_detail = ErrorLogDetail()
            error_detail.errorlog = error
            error_detail.load_file_detail_id = load_file_detail_id
            error_detail.pos_row = entry_number
            error_detail.pos_col = None
            error_detail.column_name = None
            error_detail.value_provided = None
            error_detail.description = description
            session.add(error_detail)
            self.has_error = True
            # A row with an error cannot be loaded. This matches the
            # non-threaded validator, which passes can_be_loaded=False.
            self.set_validation(load_file_detail, load_file_detail_id,
                                True, False)
            return

        # Continue with further validation -- about 5 other checks, elided in
        # the listing; they are expected to set self.has_error on failure.

        # If the record passed all validation, mark it as error-free.
        if not self.has_error:
            self.set_validation(load_file_detail, load_file_detail_id,
                                False, True)
        else:
            self.has_error = False
3.0 线程脚本和非线程脚本共用的验证状态设置函数
def set_validation(self, load_file_detail, load_file_detail_id, has_error, can_be_loaded):
    """Mark a row as validated and record its error / loadable flags.

    Args:
        load_file_detail: object providing ``set_validation_and_error``.
        load_file_detail_id: id of the row to update.
        has_error: whether validation found an error in the row.
        can_be_loaded: whether the row may be loaded afterwards.

    Raises:
        Exception: if ``set_validation_and_error`` returns ``False``.
    """
    is_done = load_file_detail.set_validation_and_error(
        load_file_detail_id, True, has_error, can_be_loaded)
    # Only an explicit False signals failure; a helper that returns no status
    # (None) is still treated as success.
    if is_done is False:
        # Format the id into the message. Passing it as a second argument
        # made the exception text render as a tuple.
        raise Exception(
            "Can't update load_file_detail's is_validated parameter: %s"
            % (load_file_detail_id,))
3.1 实际通过 SQLAlchemy 会话保存验证状态的函数
def set_validation_and_error(self, load_file_detail_id, is_validated, has_error, can_be_loaded):
    """Persist the validation flags of one ``load_file_detail`` row.

    Args:
        load_file_detail_id: primary key of the row to update.
        is_validated: new value of the ``is_validated`` column.
        has_error: new value of the ``has_error`` column.
        can_be_loaded: new value of the ``can_be_loaded`` column.

    Returns:
        ``False`` when the database reports that no row was updated,
        ``True`` otherwise. Callers compare the result against ``False``.
    """
    # Bound parameters instead of %-interpolation: the driver handles quoting
    # and boolean conversion, and the statement text stays constant.
    # The statement is still passed as a plain string, as in the original.
    result = session.execute(
        'UPDATE load_file_detail '
        'SET is_validated=:is_validated, has_error=:has_error, '
        'can_be_loaded=:can_be_loaded WHERE id=:id',
        {
            'is_validated': is_validated,
            'has_error': has_error,
            'can_be_loaded': can_be_loaded,
            'id': load_file_detail_id,
        },
    )
    # Some drivers report -1 when the count is unknown; only an explicit 0
    # is treated as "nothing updated".
    return result.rowcount != 0
综上,两个脚本获取待验证数据的方式相同,到这一步为止的耗时也相同;验证过程相同,写入数据库的过程也相同,即第 3.0 节和第 3.1 节的代码由两个脚本共用。唯一的区别是是否使用多个线程进行验证。所以我在想:是不是多线程与 SQLAlchemy 结合使用导致程序在多线程模式下变慢?我实现线程功能的方式是否正确?是上述原因之一,还是这种场景根本不适合使用线程?欢迎提出建议。