I have a script where I need to build a large dataframe by querying a database. I wrote a class that splits the query across separate processes by partitioning the primary key in the WHERE clause, with each worker pushing batches of records onto a queue. The helper functions that do this work are below.
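To make the setup concrete, the query template and the primary-key partition filters look roughly like this (the table and column names are simplified placeholders):

# Hypothetical example: the {} placeholder is filled with one filter per process,
# so each connection scans a disjoint slice of the primary key.
query = 'SELECT * FROM my_table WHERE {}'
filter_list = ['id BETWEEN 0 AND 999999',
               'id BETWEEN 1000000 AND 1999999',
               'id BETWEEN 2000000 AND 2999999',
               'id BETWEEN 3000000 AND 3999999']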
import time
import traceback
from functools import partial
from multiprocessing import Manager, Pool, Process, Queue as MpQueue
from queue import Empty

import pandas as pd
import sqlalchemy as sa


def query_generator(conn, query, batch_size, reduce_info=None):
    """Creates a generator for a query which yields fetched batches
    from the query
    inputs:
    conn -- database connection
    query -- query to run
    batch_size -- size of batches to pull
    reduce_info -- optional shared dict that receives the result columns
    yields:
    list of dictionaries consisting of the records"""
    print(query)
    conn.execute('DECLARE c SCROLL CURSOR FOR ' + query)
    print('Cursor Created')
    vals = True
    need_columns = reduce_info is not None
    while vals:
        res = conn.execute('FETCH {} FROM c'.format(batch_size))
        vals = res.fetchall()
        if need_columns:
            # Publish the column names once so the reducers can build DataFrames
            reduce_info['columns'] = res.keys()
            need_columns = False
        yield vals
    conn.execute('CLOSE c')


def query_to_queue(query_tuple, batch_size, fetch_queue, reduce_info=None):
    """Yields batches from the query generator and puts them in
    the multiprocessing queue until the full query has run
    inputs:
    query_tuple -- tuple consisting of a connection object and a query
    batch_size -- the size of the batches which should be fetched
    fetch_queue -- multiprocessing queue to put the batches into
    reduce_info -- optional shared dict passed through to query_generator"""
    conn, query = query_tuple
    qg = query_generator(conn, query, batch_size, reduce_info)
    while True:
        try:
            fetch_queue.put(next(qg))
        except StopIteration:
            break


def reduce_from_queue(fetch_queue, reduce_func, combine_func, reducer,
                      status_dict, reduce_info={}, reduce_list=None, timeout=1):
    """Pulls batches out of the multiprocessing queue, to create one list of
    dictionaries with all of the records, then create a dataframe of all
    of the records.
    inputs:
    fetch_queue -- multiprocessing queue where workers are putting fetched records
    reduce_func -- function for processing each batch yielded from the DB
    combine_func -- function for combining each batch with the current data
    reducer -- process number of the reducer, for recording the status
    status_dict -- dictionary recording the status of each reducer
    reduce_info -- kwargs needed for reduce_func
    reduce_list -- list to append the results of combine to (if None, return results)
    timeout -- timeout for returning Empty from fetch_queue
    outputs:
    comb_data -- only if reduce_list is None"""
    loop = True
    has_returned = False
    first_val = True
    comb_data = None
    while loop:
        try:
            val = fetch_queue.get(timeout=timeout)
            if isinstance(val, str) and (val == 'STOP'):
                loop = False
                break
            # Wait until the fetch process has published the column names
            while 'columns' not in reduce_info:
                print('waiting')
                time.sleep(2)
            new_data = reduce_func(val, **reduce_info)
            has_returned = True
            if first_val and new_data is not None:
                comb_data = new_data
                first_val = False
            elif new_data is not None:
                comb_data = combine_func(new_data, comb_data)
        except Empty:
            if has_returned:
                loop = False
        except Exception as e:
            print(e)
            loop = False
            raise e
    if reduce_list is not None:
        reduce_list.append(comb_data)
        status_dict[reducer] = True
    else:
        status_dict[reducer] = True
        return comb_data


# Note: query_parallel is from a class, but it only uses class attributes to get
# a connection pool. Trying to limit this to the relevant code.
def query_parallel(self, query, filter_list, reduce_func=None, combine_func=None,
                   final_combine_func=None, n_reducers=1, batch_size=2000,
                   reduce_timeout=1000):
    """Run a query in parallel from multiple connections, each querying
    a different partition or part of the primary key.
    inputs:
    query -- query to run, with one formatting field at the beginning of the
             filters field for inserting the filters
    filter_list -- list of filters which call different partitions or divide up
                   the primary key
    reduce_func -- function for processing each batch pulled from the queue
    combine_func -- function for combining each new batch with the current data
    final_combine_func -- function for combining the results of the reducers
    n_reducers -- number of reducer processes to start
    batch_size -- size of each batch returned from the database
    reduce_timeout -- seconds to wait for the reducers to finish
    outputs:
    df -- dataframe with the results of the query"""
    print('Starting Query Parallel')
    conn_list = self.dial_pool(len(filter_list))
    res = None
    try:
        query_list = [query.format(f) for f in filter_list]
        fetch_queue = MpQueue()
        fetch_process_list = list()
        manager = Manager()
        reduce_list = manager.list()
        status_dict = manager.dict()
        reduce_info = manager.dict()
        process_dict = dict()
        # One fetch process per connection / filter
        for i, conn in enumerate(conn_list):
            query_tuple = (conn, query_list[i])
            arg_dict = {'query_tuple': query_tuple,
                        'batch_size': batch_size,
                        'fetch_queue': fetch_queue}
            if i == 0:
                arg_dict['reduce_info'] = reduce_info
            p = Process(target=query_to_queue,
                        kwargs=arg_dict)
            p.start()
            fetch_process_list.append(p)
        print('NReducers: ', n_reducers)
        for i in range(n_reducers):
            print('Reducer Started')
            status_dict[i] = False
            p = Process(target=reduce_from_queue,
                        kwargs={'fetch_queue': fetch_queue,
                                'reduce_func': reduce_func,
                                'status_dict': status_dict,
                                'combine_func': combine_func,
                                'reduce_list': reduce_list,
                                'reduce_info': reduce_info,
                                'reducer': i})
            p.start()
            process_dict[i] = p
        for t in fetch_process_list:
            t.join()
            print('Fetch Process Joined')
        print('All Fetch Joined')
        # One STOP sentinel per reducer so each reducer can exit its loop
        for i in range(n_reducers):
            fetch_queue.put('STOP')
        print('Stop Message Sent')
        for pid, p in process_dict.items():
            for i in range(reduce_timeout // 2):
                if not p.is_alive():
                    break
                if status_dict[pid]:
                    p.terminate()
                    break
                time.sleep(2)
            p.join(timeout=10)
        print('starting final reduce')

        def null_func(res):
            return res

        res = combine_from_list(fetch_list=reduce_list,
                                combine_func=final_combine_func,
                                reduce_queue=None)
    except Exception as e:
        print(e)
        traceback.print_exc()
    finally:
        for c in conn_list:
            c.close()
    return res


# Input functions used for creating a dataframe; they could be changed to do
# inline analysis or some other reduction.
def records_to_df_list(records, columns):
    return [pd.DataFrame(records, columns=columns)]


def combine_df_list(comb_data, new_data):
    return comb_data + new_data


def concat_df_lists(comb_list):
    t = time.time()
    df_list = list()
    for sublist in comb_list:
        df_list += sublist
    print(time.time() - t)
    return pd.concat(df_list)
My current implementation combines all of the batch lists from my fetch_queue and only converts them into a pandas dataframe after all of the queries have finished. However, this often takes as long as, or longer than, pulling all of the data from the database.
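For reference, the call that produces this behavior looks roughly like the sketch below. It reuses the query and filter_list from the example above; the reducer count and batch size are just placeholder values.

# Illustrative wiring of the helper functions above into query_parallel.
df = self.query_parallel(query, filter_list,
                         reduce_func=records_to_df_list,
                         combine_func=combine_df_list,
                         final_combine_func=concat_df_lists,
                         n_reducers=2,
                         batch_size=2000)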
My original idea was to have a "reducer process" that builds smaller dataframes from the fetch_queue while the fetching is still going on, appends them to a list, and combines them with pandas concat once all of the data has been fetched. However, because of the exponential copying in pd.concat, this was slower than waiting until the end to build a dataframe from the records. (I do have the option of reducing individual batches if necessary.)
It seems like I am missing some efficiency that would let me start building dataframes from the data that is already in memory, before all of the data has finished loading.
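For what it's worth, this is the difference between the slow and fast concat patterns (a minimal sketch, not my actual reducer): growing a dataframe by concatenating inside the loop re-copies everything accumulated so far on every iteration, while collecting the pieces in a list and concatenating once copies each row only one time.

import pandas as pd

def combine_incrementally(batches, columns):
    # Slow: every pd.concat call copies all rows accumulated so far.
    df = pd.DataFrame(columns=columns)
    for batch in batches:
        df = pd.concat([df, pd.DataFrame(batch, columns=columns)])
    return df

def combine_once(batches, columns):
    # Faster: build the small frames first, then copy everything a single time.
    return pd.concat([pd.DataFrame(batch, columns=columns) for batch in batches])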
Update
Running
def query_pool(self, query, filter_list):
    """Run a query in parallel from multiple connections, each querying
    a different partition or part of the primary key.
    inputs:
    query -- query to run, with one formatting field at the beginning of the
             filters field for inserting the filters
    filter_list -- list of filters which call different partitions or divide up
                   the primary key
    outputs:
    df -- dataframe with the results of the query"""
    try:
        query_list = [query.format(f) for f in filter_list]
        p = Pool(len(query_list))
        partial_get_df = partial(get_df, connection_string=self.con_str)
        df_list = p.map(partial_get_df, query_list)
    except Exception as e:
        print(e)
        traceback.print_exc()
    finally:
        p.close()
    return pd.concat(df_list)


def get_df(query, connection_string):
    e = sa.create_engine(connection_string)
    conn = e.connect()
    df = pd.read_sql(con=conn, sql=query)
    conn.close()
    return df
takes 13 seconds, roughly the same amount of time it takes to pull the data into the parallel processes, which makes the pandas processing negligible. I am not sure how the Pool.map() implementation passes the data back into the main process, but it seems to have far less overhead than my attempt at using a queue.
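As a rough sanity check on where the time goes, the two phases can be timed separately; a minimal sketch, assuming the same query_list and partial_get_df names as inside query_pool above:

t0 = time.time()
df_list = p.map(partial_get_df, query_list)   # parallel fetch, one DataFrame per worker
t1 = time.time()
df = pd.concat(df_list)                       # single concat back in the parent process
t2 = time.time()
print('fetch: {:.1f}s  concat: {:.1f}s'.format(t1 - t0, t2 - t1))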