I have several parquet files (data frames) which I load as a single dask dataframe and then sample. Afterwards, I perform some computations on the raw data in the dataframe and append the new columns to my dask dataframe. Finally, I want to compute the mean() and std() of all columns, but I get a ValueError and I am not sure where it comes from or what I am doing wrong.
import pandas as pd
import numpy as np
import tensorflow as tf
import os
from os.path import join
import dask
import dask.dataframe as dd
import dask.array as da
# read in the data
data_pq = dd.read_parquet(join(path_to_data,'filter_width_*_DNN_train.parquet'),chunksize='4GB')
print('Convert to single precision and sample')
data_pq = data_pq.astype(np.float32).sample(frac=0.1)
# ## compute the additional quantities (tensors)
# compute tensors R, S, mag(U) etc.
mag_U = da.sqrt(data_pq['U_bar'].values**2 + data_pq['V_bar'].values**2 + data_pq['W_bar'].values**2)
mag_grad_c = da.sqrt(data_pq['grad_c_x_LES'].values**2 + data_pq['grad_c_y_LES'].values**2 + data_pq['grad_c_z_LES'].values**2)
sum_U = data_pq['U_bar'].values + data_pq['V_bar'].values + data_pq['W_bar'].values
sum_c = da.absolute(data_pq['grad_c_x_LES'].values) + da.absolute(data_pq['grad_c_y_LES'].values) + da.absolute(data_pq['grad_c_z_LES'].values)
grad_U = da.sqrt(data_pq['grad_U_x_LES'].values**2 + data_pq['grad_U_y_LES'].values**2 + data_pq['grad_U_z_LES'].values**2)
grad_V = da.sqrt(data_pq['grad_V_x_LES'].values**2 + data_pq['grad_V_y_LES'].values**2 + data_pq['grad_V_z_LES'].values**2)
grad_W = da.sqrt(data_pq['grad_W_x_LES'].values**2 + data_pq['grad_W_y_LES'].values**2 + data_pq['grad_W_z_LES'].values**2)
mag_grad_U = da.sqrt(grad_U**2 + grad_V**2 + grad_W**2)
sum_grad_U = da.absolute(grad_U) + da.absolute(grad_V) + da.absolute(grad_W)
print('Computing gradient_tensor')
gradient_tensor = da.array([
[data_pq['grad_U_x_LES'],data_pq['grad_V_x_LES'],data_pq['grad_W_x_LES']],
[data_pq['grad_U_y_LES'],data_pq['grad_V_y_LES'],data_pq['grad_W_y_LES']],
[data_pq['grad_U_z_LES'],data_pq['grad_V_z_LES'],data_pq['grad_W_z_LES']]
])
print('Computing S and R')
# symmetric strain
Strain = 0.5*(gradient_tensor + da.transpose(gradient_tensor,(1,0,2)))
# anti-symmetric strain
Anti = 0.5*(gradient_tensor - da.transpose(gradient_tensor,(1,0,2)))
print('Computing lambdas')
lambda_1 = da.trace(Strain**2)
lambda_2 = da.trace(Anti**2)
lambda_3 = da.trace(Strain**3)
lambda_4 = da.trace(Anti**2 * Strain)
lambda_5 = da.trace(Anti**2 * Strain**2)
# Add to the dask dataframe
data_pq['mag_grad_c'] = mag_grad_c
data_pq['mag_U'] = mag_U
data_pq['sum_c'] = sum_c
data_pq['sum_U'] = sum_U
data_pq['sum_grad_U'] = sum_grad_U
data_pq['mag_grad_U'] = mag_grad_U
# REPARTITION
data_pq = data_pq.repartition(npartitions=lambda_1.npartitions)
data_pq['lambda_1'] = lambda_1
data_pq['lambda_2'] = lambda_2
data_pq['lambda_3'] = lambda_3
data_pq['lambda_4'] = lambda_4
data_pq['lambda_5'] = lambda_5
print('Done with feature computation')
# reindex and compute mean and std
data_pq = data_pq.reset_index().drop('index',axis=1)
# compute the mean and std
data_mean, data_std = dask.compute(data_pq.mean(),data_pq.std())
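A possible way to narrow this down (a diagnostic sketch, not part of my script above) is to compare the per-partition row counts of the dataframe with the chunk sizes of one of the derived arrays; after sample() the chunk sizes are typically unknown (nan):

# Diagnostic sketch (not in the original script): compare the number of rows in
# each dataframe partition with the chunk sizes of a dask array built via .values
part_lens = data_pq.map_partitions(len).compute()  # rows per dataframe partition
print(part_lens.tolist())
print(mag_U.chunks)  # often (nan, ...) after sample(), i.e. lengths unknown to dask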
I am not sure where it comes from. It says the indices do not match. This is the error message I get:
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
~/Python/Data_driven_models/DASK_processing/Dask_parquet.py in <module>
119 data_pq = data_pq.reset_index().drop('index',axis=1)
120
--> 121 data_mean, data_std = dask.compute(data_pq.mean(),data_pq.std())
122
~/.local/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
450 postcomputes.append(x.__dask_postcompute__())
451
--> 452 results = schedule(dsk, keys, **kwargs)
453 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
454
~/.local/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
82 get_id=_thread_get_id,
83 pack_exception=pack_exception,
---> 84 **kwargs
85 )
86
~/.local/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
484 _execute_task(task, data) # Re-execute locally
485 else:
--> 486 raise_exception(exc, tb)
487 res, worker_id = loads(res_info)
488 state["cache"][key] = res
~/.local/lib/python3.6/site-packages/dask/local.py in reraise(exc, tb)
314 if exc.__traceback__ is not tb:
315 raise exc.with_traceback(tb)
--> 316 raise exc
317
318
~/.local/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
220 try:
221 task, data = loads(task_info)
--> 222 result = _execute_task(task, data)
223 id = get_id()
224 result = dumps((result, id))
~/.local/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
~/.local/lib/python3.6/site-packages/pandas/core/series.py in __init__(self, data, index, dtype, name, copy, fastpath)
312 if len(index) != len(data):
313 raise ValueError(
--> 314 f"Length of passed values is {len(data)}, "
315 f"index implies {len(index)}."
316 )
ValueError: Length of passed values is 3728270, index implies 2135992.
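The pandas-level error itself seems to be the generic length/index mismatch raised when a Series is built from data and an index of different lengths. A minimal, unrelated illustration of that error (exact wording may differ between pandas versions) would be:

# Unrelated minimal example of the same class of error: pandas refuses to build
# a Series when the data length and the index length disagree.
import pandas as pd
pd.Series(range(5), index=range(3))
# -> ValueError: Length of passed values is 5, index implies 3.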