我和我的同事已经设置、配置和测试 Dask 大约一周了,一切都运行良好(不能高度评价它的简单、直接和强大),但现在我们正试图利用它来进行测试,但遇到了问题。我们认为这是一个与语法和理解差距相关的相当简单的问题。非常感谢任何帮助它运行的帮助。任何支持我们加深对更优路径的理解也非常感谢。
我们与这两个帖子相当接近:
高流量:
- 在 pandas 中打开数据并清理它(我们计划将其移动到管道中)
- 从那里,将用于回归的清理数据集转换为 dask 数据框
- 设置 x & y 变量并创建所有唯一的 x 组合集
- 创建所有独特的公式(y ~ x1 + x2 +0)
- 通过线性 lassolars 模型使用数据运行每个单独的公式集,以获得每个公式的 AIC 以进行排名
目前的问题:
- 在 dask 集群上运行每个单独的公式集(~1700 个公式)和数据(1 个不随每次运行而变化的单个数据集)并返回结果
- 优化计算并返回最终数据
代码:
# In[]
# Imports:
import logging as log
import datetime as dat
from itertools import combinations
import numpy as np
import pandas as pd
from patsy import dmatrices
import sklearn as sk
from sklearn.linear_model import LogisticRegression, SGDClassifier, LinearRegression
import dask as dask
import dask.dataframe as dk
from dask.distributed import Client
# In[]
# logging, set the dask client, open & clean the data, pass into a dask dataframe
log.basicConfig(level=log.INFO,
format='%(asctime)s %(message)s',
datefmt="%m-%d %H:%M:%S"
)
c = Client('ip:port')
ST = dat.datetime.now()
data_pd = pd.read_csv('some.txt', sep="\t")
#fill some na/clean up the data a bit
data_pd['V9'] = data_pd.V9.fillna("Declined")
data_pd['y'] = data_pd.y.fillna(0)
data_pd['x1'] = data_pd.x1.fillna(0)
#output the clean data and re-import into dask, we could alse use from_pandas to get to dask dataframes
data_pd.to_csv('clean_rr_cp.csv')
data = dk.read_csv(r'C:\path\*.csv', sep=",")
# set x & y variables - the below is truncated
y_var = "y"
x_var = ['x1',
'x2',
'x3',
'x4',......
#list of all variables
all_var = list(y_var) + x_var
#all unique combinations
x_var_combos = [combos for combos in combinations(x_var,2)]
#add single variables for testing as well
for i in x_var:
x_var_combos.append((i,""))
# create formulas from our y, x variables
def formula(y_var, combo):
combo_len = len(combo)
if combo_len == 2:
formula = y_var +"~"+combo[0] +"+"+ combo[1]+"+0"
else:
formula = y_var +"~"+combo[0]+"+0"
return formula
@dask.delayed
def model_aic(dt, formula):
k = 2
y_df, x_df = dmatrices(formula, dt, return_type = 'dataframe')
y_df = np.ravel(y_df)
log.info('dmatrices successful')
LL_model = sk.linear_model.LassoLarsIC(max_iter = 100)
AIC_Value = min(LL_model.fit(x_df, y_df).criterion_) + ( (2*(k**2)+2*(k)) / (len(x_df)-k-1) )
log.info('AIC_Value: %s', AIC_Value)
oup = [formula ,AIC_Value, len(dt)-AIC_Value]
return oup
# ----------------- here's where we're stuck ---------------------
# ----------------- we think this is correct ----------------------
# ----------------- create a list of all formula to execute -------
# In[]
out = []
for i in x_var_combos:
var = model_aic(data, formula(y_var, i))
out.append(var)
# ----------------- but we're stuck figuring out how to -----------
# ------------------make it compute & return the result -----------
ans = c.compute(*out)
ans2 = c.compute(out[1])
print (ans2)