
I have a script that needs to build a large dataframe by querying a database. I created a class that splits the query across separate processes, partitioning on the primary key in the WHERE clause and pushing batches of records onto a queue. The helper functions that do this work are below.
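
(For completeness, these helpers rely on roughly the following imports; they are inferred from the code rather than copied from my module, so treat them as a sketch.)

import time
import traceback
from functools import partial
from multiprocessing import Manager, Pool, Process, Queue as MpQueue
from queue import Empty

import pandas as pd
import sqlalchemy as sa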

def query_generator(conn, query, batch_size, reduce_info=None):
    """Creates a generator for a query which yields fetched batches
       from a query

       inputs:
       conn --  database connection
       query -- query to run
       batch_size -- size of batches to pull

       yields:
       list of dictionaries consisting of the records"""
    print(query)
    conn.execute('DECLARE c SCROLL CURSOR FOR ' + query)
    print('Cursor Created')
    vals = True
    need_columns = reduce_info is not None
    while vals:
        res = conn.execute('FETCH {} FROM c'.format(batch_size))
        vals = res.fetchall()
        if need_columns:
            reduce_info['columns'] = res.keys()
            need_columns = False
        yield vals
    conn.execute('CLOSE c')
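
For reference, a generator like this is consumed roughly as follows; the connection, table and batch size here are hypothetical:

reduce_info = {}
# conn: an open connection to PostgreSQL (DECLARE/FETCH are Postgres cursor commands)
for batch in query_generator(conn, 'SELECT * FROM my_table', 2000, reduce_info):
    if not batch:                                   # the final FETCH returns an empty list
        break
    print(len(batch), list(reduce_info['columns']))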


def query_to_queue(query_tuple, batch_size, fetch_queue, reduce_info=None):
    """Yields batchees from the query generator and puts them in
       the multiprocessing queue until the full queries ahve run

       inputs:
       query_tuple -- tuple consisting of a connection object and a query
       batch_size -- the size of the batches which should be fetched
       fetch_queue -- multiprocessing queue to put the batches into"""
    conn, query = query_tuple
    qg = query_generator(conn, query, batch_size, reduce_info)
    while True:
        try:
            fetch_queue.put(next(qg))
        except StopIteration:
            break

def reduce_from_queue(fetch_queue, reduce_func, combine_func, reducer, status_dict, reduce_info={}, reduce_list=None, timeout=1):
    """Pulls batches out of the multiprocessing queue to create one list of
       dictionaries with all of the records, then creates a dataframe of all
       of the records.

       inputs:
       fetch_queue -- multiprocessing queue where workers are putting fetched records
       reduce_func -- function for processing each batch yielded from the DB
       combine_func -- function for combining each batch with the current data
       reducer -- process number of the reducer, used for recording its status
       status_dict -- dictionary recording the status of each reducer
       reduce_info -- kwargs needed for reduce_func
       reduce_list -- list to append the results of combine to (if None, return the result)
       timeout -- timeout for returning Empty from fetch_queue

       outputs:
       comb_data -- only if reduce_list is None"""
    
    loop = True
    has_returned = False
    first_val = True
    while loop:
        try:
            val = fetch_queue.get(timeout=timeout)        
            if (type(val) == str) and (val == 'STOP'):
                loop = False
                break
            while 'columns' not in reduce_info:
                print('waiting')
                time.sleep(2)
            new_data = reduce_func(val, **reduce_info)
            has_returned = True
            if (first_val and new_data is not None):
                comb_data = new_data
                first_val = False
            elif (new_data is not None):
                comb_data = combine_func(new_data, comb_data)

        except Empty:
            if has_returned:
                loop = False
        except Exception as e:
            print(e)
            loop = False
            raise(e)
    if reduce_list is not None:
        reduce_list.append(comb_data)
        status_dict[reducer] = True
    else:
        status_dict[reducer] = True
        return comb_data

# Note: query_parallel comes from a class, but it only uses class attributes to get a connection pool. Trying to limit this to the relevant code.
    def query_parallel(self, query, filter_list, reduce_func=None, combine_func=None, final_combine_func=None, n_reducers=1, batch_size=2000, reduce_timeout=1000):
        """Run a query in parallel from multiple connections each querying
           a different partition or part of the primary key.

           inputs:
           query -- query to run, with one formatting field at the beginning of the
                    filters clause for inserting the filters
           filter_list -- list of filters which call different partitions or divide up
                        the primary key
           reduce_func -- function applied to each fetched batch
           combine_func -- function combining a new batch with the accumulated data
           final_combine_func -- function combining the per-reducer results into one dataframe
           n_reducers -- number of reducer processes to start
           batch_size -- size of each batch returned from the database
           reduce_timeout -- seconds to wait for the reducers to finish

           outputs:
           df -- dataframe with the results of the query"""
        print('Starting Query Parallel')
        conn_list = self.dial_pool(len(filter_list))
        try:
            query_list = [query.format(f) for f in filter_list]
            fetch_queue = MpQueue()

            fetch_process_list = list()
            manager = Manager()
            reduce_list = manager.list()
            status_dict = manager.dict()
            reduce_info = manager.dict()
            process_dict = dict()
            for i, conn in enumerate(conn_list):
                query_tuple = (conn, query_list[i])
                arg_dict = {'query_tuple': query_tuple,
                            'batch_size': batch_size,
                            'fetch_queue': fetch_queue}
                if i == 0:
                    arg_dict['reduce_info'] = reduce_info
                p = Process(target=query_to_queue,
                            kwargs=arg_dict)
                p.start()
                fetch_process_list.append(p)
          
                    
            print('NReducers: ',n_reducers)
            print(n_reducers)
            for i in range(n_reducers):
                print('Reducer Started')
                status_dict[i] = False
                p = Process(target=reduce_from_queue,
                            kwargs={'fetch_queue': fetch_queue,
                                    'reduce_func': reduce_func,
                                    'status_dict': status_dict,
                                    'combine_func': combine_func,
                                    'reduce_list': reduce_list,
                                    'reduce_info': reduce_info,
                                    'reducer': i})
                
                p.start()
                process_dict[i] = p
            for t in fetch_process_list:
                t.join()
                print('Fetch Process Joined')
            print('All Fetch Joined')
            for i in range(n_reducers):
                fetch_queue.put('STOP')
            print('Stop Message Sent')
            for pid, p in process_dict.items():
                for i in range(reduce_timeout // 2):
                    if not p.is_alive():
                        break
                    if status_dict[pid]:
                        p.terminate()
                        break
                    time.sleep(2)
                    
                p.join(timeout=10)
            print('starting final reduce')
            def null_func(res):
                return res
            res = combine_from_list(fetch_list=reduce_list,
                                    combine_func=final_combine_func,
                                    reduce_queue=None)
        except Exception as e:
            print(e)
            traceback.print_exc()
        finally:
            for c in conn_list:
                c.close()
        return res


# Input functions used for creating a dataframe, could be changed to do inline analysis or other reduction
def records_to_df_list(records, columns):
    return([pd.DataFrame(records, columns=columns)])

def combine_df_list(comb_data, new_data):
    return(comb_data + new_data)

def concat_df_lists(comb_list):
    t = time.time()
    df_list = list()
    for sublist in comb_list:
        df_list += sublist
    print(time.time() - t)
    return pd.concat(df_list)  
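
These get passed into query_parallel roughly like this (the table and partition filters below are made up for illustration):

# called from inside the class that owns dial_pool / query_parallel
df = self.query_parallel(
    query='SELECT * FROM my_table WHERE {}',
    filter_list=['id % 4 = 0', 'id % 4 = 1',
                 'id % 4 = 2', 'id % 4 = 3'],
    reduce_func=records_to_df_list,
    combine_func=combine_df_list,
    final_combine_func=concat_df_lists,
    n_reducers=2)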

My current implementation combines all of the batch lists from my fetch_queue and converts them into a pandas dataframe only after all of the data has been queried. However, this often takes as long as, or longer than, pulling all of the data from the database.

My original idea was to have a "reducer process" that builds smaller dataframes from the fetch_queue while fetching is still going on, appends them to a list, and combines them with pandas concat once all of the data has been fetched. However, because of the exponential copying in pd.concat, this is slower than just waiting until the end to build the dataframe from the records. (I do have the option of reducing batches if necessary.)
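
To make the copying issue concrete: combining batch-by-batch re-copies everything accumulated so far on every pd.concat call, which is where the cost blows up as the number of batches grows. This is only a sketch of that pattern; batches and columns are placeholders, not my actual code.

comb_df = pd.DataFrame()
for batch in batches:                               # batches of fetched records
    new_df = pd.DataFrame(batch, columns=columns)   # small per-batch frame
    comb_df = pd.concat([comb_df, new_df])          # re-copies comb_df each time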

It seems like I am missing some efficiency that would let me start working on dataframes of the data already in memory before all of the data has been loaded.

Update

Running the following:

    def query_pool(self, query, filter_list):
        """Run a query in parallel from multiple connections each querying
           a different partition or part of the primary key.

           inputs:
           query -- query to run, with one formatting field at the beginning of the
                    filters clause for inserting the filters
           filter_list -- list of filters which call different partitions or divide up
                        the primary key

           outputs:
           df -- dataframe with the results of the query"""
        try:
            query_list = [query.format(f) for f in filter_list]
            p = Pool(len(query_list))
            partial_get_df = partial(get_df, connection_string = self.con_str)
            df_list = p.map(partial_get_df, query_list)
        except Exception as e:
            print(e)
            traceback.print_exc()
        finally:
            p.close()
        return pd.concat(df_list)

def get_df(query, connection_string):
    e = sa.create_engine(connection_string)
    conn = e.connect()
    df = pd.read_sql(con=conn, sql=query)
    conn.close()
    return df

runs in 13 seconds, roughly the same time it takes to pull the data in the parallel processes, which makes the pandas processing negligible. I am not sure how the Pool.map() implementation passes the data back into the main process, but it seems to have far less overhead than trying to use a queue.
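
For reference, it is invoked with the same kind of query and filters as before, e.g. (hypothetical table and partitions):

df = self.query_pool('SELECT * FROM my_table WHERE {}',
                     ['id % 4 = 0', 'id % 4 = 1',
                      'id % 4 = 2', 'id % 4 = 3'])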
