1

我对 Dask 和 Featuretools 都很陌生,所以我在将它们结合起来以并行化特征工程时遇到了很多困难

简短版本:解决一个直接的问题我有一个 dask bagdfspandas DataFrame希望将它们输出为 csv,每个文件都以分区作为标识符。to_textfiles()引发错误,我似乎无法找到获取要使用的分区号的方法dfs.map(pd.to_csv, "[partition_num].csv")。有没有办法做到这一点?

>>> dfs 
dask.bag<map-par..., npartitions=2>

>>> type(dfs.compute()[0])
pandas.core.frame.DataFrame

>>> dfs.to_textfiles('feature_matrices/calculate_matrix/*_test')

anaconda3/envs/featuretools/lib/python3.6/site-packages/dask/utils.py in ensure_unicode()
    592         return s.decode()
    593     msg = "Object %s is neither a bytes object nor has an encode method"
--> 594     raise TypeError(msg % s)

TypeError: ('Long error message', 'Object                 Age          ArrivalMethod\nPAT_ENC_CSN_ID                            \n3223775624       33                    Car\n3223776450       82         Medical Flight\n3223776487       65                  Other\n3223776543       31              Ambulance\n3223835687       89              Ambulance\n3223838474       42  Public Transportation\n3223842283       11              Ambulance\n3223845045       60              A

长版:对于那些想知道为什么我有一大包熊猫数据框的人,我把我的整个问题放在这里寻找更好的方法。我正在尝试使用特征工具为 22k 行的数据集生成 200 万个特征(稍后用于特征选择)。我正在尝试遵循参考资料(这篇文章这个笔记本)。在 notebook 中,数据集很大(4500 万行),比我的 22k 行数据集大得多。

尽管如此,我确实将我的数据分成了 741 行的分区,因为将entity set完整数据传递给calculate_feature_matrix的顺序组件花费了太长时间(可能要分配entity set给工作人员)。即使我只用整个数据集生成一个特征,也会发生这种情况。在运行 20 分钟后,我的dask-workers( LSFCluster ) 的 CPU 利用率都没有超过 5% calculate_matrix,这导致了大量的错误跟踪:

使用具有一个特征的整个数据集:

...
  File "/path/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/stack_context.py", line 300, in null_wrapper
    return fn(*args, **kwargs)
  File "/path/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/netutil.py", line 249, in accept_handler
  File "/path/anaconda3/envs/featuretools/lib/python3.6/socket.py", line 205, in accept
OSError: [Errno 24] Too many open files
Exception in callback BaseAsyncIOLoop._handle_events(110, 1)
handle: <Handle BaseAsyncIOLoop._handle_events(110, 1)>
Traceback (most recent call last):
  File "/path/anaconda3/envs/featuretools/lib/python3.6/asyncio/events.py", line 145, in _run
    self._callback(*self._args)
  File "/lab/corradin_data/FOR_AN/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/platform/asyncio.py", line 122, in _handle_events
    handler_func(fileobj, events)
  File "/path/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/stack_context.py", line 300, in null_wrapper
    return fn(*args, **kwargs)
  File "/path/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/netutil.py", line 249, in accept_handler
  File "path/anaconda3/envs/featuretools/lib/python3.6/socket.py", line 205, in accept
OSError: [Errno 24] Too many open files

除了拆分数据集,我还按特征拆分,一次做一个特征。我现在想将该功能写入磁盘,但想将它们组合成 1k 块而不是输出 2 mil csv 文件。以下是我到目前为止的方法,最终以 dfsdask bagpandas DataFrame

对于 741 行的每个分区,一次计算一个特征

from dask_jobqueue import LSFCluster
from dask.distributed import Client
cluster = LSFCluster(...)
client = Client(cluster)

# take a feature, return a feature matrix for a subset of data
def make_feature(feature):
    feature_name = feature.generate_name()
    try:
        feature_matrix = ft.calculate_feature_matrix(feature, entityset=es, n_jobs= 1, verbose = 1) #es is one partition of dataset

        print(f"Finished generating feature {feature_name}")
        return feature_matrix
    except:
        print(f"Could not make feature: {feature_name}")
        print("--------")
        return None

import dask.bag as db
b = db.from_sequence(feature_list, partition_size=1000) # 1k feature per partition
b = b.map(make_feature) 

#concatenate 1k dataframe (1 partition) to 1 df 
def concat(partition):
    series = [i for i in partition]
    df = pd.concat(series,axis =1)
    return [df]

dfs = b.map_partitions(concat) # dask bag of dataframes

overall_start = timer()
dfs.compute()
overall_end = timer()

print(f"Total Time Elapsed: {round(overall_end - overall_start, 2)} seconds.")

#ouput to disk here
???

这是我的第一个 SO 问题,所以请让我知道要修复/添加什么以使我的问题更清楚。谢谢!

4

0 回答 0