
I have a Python script that validates data fetched from certain rows in a database and then logs any errors to a different table in the same database. The script validates each row and marks it as validated, with error = True/False depending on the validation result, and repeats this for every row. With that working, I thought I would add some steroids by creating threads, so that the validation of each row is done by an independent thread and the time needed to validate a batch of rows is reduced.

To my surprise, I found that the threaded script takes slightly longer than the non-threaded one. On average, validating 1502 rows of data takes the non-threaded script 1.5 seconds and the threaded script 2.27 seconds. That may not be much, but ideally I will be running 2 million records at a time, so the time overhead adds up. Plus, I would have assumed the threaded application would finish faster! :-)

Both scripts clock the same time, roughly 0.01 seconds, up until the point where threads are created. By then the SQLAlchemy session has been created and all the data and relationships (i.e. foreign keys etc.) to be validated have been fetched. From there on, though, the non-threaded script finishes faster. Below is my code.

1.0 Non-threaded script

#A lot of code goes above this to fetch the data that is passed on to the validator function.
#However, the two scripts are identical up to this point as far as time taken is concerned, so I didn't see the need to post it.
for lf_detail_id in load_file_detail_id:
    params = lf_detail_id, load_file_id, entry_number[lf_detail_counter], \
           data[lf_detail_counter], template_version[lf_counter], \
           load_file_detail, error, dt_file, dt_columns 
    data_list.append(params)
    lf_detail_counter += 1
    no_of_records += 1

validator = Validate()
validator.validator(no_of_records, data_list)
record_counter += lf_detail_counter
data_list = None
no_of_records = 0
print("Validated '%s': seconds %s" %(filename[lf_counter], time.time()-file_start_time))     #print time it took to run'

#Mark the load file as validated
is_done = load_file.set_validation(load_file_id, True)
if is_done == False:
    raise Exception ("Can't update load_file's is_validated parameter: ", lf_detail_id)

#Reset counters
lf_detail_counter = 0
lf_counter += 1

#Commit The Entire Transaction.
session.commit()
print("NoThread:Finished validating %s file(s) with %s record(s) in %s seconds\n" %(lf_counter, record_counter, time.time()- process_start_time))

1.1 Validation function for the non-threaded script

class Validate():
    has_error = None
    def validator(self, loop_length, job):                
        '''Validate data'''
        for row_counter in range(loop_length):
            load_file_detail_id, load_file_id, entry_number, data, \
            template_version, load_file_detail, error, dt_file, dt_columns = job[row_counter]
            error_detail = ErrorLogDetail()
            if data.strip() == "":
                error_detail.errorlog = error
                error_detail.load_file_detail_id = load_file_detail_id
                error_detail.pos_row = entry_number
                error_detail.pos_col = None
                error_detail.value_provided = None
                error_detail.column_name = None
                error_detail.description = "error message 1"
                session.add(error_detail)
                error_detail = ErrorLogDetail()
                self.has_error = True
                self.set_validation(load_file_detail, load_file_detail_id, True, False)
                continue
            elif len(data) != int(dt_file.data_length):
                error_detail.errorlog = error
                error_detail.load_file_detail_id = load_file_detail_id
                error_detail.pos_row = entry_number   
                error_detail.pos_col = None
                error_detail.column_name = None
                error_detail.value_provided = None
                error_detail.description = "error message 2"
                session.add(error_detail)
                error_detail = ErrorLogDetail()
                self.has_error = True
                self.set_validation(load_file_detail, load_file_detail_id, True, False)  
                continue
            else:
                pass    #Continue with extra validation

            #If the record passes all validation then mark it as has_error = False
            if self.has_error == False:
                self.set_validation(load_file_detail, load_file_detail_id, False, True)
            else:
                self.has_error = False
            #(In the threaded script, jobs.task_done() is called here; it is not needed in this non-threaded version.)

2.0 Threaded script

#A lot of code goes above this to fetch the data that is passed on to the validator function.
#However, the two scripts are identical up to this point as far as time taken is concerned, so I didn't see the need to post it.
for lf_detail_id in load_file_detail_id:
    params = lf_detail_id, load_file_id, entry_number[lf_detail_counter], \
           data[lf_detail_counter], template_version[lf_counter], \
           load_file_detail, error, dt_file, dt_columns 
    data_list.append(params)
    lf_detail_counter += 1
    queue_size += 1
    if queue_size == THREAD_LIMIT:
        myqueuing(queue_size, data_list)
        queue_size = 0

#spawn a pool of threads, and pass them queue instance 
if queue_size > 0:
    myqueuing(queue_size, data_list)

#Keep record of rows processed
record_counter += lf_detail_counter 
print("Validated '%s': seconds- %s " %(filename[lf_counter], time.time()-file_start_time))     #print time it took to run'

#Mark the load file as validated
is_done = load_file.set_validation(load_file_id, True)
if is_done == False:
    raise Exception ("Can't update load_file's is_validated parameter: ", lf_detail_id)

#Commit The Entire Transaction.
session.commit()
#Reset counters
lf_detail_counter = 0
lf_counter += 1
data_list = None
queue_size = 0              
print("HasThread:Finished loading %s file(s) with %s record(s) in %s seconds\n" %(lf_counter, record_counter, time.time()-process_start_time))     #print time it took to run'

2.1 Threaded validation function

THREAD_LIMIT = 50                # This is how many threads we want
jobs = queue.Queue()            # This sets up the (unbounded) queue object
singlelock = threading.Lock()   # This is a lock so threads don't print through each other (and other reasons)

def myqueuing(queuesize, data):
    '''Put the fetched data in a queue and instantiate threads to
    process the queue'''
    # Spawn the threads
    is_valid_date("20131212", True) #Calling this here to avoid a bug in time.strptime() when threading
    for x in range(queuesize):
        # This is the thread class that we instantiate.
        workerbee().start()

    # Put stuff in queue
    for i in range(queuesize):
        # Block if the queue is full and wait up to 2 seconds; after that queue.Full is raised.
        try:
            jobs.put(data[i], block=True, timeout=2)
        except queue.Full:
            singlelock.acquire()
            print ("The queue is full !")
            singlelock.release()

    # Wait for the threads to finish
    singlelock.acquire()        # Acquire the lock so we can print
    print ("Waiting for threads to finish.")
    singlelock.release()       # Release the lock
    jobs.join()                 # This command waits for all threads to finish.             


class workerbee(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)
        self.lock = threading.Lock()
        self.has_error = False

    def run(self):
        #try:
        job = jobs.get(True,1)
        load_file_detail_id, load_file_id, entry_number, data, \
        template_version, load_file_detail, error, dt_file, dt_columns = job                
        '''Validates the data.'''
        error_detail = ErrorLogDetail()
        #Again please note that this part is identical for both the non-threaded and the threaded script. 
        #After each pass on a record, the record is marked as validated and if has_error = True
        if data.strip() == "":
            error_detail.errorlog = error
            error_detail.load_file_detail_id = load_file_detail_id
            error_detail.pos_row = entry_number
            error_detail.pos_col = None
            error_detail.value_provided = None
            error_detail.column_name = None
            error_detail.description = "error message 1"
            session.add(error_detail)
            error_detail = ErrorLogDetail()
            self.has_error = True
            self.set_validation(load_file_detail, load_file_detail_id, True, True)     
        elif len(data) != int(dt_file.data_length):
            error_detail.errorlog = error
            error_detail.load_file_detail_id = load_file_detail_id
            error_detail.pos_row = entry_number   
            error_detail.pos_col = None
            error_detail.column_name = None
            error_detail.value_provided = None
            error_detail.description = "erro message2")
            session.add(error_detail)
            error_detail = ErrorLogDetail()
            self.has_error = True
            self.set_validation(load_file_detail, load_file_detail_id, True, True)    
        else:
            pass    #Continue with further validation - about 5 other validation checks

        #If the record passes all validation then mark it as has_error = False
        if self.has_error == False:
            self.set_validation(load_file_detail, load_file_detail_id, False, True)
        else:
            self.has_error = False
        jobs.task_done()    #Mark this job as done (this call exists only in the threaded script)

3.0 Common function used by both the threaded and non-threaded scripts to set validation

def set_validation(self, load_file_detail, load_file_detail_id, has_error, can_be_loaded):
    '''Mark the record as having been validated and whether has error = True or False'''
    #print("haserror and canbeloaded ", has_error, can_be_loaded)
    is_done = load_file_detail.set_validation_and_error(load_file_detail_id, True, has_error, can_be_loaded)
    if is_done == False:
        raise Exception ("Can't update load_file_detail's is_validated parameter: ", load_file_detail_id)                   

3.1 The actual SQLAlchemy call used to save the validation status

def set_validation_and_error(self, load_file_detail_id, is_validated, has_error, can_be_loaded):
    result = session.execute('UPDATE load_file_detail SET is_validated=%s, has_error=%s, can_be_loaded=%s WHERE id=%s' \
                    %(is_validated, has_error, can_be_loaded, load_file_detail_id))
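
For reference, the same UPDATE written with bound parameters instead of %-interpolation (just a sketch, assuming the same table and column names) would be:

from sqlalchemy import text

def set_validation_and_error(self, load_file_detail_id, is_validated, has_error, can_be_loaded):
    #Same statement as above, but letting SQLAlchemy bind the values
    result = session.execute(
        text("UPDATE load_file_detail SET is_validated=:is_validated, has_error=:has_error, "
             "can_be_loaded=:can_be_loaded WHERE id=:id"),
        {"is_validated": is_validated, "has_error": has_error,
         "can_be_loaded": can_be_loaded, "id": load_file_detail_id})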

So the fetching of the data to be validated is the same, and both scripts take the same amount of time up to that point. The validation process is identical in both scripts, and so is the saving to the database, i.e. sections 3.0 and 3.1 are shared by both scripts. The only difference is validating with multiple threads. So I'm wondering: could it be that multithreading plus SQLAlchemy makes the application slower in threaded mode? Have I implemented the threading functionality the right way? It's one of those, or threading is simply not suited to this scenario. Suggestions are welcome.


1 Answer


You have to create a queue for the records and add a "logger" thread; with the locks removed, your code should then be faster.

Also create a database connection in each thread, so that data can be fetched in parallel (see the sketch just below).
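
A minimal sketch of those two points (the names error_queue, SENTINEL and the stand-in print are illustrative, not from your code): worker threads only validate and push error records onto a queue, while a single "logger" thread with its own session drains that queue and writes to the database, so the workers never share a session.

import queue
import threading

error_queue = queue.Queue()     # workers push error records here
SENTINEL = None                 # tells the logger thread to stop

def logger_thread():
    '''Single writer: drains the queue and persists the errors.'''
    # session = make_session()  # hypothetical: one dedicated session/connection for this thread
    while True:
        record = error_queue.get()
        if record is SENTINEL:
            error_queue.task_done()
            break
        # session.add(ErrorLogDetail(**record))  # in the real script; here we just print
        print("logging error:", record)
        error_queue.task_done()
    # session.commit()

def worker(row):
    '''Validation only: no database writes from worker threads.'''
    row_id, data = row
    if not data.strip():
        error_queue.put({"load_file_detail_id": row_id, "description": "error message 1"})

if __name__ == "__main__":
    log = threading.Thread(target=logger_thread)
    log.start()

    rows = [(1, "ok"), (2, ""), (3, "also ok")]
    workers = [threading.Thread(target=worker, args=(r,)) for r in rows]
    for t in workers:
        t.start()
    for t in workers:
        t.join()

    error_queue.put(SENTINEL)   # shut the logger down after all workers are done
    log.join()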

Because of the GIL, threads only parallelize calls into C libraries.

To parallelize Python code, you have to use multiprocessing.

I wrote a test for you that shows how to process an iterable:

def produce_data(amount=100000, invalid=1, valid=10):
    # produce_data = sql('request').getall()
    import random
    id = 0
    data = [True]*valid + [False]*invalid
    while id < amount:
        id += 1
        yield (id, random.choice(data))


def validate(row):
    if row[1]:
        time.sleep(0.001) #emulate a valid sql request
        return True
    else:
        time.sleep(0.001) #emulate an invalid sql request
        return False



def single():
    for row in produce_data():
        validate(row)

def targeted():
    import threading
    for row in produce_data():
        threading.Thread(target=validate, args=(row,)).start()

Uley = 50

class Bee(object):
    error = False
    running = True

    def __init__(self, queue, *args, **kwargs):
        self.queue = queue  # don't use any global variable!
        # every bee must have its own db connection and session;
        # initialize it here, e.g.:
        # self.session = db.connection().session()
        return super(Bee, self).__init__(*args, **kwargs)

    def run(self):
        while self.running:
            data = self.queue.get()
            if data:
                self.error = validate(data)  # refactor to self.validate(data) to be able to get a cursor from self.session
                self.queue.task_done()
            else:
                self.queue.task_done()
                break

        #self.session.commit()


def treaded():
    import threading,Queue

    class TreadedBee(Bee,threading.Thread): pass

    q = Queue.Queue()

    for i in range(Uley): #bees started before data was provided.
        bee=TreadedBee(q)
        bee.daemon = True
        bee.start()

    for row in produce_data(): #you don't need to fetch all the data before processing starts; a cursor over the result would go here.
        q.put(row)

    q.join()
    for i in range(Uley):
        q.put(None)


def forked():
    from multiprocessing import Process,JoinableQueue
    class ForkedBee(Bee,Process): pass

    q = JoinableQueue()
    for i in range(Uley):
        bee=ForkedBee(q)
        bee.start()

    for row in produce_data():
        q.put(row)

    q.join()
    #at this point you need to kill the zomBees ;-)
    for i in range(Uley):
        q.put(None)
    q.close()

def pool():
    from multiprocessing import Pool
    pool = Pool(processes=Uley)
    pool.map(validate,produce_data())

if __name__ == "__main__":
    import time
    s=time.time()
    single() 
    print(time.time()-s) #109
    s=time.time()
    single() 
    print(time.time()-s) #6
    s=time.time()
    treaded()
    print(time.time()-s) #12
    s=time.time()
    forked()
    print(time.time()-s) #6
    s=time.time()
    pool() 
    print(time.time()-s) #4

Test results:

$ python2 tgreads.py 
109.779700994
5.84457302094
12.3814198971
5.97618508339
3.69856286049

targeted will flood the CPU and memory, and you cannot give each thread its own DB connection; using a shared connection is not safe. If you want to go that way, you need to provide an output queue and implement a collector that talks to the DB. pool is the shortest code and the fastest, but it is not friendly to starting one connection per worker.
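
If you do want one connection per worker while keeping pool, a sketch using Pool's initializer argument could look like this (connect_worker and the commented-out session line are illustrative; Uley and produce_data are reused from the test above):

from multiprocessing import Pool

_session = None   # one per worker process

def connect_worker():
    '''Runs once in every worker process, so each worker gets its own session.'''
    global _session
    #_session = db.connection().session()   # hypothetical, as in the Bee class above
    _session = object()                     # stand-in so the sketch runs without a database

def validate_row(row):
    # use the per-process _session here instead of a shared one
    return validate(row)

def pooled():
    pool = Pool(processes=Uley, initializer=connect_worker)
    try:
        return pool.map(validate_row, produce_data())
    finally:
        pool.close()
        pool.join()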

Answered 2013-07-31T06:56:27.727