
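My colleague and I have been setting up, configuring, and testing Dask for about a week now, and everything has been working great (we can't say enough about how simple, straightforward, and powerful it is). Now we are trying to put it to use for our testing and have run into a problem. We think it is a fairly simple one, caused by gaps in our understanding of the syntax. Any help getting it running is greatly appreciated, as is any guidance that deepens our understanding of a better overall approach.

We have been working fairly closely from these two posts: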

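High level:

  • Open the data in pandas and clean it (we plan to move this into the pipeline)
  • From there, convert the cleaned dataset used for the regressions into a dask dataframe
  • Set the x & y variables and create the set of all unique x combinations
  • Create all unique formulas (y ~ x1 + x2 + 0)
  • Run each individual formula against the data through a linear LassoLarsIC model to get each formula's AIC for ranking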
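Steps 3 and 4 need only the standard library. The sketch below is illustrative rather than the code from the question: `build_formulas` is a hypothetical helper and the variable names are placeholders. Single variables are stored as 1-tuples, so every combo's length equals its number of terms and a single `" + ".join` handles both pairs and singles.

```python
from itertools import combinations

def build_formulas(y_var, x_vars):
    """Hypothetical helper: one patsy-style formula per pair of x variables, plus one per single variable."""
    # Pairs first, then singles as 1-tuples so len(combo) is always the number of terms
    combos = list(combinations(x_vars, 2)) + [(x,) for x in x_vars]
    # "+ 0" drops the intercept, matching the formulas in the question
    return [f"{y_var} ~ {' + '.join(combo)} + 0" for combo in combos]

formulas = build_formulas("y", ["x1", "x2", "x3"])
print(len(formulas))   # 6 (3 pairs + 3 singles)
print(formulas[0])     # y ~ x1 + x2 + 0
print(formulas[-1])    # y ~ x3 + 0
```

Patsy ignores whitespace in formulas, so "y ~ x1 + 0" and "y~x1+0" describe the same model.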

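Current issues:

  • Run each individual formula (~1700 formulas) against the data (one single dataset that does not change between runs) on the dask cluster and return the results
  • Optimize the computation and return the final data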
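A common Dask pattern for "many small tasks over one fixed dataset" is to scatter the dataset to the workers once and then map a plain (non-delayed) function over the formulas. This is a minimal sketch assuming `dask.distributed` is installed; `score_formula` is a hypothetical placeholder for `model_aic`, the list `data` stands in for the cleaned dataset, and `Client(processes=False, dashboard_address=None)` starts a throwaway in-process cluster so the example runs on its own.

```python
from dask.distributed import Client

def score_formula(data, formula):
    # Hypothetical stand-in for model_aic: a real version would build the design
    # matrices with patsy and fit LassoLarsIC on `data`.
    return [formula, len(data)]

# Local in-process cluster for illustration; use Client('ip:port') for a real scheduler
client = Client(processes=False, dashboard_address=None)

data = list(range(1000))  # hypothetical stand-in for the cleaned dataset
formulas = ["y ~ x1 + 0", "y ~ x2 + 0", "y ~ x1 + x2 + 0"]

# Place the dataset on every worker once; tasks then refer to it by key
[data_future] = client.scatter([data], broadcast=True)

# One task per formula; the scattered future is resolved to the data on the worker
futures = client.map(score_formula, [data_future] * len(formulas), formulas)

# Block until every task finishes; results come back in the same order as `formulas`
results = client.gather(futures)
print(results[0])  # ['y ~ x1 + 0', 1000]

client.close()
```

Scattering up front means the data travels to the workers once instead of along with each task. With a real pandas DataFrame, you would scatter the pandas frame (for example `data_pd`) in the same way.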

Code:

# In[]
# Imports:
import logging as log
import datetime as dat
from itertools import combinations
import numpy as np
import pandas as pd
from patsy import dmatrices

import sklearn as sk
import sklearn.linear_model  # also makes sk.linear_model.LassoLarsIC available

import dask
import dask.dataframe as dk
from dask.distributed import Client

# In[]
# logging, set the dask client, open & clean the data, pass into a dask dataframe
log.basicConfig(level=log.INFO,
                    format='%(asctime)s %(message)s',
                    datefmt="%m-%d %H:%M:%S"
                    )
c = Client('ip:port')
ST = dat.datetime.now()
data_pd = pd.read_csv('some.txt', sep="\t")
#fill some na/clean up the data a bit
data_pd['V9'] = data_pd.V9.fillna("Declined")
data_pd['y'] = data_pd.y.fillna(0)
data_pd['x1'] = data_pd.x1.fillna(0)
#output the clean data and re-import it into dask; we could also use dk.from_pandas to go straight to a dask dataframe
data_pd.to_csv('clean_rr_cp.csv', index=False)  # index=False avoids an extra "Unnamed: 0" column on re-read
data = dk.read_csv(r'C:\path\*.csv', sep=",")

# set x & y variables - the below is truncated
y_var = "y"
x_var = ['x1',
     'x2',
     'x3',
     'x4',......


#list of all variables; [y_var] rather than list(y_var), which would split a multi-character name into letters
all_var = [y_var] + x_var

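#all unique pairs of x variables
x_var_combos = list(combinations(x_var, 2))
#add single variables for testing as well; (i,) is a 1-tuple, whereas (i, "") has length 2
#and would produce the formula "y~x1++0" instead of the intended "y~x1+0"
for i in x_var:
    x_var_combos.append((i,))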

# create formulas from our y, x variables
def formula(y_var, combo):
    # e.g. ("x1", "x2") -> "y~x1+x2+0" and ("x1",) -> "y~x1+0"; "+0" drops the intercept
    return y_var + "~" + "+".join(combo) + "+0"

@dask.delayed
def model_aic(dt, formula):
    # a dask collection passed to a delayed function arrives here already computed (as pandas)
    y_df, x_df = dmatrices(formula, dt, return_type='dataframe')
    y_df = np.ravel(y_df)
    log.info('dmatrices successful')
    # k = number of regressors in this formula (1 or 2), used in the AICc small-sample correction
    k = x_df.shape[1]
    LL_model = sk.linear_model.LassoLarsIC(max_iter=100)
    AIC_Value = min(LL_model.fit(x_df, y_df).criterion_) + ((2*(k**2) + 2*k) / (len(x_df) - k - 1))
    log.info('AIC_Value: %s', AIC_Value)
    oup = [formula, AIC_Value, len(dt) - AIC_Value]
    return oup

# ----------------- here's where we're stuck ---------------------
# ----------------- we think this is correct ----------------------
# ----------------- create a list of all formulas to execute ------
# In[]
out = []
for i in x_var_combos:
    var = model_aic(data, formula(y_var, i))
    out.append(var)

# ----------------- but we're stuck figuring out how to -----------
# ------------------make it compute & return the result -----------
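# Client.compute takes one collection or a list of them (not *out) and returns Futures;
# Client.gather blocks until they finish and returns the actual values.
futures = c.compute(out)
ans = c.gather(futures)
print(ans[1])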
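The term added to the minimum AIC in `model_aic` is the standard small-sample (AICc) correction, (2k^2 + 2k) / (n - k - 1), where n is the number of observations and k the number of estimated parameters. Since the formulas here have either one or two regressors, k differs between them; whether the intercept that LassoLarsIC fits by default (`fit_intercept=True`) should also count toward k is a modeling choice. A minimal stand-alone helper (the name `aicc` is hypothetical) makes the arithmetic explicit:

```python
def aicc(aic, n, k):
    """Hypothetical helper: small-sample corrected AIC, AIC + (2k^2 + 2k) / (n - k - 1).

    aic: AIC of the fitted model
    n:   number of observations (rows of the design matrix)
    k:   number of estimated parameters
    """
    if n - k - 1 <= 0:
        raise ValueError("AICc is undefined when n <= k + 1")
    return aic + (2 * k**2 + 2 * k) / (n - k - 1)

# With 50 rows, a two-parameter model pays a larger penalty than a one-parameter model
print(aicc(100.0, 50, 2))  # equals 100 + 12/47
print(aicc(100.0, 50, 1))  # equals 100 + 4/48
```

The correction shrinks toward zero as n grows, so for large datasets it barely changes the ranking of formulas.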