I'm new to both Dask and Featuretools, so I'm having a lot of trouble combining them to parallelize feature engineering.
Short version (the immediate problem): I have a dask bag dfs of pandas DataFrames and want to write them out as CSVs, with each file named by its partition number as an identifier. to_textfiles() raises an error, and I can't find a way to get the partition number so that I could do something like dfs.map(pd.to_csv, "[partition_num].csv"). Is there a way to do this?
>>> dfs
dask.bag<map-par..., npartitions=2>
>>> type(dfs.compute()[0])
pandas.core.frame.DataFrame
>>> dfs.to_textfiles('feature_matrices/calculate_matrix/*_test')
anaconda3/envs/featuretools/lib/python3.6/site-packages/dask/utils.py in ensure_unicode()
592 return s.decode()
593 msg = "Object %s is neither a bytes object nor has an encode method"
--> 594 raise TypeError(msg % s)
TypeError: ('Long error message', 'Object Age ArrivalMethod\nPAT_ENC_CSN_ID \n3223775624 33 Car\n3223776450 82 Medical Flight\n3223776487 65 Other\n3223776543 31 Ambulance\n3223835687 89 Ambulance\n3223838474 42 Public Transportation\n3223842283 11 Ambulance\n3223845045 60 A
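As far as I can tell from the traceback, to_textfiles() expects each bag element to already be a string (or bytes), so it chokes on DataFrame elements. A minimal illustration of that mismatch, if my reading of the error is right:

import dask.bag as db
import pandas as pd

# a one-partition bag whose single element is a DataFrame, like my dfs
small = db.from_sequence([pd.DataFrame({"a": [1, 2]})], npartitions=1)

# raises the same TypeError from dask/utils.py ensure_unicode, because a
# DataFrame is neither bytes nor an object with an .encode method
small.to_textfiles("./*_test.txt")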
Long version: for anyone wondering why I have a bag full of pandas DataFrames in the first place, I'm laying out my whole problem here in case there's a better approach. I'm trying to use Featuretools to generate 2 million features for a 22k-row dataset (to be used later for feature selection). I'm trying to follow the references (this post and this notebook). In the notebook the dataset is huge (45 million rows), much larger than my 22k-row dataset.
Even so, I did split my data into partitions of 741 rows each, because passing an entity set containing the full data to calculate_feature_matrix took too long in its sequential component (presumably while distributing the entity set to the workers). That happened even when I generated only a single feature from the whole dataset: 20 minutes into running calculate_matrix, none of my dask-workers (on an LSFCluster) ever went above 5% CPU utilization, and it produced a flood of error traces (a rough sketch of what that call looked like is included after the traceback below):
Using the whole dataset with a single feature:
...
File "/path/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/stack_context.py", line 300, in null_wrapper
return fn(*args, **kwargs)
File "/path/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/netutil.py", line 249, in accept_handler
File "/path/anaconda3/envs/featuretools/lib/python3.6/socket.py", line 205, in accept
OSError: [Errno 24] Too many open files
Exception in callback BaseAsyncIOLoop._handle_events(110, 1)
handle: <Handle BaseAsyncIOLoop._handle_events(110, 1)>
Traceback (most recent call last):
File "/path/anaconda3/envs/featuretools/lib/python3.6/asyncio/events.py", line 145, in _run
self._callback(*self._args)
File "/lab/corradin_data/FOR_AN/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/platform/asyncio.py", line 122, in _handle_events
handler_func(fileobj, events)
File "/path/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/stack_context.py", line 300, in null_wrapper
return fn(*args, **kwargs)
File "/path/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/netutil.py", line 249, in accept_handler
File "path/anaconda3/envs/featuretools/lib/python3.6/socket.py", line 205, in accept
OSError: [Errno 24] Too many open files
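For reference, the whole-dataset attempt that produced the trace above looked roughly like the sketch below. This is from memory: full_es and single_feature are illustrative names, and the dask_kwargs wiring is approximate, so don't take it as exactly what I ran.

import featuretools as ft
from dask_jobqueue import LSFCluster
from dask.distributed import Client

cluster = LSFCluster(...)  # cluster parameters elided, same LSF setup as in the code further down
client = Client(cluster)

# full_es: entity set built from all 22k rows (illustrative name)
# single_feature: one feature definition, just to test the pipeline
feature_matrix = ft.calculate_feature_matrix(
    [single_feature],
    entityset=full_es,
    verbose=1,
    dask_kwargs={"cluster": cluster},  # approximate: roughly how I pointed featuretools at the cluster
)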
Besides splitting the dataset, I also split by feature, computing one feature at a time. I now want to write the features to disk, but I'd like to combine them into chunks of 1k rather than outputting 2 million CSV files. Below is my approach so far, which ends with dfs, a dask bag of pandas DataFrames; for each 741-row partition of the data it computes one feature at a time:
import pandas as pd
import featuretools as ft
import dask.bag as db
from timeit import default_timer as timer
from dask_jobqueue import LSFCluster
from dask.distributed import Client

cluster = LSFCluster(...)
client = Client(cluster)

# take a feature, return a feature matrix for a subset of data
def make_feature(feature):
    feature_name = feature.generate_name()
    try:
        # es is the entity set built from one 741-row partition of the dataset
        feature_matrix = ft.calculate_feature_matrix(feature, entityset=es, n_jobs=1, verbose=1)
        print(f"Finished generating feature {feature_name}")
        return feature_matrix
    except Exception:
        print(f"Could not make feature: {feature_name}")
        print("--------")
        return None

# feature_list holds the ~2 million feature definitions
b = db.from_sequence(feature_list, partition_size=1000)  # 1k features per partition
b = b.map(make_feature)

# concatenate the 1k DataFrames of one partition into a single DataFrame
def concat(partition):
    series = [i for i in partition]
    df = pd.concat(series, axis=1)
    return [df]

dfs = b.map_partitions(concat)  # dask bag of DataFrames

overall_start = timer()
dfs.compute()
overall_end = timer()
print(f"Total Time Elapsed: {round(overall_end - overall_start, 2)} seconds.")

# output to disk here
???
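For that last step, the closest idea I've come up with (but haven't verified is idiomatic) is to drop down from the bag to its delayed partitions, enumerate them, and write each one out. A rough sketch, assuming each of my partitions holds exactly one concatenated DataFrame:

import dask

@dask.delayed
def write_partition(partition_contents, partition_num):
    # each bag partition computes to a list of DataFrames
    # (exactly one per partition in my case, since concat() returns [df])
    for df in partition_contents:
        df.to_csv(f"feature_matrices/calculate_matrix/{partition_num}_test.csv")

delayed_partitions = dfs.to_delayed()  # one Delayed object per bag partition
writes = [write_partition(part, i) for i, part in enumerate(delayed_partitions)]
dask.compute(*writes)

Is something along these lines the intended way, or is there a built-in I'm missing?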
This is my first SO question, so please let me know what I should fix or add to make it clearer. Thanks!