0

我有几个parquet文件(数据框),我将它们作为一个 dask 数据框图和示例加载。之后,我根据数据框中的原始数据执行一些计算,并将新列附加到我的 dask 数据框中。

最后,我想计算所有列的mean()andstd()并得到一个ValueError我不确定它来自哪里或我做错了什么。

import pandas as pd
import numpy as np
import tensorflow as tf
import os
from os.path import join
import dask
import dask.dataframe as dd
import dask.array as da


# read in the data
data_pq = dd.read_parquet(join(path_to_data,'filter_width_*_DNN_train.parquet'),chunksize='4GB') 

print('Convert to single precission and sample')
data_pq = data_pq.astype(np.float32).sample(frac=0.1)

# ## compute the additional quantites (tensors)

# compute tensors R, S mag(U) etc.

mag_U = da.sqrt(data_pq['U_bar'].values**2 + data_pq['V_bar'].values**2 +data_pq['W_bar'].values**2)
mag_grad_c = da.sqrt(data_pq['grad_c_x_LES'].values**2 + data_pq['grad_c_y_LES'].values**2 +data_pq['grad_c_z_LES'].values**2)

sum_U = data_pq['U_bar'].values + data_pq['V_bar']+data_pq['W_bar'].values
sum_c = da.absolute(data_pq['grad_c_x_LES'].values) + da.absolute(data_pq['grad_c_y_LES'].values) +da.absolute(data_pq['grad_c_z_LES'].values)

grad_U = da.sqrt(data_pq['grad_U_x_LES'].values**2 + data_pq['grad_U_y_LES'].values**2 +data_pq['grad_U_z_LES'].values**2)
grad_V = da.sqrt(data_pq['grad_V_x_LES'].values**2 + data_pq['grad_V_y_LES'].values**2 +data_pq['grad_V_z_LES'].values**2)
grad_W = da.sqrt(data_pq['grad_W_x_LES'].values**2 + data_pq['grad_W_y_LES'].values**2 +data_pq['grad_W_z_LES'].values**2)

mag_grad_U = da.sqrt(grad_U**2 + grad_V**2 +grad_W**2)
sum_grad_U = da.absolute(grad_U) + da.absolute(grad_V) +da.absolute(grad_W)

print('Computing gradient_tensor')

gradient_tensor = da.array([
                    [data_pq['grad_U_x_LES'],data_pq['grad_V_x_LES'],data_pq['grad_W_x_LES']],
                    [data_pq['grad_U_y_LES'],data_pq['grad_V_y_LES'],data_pq['grad_W_y_LES']],
                    [data_pq['grad_U_z_LES'],data_pq['grad_V_z_LES'],data_pq['grad_W_z_LES']]
                    ])


print('Computing S and R')
# symetric strain
Strain = 0.5*(gradient_tensor + da.transpose(gradient_tensor,(1,0,2)))
#anti symetric strain
Anti =  0.5*(gradient_tensor - da.transpose(gradient_tensor,(1,0,2)))


print('Computing lambdas')

lambda_1 = da.trace(Strain**2)
lambda_2 = da.trace(Anti**2)
lambda_3 = da.trace(Strain**3)
lambda_4 = da.trace(Anti**2 * Strain)
lambda_5 = da.trace(Anti**2 * Strain**2)


# Add to the dask dataframe

data_pq['mag_grad_c'] = mag_grad_c
data_pq['mag_U'] = mag_U
data_pq['sum_c'] = sum_c
data_pq['sum_U'] = sum_U
data_pq['sum_grad_U'] = sum_grad_U
data_pq['mag_grad_U'] = mag_grad_U

# REPARTITON
data_pq = data_pq.repartition(npartitions=lambda_1.npartitions)

data_pq['lambda_1'] = lambda_1
data_pq['lambda_2'] = lambda_2
data_pq['lambda_3'] = lambda_3
data_pq['lambda_4'] = lambda_4
data_pq['lambda_5'] = lambda_5

print('Done with feature computation')


# reindex and compute mean and std
data_pq = data_pq.reset_index().drop('index',axis=1)

# compute the mean and std
data_mean, data_std = dask.compute(data_pq.mean(),data_pq.std())

不确定它来自哪里。它说索引不匹配。这是我收到的错误消息:

--------------------------------------------------------------------------- ValueError                                Traceback (most recent call last) ~/Python/Data_driven_models/DASK_processing/Dask_parquet.py in <module>
    119 data_pq = data_pq.reset_index().drop('index',axis=1)
    120 
--> 121 data_mean, data_std = dask.compute(data_pq.mean(),data_pq.std())
    122 

~/.local/lib/python3.6/site-packages/dask/base.py in compute(*args,
**kwargs)
    450         postcomputes.append(x.__dask_postcompute__())
    451 
--> 452     results = schedule(dsk, keys, **kwargs)
    453     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    454 

~/.local/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     82         get_id=_thread_get_id,
     83         pack_exception=pack_exception,
---> 84         **kwargs
     85     )
     86 

~/.local/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    484                         _execute_task(task, data)  # Re-execute locally
    485                     else:
--> 486                         raise_exception(exc, tb)
    487                 res, worker_id = loads(res_info)
    488                 state["cache"][key] = res

~/.local/lib/python3.6/site-packages/dask/local.py in reraise(exc, tb)
    314     if exc.__traceback__ is not tb:
    315         raise exc.with_traceback(tb)
--> 316     raise exc
    317 
    318 

~/.local/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    220     try:
    221         task, data = loads(task_info)
--> 222         result = _execute_task(task, data)
    223         id = get_id()
    224         result = dumps((result, id))

~/.local/lib/python3.6/site-packages/dask/core.py in
_execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

~/.local/lib/python3.6/site-packages/pandas/core/series.py in
__init__(self, data, index, dtype, name, copy, fastpath)
    312                     if len(index) != len(data):
    313                         raise ValueError(
--> 314                             f"Length of passed values is {len(data)}, "
    315                             f"index implies {len(index)}."
    316                         )

ValueError: Length of passed values is 3728270, index implies 2135992.
4

0 回答 0