据我所知(包括我自己),这已成为 xarray 用户中一个非常普遍的问题,并且与这个 Github issue密切相关。通常,存在某个函数的 NumPy 实现(在您的情况下,np.polyfit()
),但不清楚如何最好地将此计算应用于每个网格单元,可能在多个维度上。
在地球科学领域,有两个主要用例,一个是简单的解决方案,另一个是更复杂的:
(1) 简易案例:
您有一个 xr.DataArray temp
,它是 的函数,(time, lat, lon)
并且您希望在每个网格框中及时找到趋势。最简单的方法是将(lat, lon)
坐标分组到一个新坐标中,按该坐标分组,然后使用该.apply()
方法。
受Ryan Abernathy的Gist启发:<3
# Example data
da = xr.DataArray(np.random.randn(20, 180, 360),
dims=('time', 'lat', 'lon'),
coords={'time': np.linspace(0,19, 20),
'lat': np.linspace(-90,90,180),
'lon': np.linspace(0,359, 360)})
# define a function to compute a linear trend of a timeseries
def linear_trend(x):
pf = np.polyfit(x.time, x, 1)
# need to return an xr.DataArray for groupby
return xr.DataArray(pf[0])
# stack lat and lon into a single dimension called allpoints
stacked = da.stack(allpoints=['lat','lon'])
# apply the function over allpoints to calculate the trend at each point
trend = stacked.groupby('allpoints').apply(linear_trend)
# unstack back to lat lon coordinates
trend_unstacked = trend.unstack('allpoints')
缺点:对于较大的数组,这种方法变得非常慢,并且不容易解决本质上可能感觉非常相似的其他问题。这导致我们...
(2)更难的情况(和OP的问题):
您有一个 xr.Dataset ,其中包含变量temp
和height
的每个函数,(plev, time, lat, lon)
您希望找到每个点temp
的回归height
(失效率) 。(time, lat, lon)
解决此问题的最简单方法是使用 xr.apply_ufunc(),它为您提供一定程度的矢量化和 dask 兼容性。(速度!)
# Example DataArrays
da1 = xr.DataArray(np.random.randn(20, 20, 180, 360),
dims=('plev', 'time', 'lat', 'lon'),
coords={'plev': np.linspace(0,19, 20),
'time': np.linspace(0,19, 20),
'lat': np.linspace(-90,90,180),
'lon': np.linspace(0,359, 360)})
# Create dataset
ds = xr.Dataset({'Temp': da1, 'Height': da1})
和以前一样,我们创建一个函数来计算我们需要的线性趋势:
def linear_trend(x, y):
pf = np.polyfit(x, y, 1)
return xr.DataArray(pf[0])
现在,我们可以使用沿维度对两个 DataArray进行xr.apply_ufunc()
回归!temp
height
plev
%%time
slopes = xr.apply_ufunc(linear_trend,
ds.Height, ds.Temp,
vectorize=True,
input_core_dims=[['plev'], ['plev']],# reduce along 'plev'
)
但是,这种方法也很慢,并且和以前一样,不能很好地扩展到更大的阵列。
CPU times: user 2min 44s, sys: 2.1 s, total: 2min 46s
Wall time: 2min 48s
加快速度:
为了加快计算速度,我们可以将height
和temp
数据转换为dask.arrays
using xr.DataArray.chunk()
。这将我们的数据分成小的、可管理的块,然后我们可以使用这些块来并行化我们的dask=parallelized
计算apply_ufunc()
。
注意,您必须小心不要沿着要应用回归的维度进行分块!
dask_height = ds.Height.chunk({'lat':10, 'lon':10, 'time': 10})
dask_temp = ds.Temp.chunk({'lat':10, 'lon':10, 'time': 10})
dask_height
<xarray.DataArray 'Height' (plev: 20, time: 20, lat: 180, lon: 360)>
dask.array<xarray-<this-array>, shape=(20, 20, 180, 360), dtype=float64, chunksize=(20, 10, 10, 10), chunktype=numpy.ndarray>
Coordinates:
* plev (plev) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
* time (time) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
* lat (lat) float64 -90.0 -88.99 -87.99 -86.98 ... 86.98 87.99 88.99 90.0
* lon (lon) int64 0 1 2 3 4 5 6 7 8 ... 352 353 354 355 356 357 358 359
现在,再次进行计算!
%%time
slopes_dask = xr.apply_ufunc(linear_trend,
dask_height, dask_temp,
vectorize=True,
dask='parallelized',
input_core_dims=[['plev'], ['plev']], # reduce along 'plev'
output_dtypes=['d'],
)
CPU times: user 6.55 ms, sys: 2.39 ms, total: 8.94 ms
Wall time: 9.24 ms
显着加速!
希望这可以帮助!我学到了很多试图回答它:)
最好的
编辑:正如评论中指出的那样,要真正比较 dask 和非 dask 方法之间的处理时间,您应该使用:
%%time
slopes_dask.compute()
这为您提供了与非 dask 方法相当的挂壁时间。
然而,重要的是要指出,在处理气候分析中遇到的那种大型数据集时,更倾向于对数据进行惰性操作(即在绝对需要之前不加载它)。所以我仍然建议使用 dask 方法,因为那样你可以在输入数组上操作许多不同的进程,每个进程只需要几个ms
,然后只有在最后你需要等待几分钟才能得到你的成品出去。:)