我有几个parquet文件(数据框),我将它们作为一个 dask 数据框图和示例加载。之后,我根据数据框中的原始数据执行一些计算,并将新列附加到我的 dask 数据框中。
最后,我想计算所有列的mean()andstd()并得到一个ValueError我不确定它来自哪里或我做错了什么。
import pandas as pd
import numpy as np
import tensorflow as tf
import os
from os.path import join
import dask
import dask.dataframe as dd
import dask.array as da
# read in the data
data_pq = dd.read_parquet(join(path_to_data,'filter_width_*_DNN_train.parquet'),chunksize='4GB')
print('Convert to single precission and sample')
data_pq = data_pq.astype(np.float32).sample(frac=0.1)
# ## compute the additional quantites (tensors)
# compute tensors R, S mag(U) etc.
mag_U = da.sqrt(data_pq['U_bar'].values**2 + data_pq['V_bar'].values**2 +data_pq['W_bar'].values**2)
mag_grad_c = da.sqrt(data_pq['grad_c_x_LES'].values**2 + data_pq['grad_c_y_LES'].values**2 +data_pq['grad_c_z_LES'].values**2)
sum_U = data_pq['U_bar'].values + data_pq['V_bar']+data_pq['W_bar'].values
sum_c = da.absolute(data_pq['grad_c_x_LES'].values) + da.absolute(data_pq['grad_c_y_LES'].values) +da.absolute(data_pq['grad_c_z_LES'].values)
grad_U = da.sqrt(data_pq['grad_U_x_LES'].values**2 + data_pq['grad_U_y_LES'].values**2 +data_pq['grad_U_z_LES'].values**2)
grad_V = da.sqrt(data_pq['grad_V_x_LES'].values**2 + data_pq['grad_V_y_LES'].values**2 +data_pq['grad_V_z_LES'].values**2)
grad_W = da.sqrt(data_pq['grad_W_x_LES'].values**2 + data_pq['grad_W_y_LES'].values**2 +data_pq['grad_W_z_LES'].values**2)
mag_grad_U = da.sqrt(grad_U**2 + grad_V**2 +grad_W**2)
sum_grad_U = da.absolute(grad_U) + da.absolute(grad_V) +da.absolute(grad_W)
print('Computing gradient_tensor')
gradient_tensor = da.array([
[data_pq['grad_U_x_LES'],data_pq['grad_V_x_LES'],data_pq['grad_W_x_LES']],
[data_pq['grad_U_y_LES'],data_pq['grad_V_y_LES'],data_pq['grad_W_y_LES']],
[data_pq['grad_U_z_LES'],data_pq['grad_V_z_LES'],data_pq['grad_W_z_LES']]
])
print('Computing S and R')
# symetric strain
Strain = 0.5*(gradient_tensor + da.transpose(gradient_tensor,(1,0,2)))
#anti symetric strain
Anti = 0.5*(gradient_tensor - da.transpose(gradient_tensor,(1,0,2)))
print('Computing lambdas')
lambda_1 = da.trace(Strain**2)
lambda_2 = da.trace(Anti**2)
lambda_3 = da.trace(Strain**3)
lambda_4 = da.trace(Anti**2 * Strain)
lambda_5 = da.trace(Anti**2 * Strain**2)
# Add to the dask dataframe
data_pq['mag_grad_c'] = mag_grad_c
data_pq['mag_U'] = mag_U
data_pq['sum_c'] = sum_c
data_pq['sum_U'] = sum_U
data_pq['sum_grad_U'] = sum_grad_U
data_pq['mag_grad_U'] = mag_grad_U
# REPARTITON
data_pq = data_pq.repartition(npartitions=lambda_1.npartitions)
data_pq['lambda_1'] = lambda_1
data_pq['lambda_2'] = lambda_2
data_pq['lambda_3'] = lambda_3
data_pq['lambda_4'] = lambda_4
data_pq['lambda_5'] = lambda_5
print('Done with feature computation')
# reindex and compute mean and std
data_pq = data_pq.reset_index().drop('index',axis=1)
# compute the mean and std
data_mean, data_std = dask.compute(data_pq.mean(),data_pq.std())
不确定它来自哪里。它说索引不匹配。这是我收到的错误消息:
--------------------------------------------------------------------------- ValueError Traceback (most recent call last) ~/Python/Data_driven_models/DASK_processing/Dask_parquet.py in <module>
119 data_pq = data_pq.reset_index().drop('index',axis=1)
120
--> 121 data_mean, data_std = dask.compute(data_pq.mean(),data_pq.std())
122
~/.local/lib/python3.6/site-packages/dask/base.py in compute(*args,
**kwargs)
450 postcomputes.append(x.__dask_postcompute__())
451
--> 452 results = schedule(dsk, keys, **kwargs)
453 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
454
~/.local/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
82 get_id=_thread_get_id,
83 pack_exception=pack_exception,
---> 84 **kwargs
85 )
86
~/.local/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
484 _execute_task(task, data) # Re-execute locally
485 else:
--> 486 raise_exception(exc, tb)
487 res, worker_id = loads(res_info)
488 state["cache"][key] = res
~/.local/lib/python3.6/site-packages/dask/local.py in reraise(exc, tb)
314 if exc.__traceback__ is not tb:
315 raise exc.with_traceback(tb)
--> 316 raise exc
317
318
~/.local/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
220 try:
221 task, data = loads(task_info)
--> 222 result = _execute_task(task, data)
223 id = get_id()
224 result = dumps((result, id))
~/.local/lib/python3.6/site-packages/dask/core.py in
_execute_task(arg, cache, dsk)
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
~/.local/lib/python3.6/site-packages/pandas/core/series.py in
__init__(self, data, index, dtype, name, copy, fastpath)
312 if len(index) != len(data):
313 raise ValueError(
--> 314 f"Length of passed values is {len(data)}, "
315 f"index implies {len(index)}."
316 )
ValueError: Length of passed values is 3728270, index implies 2135992.