我尝试从存储在 HDFS 中的 CSV 文件创建一个 Dask 数据框。与集群的连接成功，但在对该数据框调用 len() 时出现了错误。
代码:
from dask_yarn import YarnCluster
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
import subprocess
import os
# Resolve the Hadoop classpath once. check_output raises CalledProcessError
# if the `hadoop` command exits non-zero, instead of silently exporting an
# empty CLASSPATH.
classpath = subprocess.check_output(
    ["/usr/hdp/current/hadoop-client/bin/hadoop", "classpath", "--glob"]
)

# Environment used by pyarrow's libhdfs driver in this (client) process.
os.environ["HADOOP_HOME"] = "/usr/hdp/current/hadoop-client"
os.environ["ARROW_LIBHDFS_DIR"] = "/usr/hdp/3.1.4.0-315/usr/lib/"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java/"
# The command output ends with a newline; strip it so it does not become
# part of the last classpath entry.
os.environ["CLASSPATH"] = classpath.decode("utf-8").strip()

# NOTE(review): os.environ only affects this process. The YARN worker
# containers presumably need the same variables (dask-yarn exposes a
# `worker_env` option for this) -- confirm against the cluster setup.
cluster = YarnCluster(
    environment='python:///opt/anaconda3/bin/python3',
    worker_vcores=32,
    worker_memory="128GiB",
    n_workers=10,
)
client = Client(cluster)
client

# read_csv is lazy; len() triggers the actual read of the file on the workers.
df = dd.read_csv('hdfs://masterha/data/batch/82.csv')
len(df)
错误:
>>> len(ddf)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/anaconda3/lib/python3.7/site-packages/dask/dataframe/core.py", line 504, in __len__
len, np.sum, token="len", meta=int, split_every=False
File "/opt/anaconda3/lib/python3.7/site-packages/dask/base.py", line 165, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/opt/anaconda3/lib/python3.7/site-packages/dask/base.py", line 436, in compute
results = schedule(dsk, keys, **kwargs)
File "/opt/anaconda3/lib/python3.7/site-packages/distributed/client.py", line 2539, in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
File "/opt/anaconda3/lib/python3.7/site-packages/distributed/client.py", line 1839, in gather
asynchronous=asynchronous,
File "/opt/anaconda3/lib/python3.7/site-packages/distributed/client.py", line 756, in sync
self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
File "/opt/anaconda3/lib/python3.7/site-packages/distributed/utils.py", line 333, in sync
raise exc.with_traceback(tb)
File "/opt/anaconda3/lib/python3.7/site-packages/distributed/utils.py", line 317, in f
result[0] = yield future
File "/opt/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/opt/anaconda3/lib/python3.7/site-packages/distributed/client.py", line 1695, in _gather
raise exception.with_traceback(traceback)
File "/opt/anaconda3/lib/python3.7/site-packages/dask/bytes/core.py", line 181, in read_block_from_file
with copy.copy(lazy_file) as f:
File "/opt/anaconda3/lib/python3.7/site-packages/fsspec/core.py", line 88, in __enter__
f = self.fs.open(self.path, mode=mode)
File "/opt/anaconda3/lib/python3.7/site-packages/fsspec/implementations/hdfs.py", line 116, in <lambda>
return lambda *args, **kw: getattr(PyArrowHDFS, item)(self, *args, **kw)
File "/opt/anaconda3/lib/python3.7/site-packages/fsspec/spec.py", line 708, in open
path, mode=mode, block_size=block_size, autocommit=ac, **kwargs
File "/opt/anaconda3/lib/python3.7/site-packages/fsspec/implementations/hdfs.py", line 116, in <lambda>
return lambda *args, **kw: getattr(PyArrowHDFS, item)(self, *args, **kw)
File "/opt/anaconda3/lib/python3.7/site-packages/fsspec/implementations/hdfs.py", line 72, in _open
return HDFSFile(self, path, mode, block_size, **kwargs)
File "/opt/anaconda3/lib/python3.7/site-packages/fsspec/implementations/hdfs.py", line 171, in __init__
self.fh = fs.pahdfs.open(path, mode, block_size, **kwargs)
File "pyarrow/io-hdfs.pxi", line 431, in pyarrow.lib.HadoopFileSystem.open
File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: HDFS file does not exist: /data/batch/82.csv