
I currently use Vaex to generate binned data for histograms and to decimate large time-series data. Essentially, I reduce millions of time-series points into a number of bins and compute the mean, maximum and minimum of each bin. I would like to compare Vaex (reading an HDF5 file) against Dask (reading Parquet files), keeping the data out-of-core in both cases.

Update 3 (I have removed my previous updates):

On the first run Dask is 30% faster than Vaex, but on repeated runs Vaex is 4.5x faster. I believe Vaex gets this speedup from memory mapping. Is there a way to improve Dask's execution time on repeated runs?
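One direction I am considering is keeping a dask.distributed cluster alive and persisting the dataframe in worker memory between runs, so repeated computations do not re-read the Parquet files from disk. A minimal sketch of what I mean, assuming dask.distributed is installed (I have not confirmed this closes the gap with Vaex's memory mapping):

import os
import dask.dataframe as dd
from dask.distributed import Client

filePath = r'F:\temp\DaskVaexx'  # same folder used by the scripts below

# Local cluster whose workers stay alive between repeated computations
client = Client()

parquet_dd = dd.read_parquet(os.path.join(filePath, 'randomData.parquet'))

# persist() materialises the partitions in worker memory once, so later
# aggregations reuse the cached data instead of re-reading the Parquet files
parquet_dd = parquet_dd.persist()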

First, create some random data and generate some files. Warning: this will generate 1.5 GB of data.

import numpy as np
import vaex as vx
import pandas as pd
import dask.dataframe as dd
import os

#cwd = os.getcwd() # Change this to your directory for path to save hdf and parquet files 
cwd = r'F:\temp\DaskVaexx' # Write files to this directory.  Use a fast SSD for fast read calculations in Dask/Vaex

### Create random data
size = 20000000 # number of rows
scale = 1.
scaleLocal = 20
np.random.seed(0)
x_data = np.arange(size)
y_data = np.cumsum(np.random.randn(size)  * scale) + np.random.randn(size) * scaleLocal

np.random.seed(1)
scaleLocal2 = 3
y_data2 = np.cumsum(np.random.randn(size)  * scale) + np.random.randn(size) * scaleLocal2
df = pd.DataFrame({'t': x_data.astype(np.float32),'Channel1' : y_data.astype(np.float32),'Channel2' : y_data2.astype(np.float32)})
# df

#Create Dask dataframe
dask_df = dd.from_pandas(df, npartitions=1)

# Create a Vaex dataset from pandas and then export to HDF5
dataVX = vx.from_pandas(df)
dataVX.export_hdf5(os.path.join(cwd, 'randomData.hdf'))

# Create a parquet folder and files from dask dataframe
dask_df.to_parquet(os.path.join(cwd, 'randomData.parquet'))

# Create a hdf file from dask dataframe
#dask_df.to_hdf(os.path.join(cwd, 'randomDataDask.hdf'), '/data')
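Note that dask_df above has a single partition, so the Parquet folder holds one file and Dask reads it back as one partition. If the comparison should also exercise Dask's parallelism, a variant that could be appended to the script above is a multi-partition copy (the partition count of 8 and the randomData_multi.parquet name are arbitrary choices of mine):

# Optional: also write a multi-partition copy so Dask can parallelise the read
dask_df_multi = dd.from_pandas(df, npartitions=8)
dask_df_multi.to_parquet(os.path.join(cwd, 'randomData_multi.parquet'))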

Now the Vaex and Dask processing:

import dask.dataframe as dd
import dask.array as da
import vaex as vx
import dask
import time
import os
import numpy as np
import pandas as pd

#

bins = 1000
minLimit = 0
maxLimit = 1000000
timeArrayName = 't'
column = 'Channel1'

# filePath = os.getcwd() # location of hdf and parquet data
filePath = r'F:\temp\DaskVaexx' # location of hdf and parquet data

# ------------------------------
# Vaex code

startTime = time.time()

dataVX = vx.open(os.path.join(filePath,r'randomData.hdf'))

#Calculate the min & max of a columnar dataset for each bin
minMaxVaexOutputArray = dataVX.minmax(column, binby=[timeArrayName],  shape=(bins,), limits=[minLimit,maxLimit])

VaexResults_df = pd.DataFrame(data = minMaxVaexOutputArray, columns = ['min','max'])

#Calculate the mean of a columnar dataset for each bin
VaexResults_df['mean'] = dataVX.mean(column, binby=[timeArrayName],  shape=(bins,), limits=[minLimit, maxLimit]) 

print('Vaex hdf computation time: ' + str(time.time() - startTime))

# dataVX.close_files() # option to close down the opened Vaex dataset
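# (My own addition, not part of the original timing:) to reproduce the repeated-run
# numbers from Update 3, simply re-run the binned statistics a few times; the later
# passes are presumably served from the memory-mapped HDF5 file / OS page cache
for run in range(3):
    t0 = time.time()
    dataVX.minmax(column, binby=[timeArrayName], shape=(bins,), limits=[minLimit, maxLimit])
    dataVX.mean(column, binby=[timeArrayName], shape=(bins,), limits=[minLimit, maxLimit])
    print('Vaex repeat run ' + str(run) + ': ' + str(time.time() - t0))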
# ------------------------------

# ------------------------------
# Dask computation
startTime = time.time()

# Read parquet file or folder of files
parquet_dd = dd.read_parquet(os.path.join(filePath,r'randomData.parquet'))  

# Create a column that maps each time value inside the limits to a bin index
parquet_dd['timeGroups'] = parquet_dd[timeArrayName].where((parquet_dd[timeArrayName] >= minLimit) & (parquet_dd[timeArrayName] < maxLimit)) // ((maxLimit - minLimit) / bins)

# Groupby using the virtual column
df3 = parquet_dd[column].groupby(parquet_dd['timeGroups']).aggregate(['min', 'max', 'mean'])

#Execute Dask and return results to a Pandas Dataframe
DaskResults_df = dask.compute(df3)[0]

print('Dask with parquet computation time: ' + str(time.time() - startTime))
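As a rough sanity check that the two pipelines agree, one could append a comparison of the two result frames (my own addition; it assumes the Vaex bin edges and the floor-division grouping produce the same 1000 aligned bins, which may not hold exactly at the boundaries):

# Rough comparison of the two outputs (assumes both produce 1000 aligned bins)
DaskResults_sorted = DaskResults_df.sort_index()
print(np.allclose(VaexResults_df[['min', 'max', 'mean']].values,
                  DaskResults_sorted[['min', 'max', 'mean']].values,
                  rtol=1e-4))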