
I have a function sig2z that I would like to apply over a dask array:

import numpy as np
import xarray as xr

def sig2z(da, zr, zi, nvar=None, dim=None, coord=None):
    r"""
    Interpolate variables on \sigma coordinates onto z coordinates.

    Parameters
    ----------
    da : `dask.array`
        The data on sigma coordinates to be interpolated
    zr : `dask.array`
        The depths corresponding to sigma layers
    zi : `numpy.array`
        The depths onto which to interpolate the data
    nvar : str (optional)
        Name of the variable. Only necessary when the variable is
        horizontal velocity.
    dim : tuple (optional)
        Dimension names for the output. Defaults to ``da.dims``.
    coord : dict (optional)
        Coordinates for the output. Defaults to ``da.coords``.

    Returns
    -------
    dai : `dask.array`
        The data interpolated onto a spatially uniform z coordinate
    """

    if np.diff(zi)[0] < 0. or zi.max() <= 0.:
        raise ValueError("The values in `zi` should be positive and increasing.")
    if np.any(np.absolute(zr[0]) < np.absolute(zr[-1])):
        raise ValueError("`zr` should have the deepest depth at index 0.")
    if zr.shape != da.shape[-3:]:
        raise ValueError("`zr` should have the same "
                         "spatial dimensions as `da`.")

    if dim is None:
        dim = da.dims
    if coord is None:
        coord = da.coords
    N = da.shape
    nzi = len(zi)
    # Preallocate the output, filled with NaN below the topography.
    if len(N) == 4:
        dai = np.empty((N[0], nzi, N[-2], N[-1]))
    elif len(N) == 3:
        dai = np.empty((nzi, N[-2], N[-1]))
    else:
        raise ValueError("The data should have at least three dimensions.")
    dai[:] = np.nan

    zi = -zi[::-1]  # ROMS has the deepest level at index=0

    # Average zr onto the staggered grid points for velocity variables.
    if nvar == 'u':  # u variables
        zl = .5 * (zr.shift(eta_rho=-1, xi_rho=-1)
                   + zr.shift(eta_rho=-1)
                   )
    elif nvar == 'v':  # v variables
        zl = .5 * (zr.shift(xi_rho=-1)
                   + zr.shift(eta_rho=-1, xi_rho=-1)
                   )
    else:
        zl = zr

    for i in range(N[-1]):
        for j in range(N[-2]):
            # only bother for sufficiently deep regions
            if zl[:, j, i].min() < -1e2:
                # only interpolate on z above the topography
                ind = np.argwhere(zi >= zl[:, j, i].copy().min())
                if len(N) == 4:
                    for s in range(N[0]):
                        dai[s, :len(ind), j, i] = _interpolate(da[s, :, j, i].copy(),
                                                               zl[:, j, i].copy(),
                                                               zi[int(ind[0]):]
                                                               )
                else:
                    dai[:len(ind), j, i] = _interpolate(da[:, j, i].copy(),
                                                        zl[:, j, i].copy(),
                                                        zi[int(ind[0]):]
                                                        )

    return xr.DataArray(dai, dims=dim, coords=coord)

This works fine on an xarray.DataArray, but when I apply it to a dask.array I get the following error:

test = dsar.map_blocks(sig2z, w[0].data,
                       zr.chunk({'eta_rho': 1, 'xi_rho': 1}).data, zi,
                       dim, coord,
                       chunks=dai[0].chunks, dtype=dai.dtype
                       ).compute()

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-29-d81bad2f4486> in <module>()
----> 1 test.compute()

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
     95             Extra keywords to forward to the scheduler ``get`` function.
     96         """
---> 97         (result,) = compute(self, traverse=False, **kwargs)
     98         return result
     99 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    202     dsk = collections_to_dsk(variables, optimize_graph, **kwargs)
    203     keys = [var._keys() for var in variables]
--> 204     results = get(dsk, keys, **kwargs)
    205 
    206     results_iter = iter(results)

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, **kwargs)
     73     results = get_async(pool.apply_async, len(pool._pool), dsk, result,
     74                         cache=cache, get_id=_thread_get_id,
---> 75                         pack_exception=pack_exception, **kwargs)
     76 
     77     # Cleanup pools associated to dead threads

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    519                         _execute_task(task, data)  # Re-execute locally
    520                     else:
--> 521                         raise_exception(exc, tb)
    522                 res, worker_id = loads(res_info)
    523                 state['cache'][key] = res

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/compatibility.py in reraise(exc, tb)
     58         if exc.__traceback__ is not tb:
     59             raise exc.with_traceback(tb)
---> 60         raise exc
     61 
     62 else:

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    288     try:
    289         task, data = loads(task_info)
--> 290         result = _execute_task(task, data)
    291         id = get_id()
    292         result = dumps((result, id))

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/local.py in _execute_task(arg, cache, dsk)
    269         func, args = arg[0], arg[1:]
    270         args2 = [_execute_task(a, cache) for a in args]
--> 271         return func(*args2)
    272     elif not ishashable(arg):
    273         return arg

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/array/core.py in getarray(a, b, lock)
     63         c = a[b]
     64         if type(c) != np.ndarray:
---> 65             c = np.asarray(c)
     66     finally:
     67         if lock:

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/numpy/core/numeric.py in asarray(a, dtype, order)
    529 
    530     """
--> 531     return array(a, dtype, copy=False, order=order)
    532 
    533 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    425 
    426     def __array__(self, dtype=None):
--> 427         self._ensure_cached()
    428         return np.asarray(self.array, dtype=dtype)
    429 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/core/indexing.py in _ensure_cached(self)
    422     def _ensure_cached(self):
    423         if not isinstance(self.array, np.ndarray):
--> 424             self.array = np.asarray(self.array)
    425 
    426     def __array__(self, dtype=None):

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/numpy/core/numeric.py in asarray(a, dtype, order)
    529 
    530     """
--> 531     return array(a, dtype, copy=False, order=order)
    532 
    533 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    406 
    407     def __array__(self, dtype=None):
--> 408         return np.asarray(self.array, dtype=dtype)
    409 
    410     def __getitem__(self, key):

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/numpy/core/numeric.py in asarray(a, dtype, order)
    529 
    530     """
--> 531     return array(a, dtype, copy=False, order=order)
    532 
    533 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    373     def __array__(self, dtype=None):
    374         array = orthogonally_indexable(self.array)
--> 375         return np.asarray(array[self.key], dtype=None)
    376 
    377     def __getitem__(self, key):

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/numpy/core/numeric.py in asarray(a, dtype, order)
    529 
    530     """
--> 531     return array(a, dtype, copy=False, order=order)
    532 
    533 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    373     def __array__(self, dtype=None):
    374         array = orthogonally_indexable(self.array)
--> 375         return np.asarray(array[self.key], dtype=None)
    376 
    377     def __getitem__(self, key):

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/backends/netCDF4_.py in __getitem__(self, key)
     58         with self.datastore.ensure_open(autoclose=True):
     59             try:
---> 60                 data = getitem(self.get_array(), key)
     61             except IndexError:
     62                 # Catch IndexError in netCDF4 and return a more informative

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.__getitem__ (netCDF4/_netCDF4.c:39743)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable._get (netCDF4/_netCDF4.c:49835)()

RuntimeError: Resource temporarily unavailable

Could someone tell me why I am getting this error? Thank you in advance.


1 Answer


PID numbers, open file descriptors, and memory are limited resources.

The fork(2) man page describes when errno.EAGAIN should occur:

[EAGAIN] The system-imposed limit on the total number of processes
         under execution would be exceeded. This limit is
         configuration-dependent.

[EAGAIN] The system-imposed limit MAXUPRC (<sys/param.h>) on the total
         number of processes under execution by a single user would be
         exceeded.
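
For reference, here is a minimal sketch (an illustration, not part of the original answer; it assumes a POSIX system) of how that condition surfaces in Python. os.fork(), which subprocess also uses under the hood, raises OSError with errno set to EAGAIN, and its message is the same "Resource temporarily unavailable" seen in the traceback above:

import errno
import os

try:
    pid = os.fork()  # subprocess also forks under the hood on POSIX
except OSError as e:
    if e.errno == errno.EAGAIN:
        # str(e) reads "[Errno 11] Resource temporarily unavailable" on Linux
        print("process limit reached:", e)
else:
    if pid == 0:
        os._exit(0)          # child: exit immediately
    else:
        os.waitpid(pid, 0)   # parent: reap the child so it is not a zombie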

To reproduce the error more easily, you could add at the start of your program:

import resource

resource.setrlimit(resource.RLIMIT_NPROC, (20, 20))
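
With the limit lowered like this, a hypothetical loop that spawns children quickly hits EAGAIN (a sketch only; RLIMIT_NPROC counts all of your user's processes, so the exact numbers depend on what else is running):

import resource
import subprocess

resource.setrlimit(resource.RLIMIT_NPROC, (20, 20))

procs = []
try:
    for _ in range(100):
        procs.append(subprocess.Popen(["sleep", "60"]))
except OSError as e:
    print("spawn failed after", len(procs), "children:", e)
finally:
    for p in procs:   # clean up whatever did start
        p.kill()
        p.wait()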

The problem could be that all the child processes stay alive because you have not called p.stdin.close(), and gnuplot's stdin may be fully buffered when redirected to a pipe, i.e., the gnuplot processes may be stuck waiting for input. And/or your application uses too many file descriptors (file descriptors are inherited by child processes by default on Python 2.7) without releasing them. A sketch of this leaky pattern is shown below.
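
Here is a sketch of the kind of pattern that leaks children this way (hypothetical; `scripts` stands in for whatever plotting commands the application generates):

from subprocess import Popen, PIPE

for contents in scripts:
    p = Popen("gnuplot", stdin=PIPE, stdout=PIPE, stderr=PIPE)
    p.stdin.write(b"set terminal gif;\n" + contents)
    # p.stdin is never closed and p.wait() is never called, so each
    # gnuplot keeps waiting for input while the parent keeps all the
    # pipe file descriptors open: the children and fds accumulate.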

If the input doesn't depend on the output and the input has a limited size, use .communicate():

from subprocess import Popen, PIPE, STDOUT

p = Popen("gnuplot", stdin=PIPE, stdout=PIPE, stderr=PIPE,
          close_fds=True, # to avoid running out of file descriptors
          bufsize=-1, # fully buffered (use zero (default) if no p.communicate())
          universal_newlines=True) # translate newlines, encode/decode text
out, err = p.communicate("\n".join(['set terminal gif;', contents]))

.communicate() writes all the input and reads all the output (concurrently, so there is no deadlock), then closes p.stdin, p.stdout, and p.stderr (even if the input is small and gnuplot's side is fully buffered; EOF flushes the buffer), and waits for the process to finish (no zombies).
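
For contrast, a hypothetical write-then-read sequence illustrates the deadlock that .communicate() avoids (`large_script` is a placeholder for a big input):

from subprocess import Popen, PIPE

p = Popen("gnuplot", stdin=PIPE, stdout=PIPE, universal_newlines=True)
p.stdin.write(large_script)  # may block once the stdin pipe buffer fills
p.stdin.close()
out = p.stdout.read()        # gnuplot may in turn stall on a full stdout pipe
p.wait()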

Popen calls _cleanup() in its constructor, which polls the exit status of all known subprocesses, i.e., even if you don't call p.wait(), there shouldn't be many zombie processes (dead but with unread status).

Answer from https://stackoverflow.com/a/22729602/4879665

Answered 2017-08-06T14:24:43.703