
I wrote some code to try out Dask with multiple processors on my Unix server, as shown below:

import pandas as pd
import sys
import dask.dataframe as dd
from dask.multiprocessing import get


numbers = pd.read_csv("head_5_22SNPs_CMI.txt", sep="\t", header=None)

combinations = pd.read_csv("all_combinations_5snps.txt", sep=" ", header=None)

data_dask = dd.from_pandas(combinations, npartitions=5)

pop = int(1 + 5)

score_col, freq_col = [], []

def score_freq(line):
    score=0
    freq=1
    for j in range(len(line)):
        if line[j][1] != numbers.values[j][1]:   # homozygous for ref
            score+=0
            freq*=(float(1-float(numbers.values[j][pop]))*float(1-float(numbers.values[j][pop])))
        elif line[j][0] != numbers.values[j][1] and line[j][1] == numbers.values[j][1]: # heterozygous
            score+=(float(numbers.values[j][5]))
            freq*=(2*(float(1-float(numbers.values[j][pop]))*float(numbers.values[j][pop])))
        elif line[j][0] == numbers.values[j][1]:
            score+=2*(float(numbers.values[j][5]))
            freq*=(float(numbers.values[j][pop])*float(numbers.values[j][pop]))

        if freq < 1e-5:   # threshold to stop loop in interest of efficiency 
            break


    return pd.Series([score, freq])

res = data_dask.map_partitions(lambda df: df.apply((lambda row: score_freq(row)), axis=1)).compute(scheduler=get)

res.to_csv('dask_test.txt', index=False)

Running this code on my Unix server produces an error:

Traceback (most recent call last):
  File "compute_scores_pandas+dask_testing.py", line 3, in <module>
    import dask.dataframe as dd
  File "/hpc/home/lsiwzyj/anaconda/lib/python2.7/site-packages/dask/dataframe/__init__.py", line 4, in <module>
    from .core import (DataFrame, Series, Index, _Frame, map_partitions,
  File "/hpc/home/lsiwzyj/anaconda/lib/python2.7/site-packages/dask/dataframe/core.py", line 19, in <module>
    from .. import array as da
  File "/hpc/home/lsiwzyj/anaconda/lib/python2.7/site-packages/dask/array/__init__.py", line 5, in <module>
    from .core import (Array, block, concatenate, stack, from_array, store,
  File "/hpc/home/lsiwzyj/anaconda/lib/python2.7/site-packages/dask/array/core.py", line 31, in <module>
    from . import chunk
  File "/hpc/home/lsiwzyj/anaconda/lib/python2.7/site-packages/dask/array/chunk.py", line 19, in <module>
    broadcast_to = npcompat.broadcast_to
AttributeError: 'module' object has no attribute 'broadcast_to'

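After some googling, it seems the problem may be a name collision between classes, but I can't find anything wrong in my script. I also tried upgrading the Dask package, and I did get this warning: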

Cannot uninstall 'python-dateutil'. It is a distutils installed project and thus we cannot accurately determine which files belong to it which would lead to only a partial uninstall.

Does anyone know what the problem is? The script runs fine on Windows in my IDE.


2 Answers


The problem is a name collision between two libraries. The failing line tries to read broadcast_to from a module bound to the name npcompat, and that name appears to have two different definitions (the second one is a guess on my part).

There was a related bug in dask, referenced by @mdurant in the comments above.

It looks like you don't have numpy installed. Installing it would be my suggestion unless there is a strong reason not to.
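One way to check this before changing anything is a small diagnostic. This is only a sketch: the helper name check_attribute is made up for illustration, and it just reports whether a module imports, which version it claims to be, and whether it exposes a given attribute. As far as I know, numpy.broadcast_to was added in NumPy 1.10, so an old NumPy would report False for it.

```python
import importlib


def check_attribute(module_name, attribute):
    """Report whether a module imports and exposes the given attribute.

    Hypothetical helper for illustration; not part of dask or numpy.
    """
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        # The module is not installed (or not importable) in this environment.
        return {"installed": False, "version": None, "has_attribute": False}
    return {
        "installed": True,
        # Not every module defines __version__, so fall back to None.
        "version": getattr(module, "__version__", None),
        "has_attribute": hasattr(module, attribute),
    }


if __name__ == "__main__":
    # Run this with the same interpreter that raises the AttributeError.
    print(check_attribute("numpy", "broadcast_to"))
```

Running it on both the Unix server and the Windows machine and comparing the two results should show whether the environments actually differ.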

Having read your comment, I'm no longer sure about the cause and would have to dig further, but I'd start by getting rid of that warning. You can do so by removing python-dateutil with the conda remove command. Upgrading dask afterwards will reinstall it at the latest version and should no longer show the warning.

answered 2018-11-02T09:38:11.773

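I ran into this problem too, while creating a requirements.txt file for a module full of functions decorated with dask delayed. Since the problem seems to be in the dask backend code, I found a workaround: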

import dask.delayed as delayed

@delayed
def some_fun(x):
    return x

instead of

import dask
@dask.delayed
def some_fun(x):
    return x

Quirky, but it works and has been more robust across the various environments I have put it in.

answered 2019-12-04T00:12:19.840