我使用 dask.dataframe.to_parquet(将数据从数据库移动到 Azure blob 存储)在 Azure blob 中创建了一个 parquet 文件。
我现在想读取那个文件。我的做法如下:
STORAGE_OPTIONS={'account_name': 'ACCOUNT_NAME',
'account_key': 'ACCOUNT_KEY'}
df = dd.read_parquet('abfs://BLOB/FILE.parquet', storage_options=STORAGE_OPTIONS)
但我得到一个 AzureHttpError:
---------------------------------------------------------------------------
AzureHttpError Traceback (most recent call last)
<ipython-input-4-2184e772e417> in <module>
3 'account_key': 'ACCOUNT_KEY'}
4
----> 5 df = dd.read_parquet('abfs://BLOB/FILE', storage_options=STORAGE_OPTIONS)
~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\dataframe\io\parquet\core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, chunksize, **kwargs)
231 filters=filters,
232 split_row_groups=split_row_groups,
--> 233 **kwargs
234 )
235 if meta.index.name is not None:
~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\dataframe\io\parquet\fastparquet.py in read_metadata(fs, paths, categories, index, gather_statistics, filters, **kwargs)
176 # correspond to a row group (populated below).
177 parts, pf, gather_statistics, fast_metadata = _determine_pf_parts(
--> 178 fs, paths, gather_statistics, **kwargs
179 )
180
~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\dataframe\io\parquet\fastparquet.py in _determine_pf_parts(fs, paths, gather_statistics, **kwargs)
127 open_with=fs.open,
128 sep=fs.sep,
--> 129 **kwargs.get("file", {})
130 )
131 if gather_statistics is None:
~\AppData\Local\Continuum\anaconda3\lib\site-packages\fastparquet\api.py in __init__(self, fn, verify, open_with, root, sep)
109 fn2 = join_path(fn, '_metadata')
110 self.fn = fn2
--> 111 with open_with(fn2, 'rb') as f:
112 self._parse_header(f, verify)
113 fn = fn2
~\AppData\Local\Continuum\anaconda3\lib\site-packages\fsspec\spec.py in open(self, path, mode, block_size, cache_options, **kwargs)
722 autocommit=ac,
723 cache_options=cache_options,
--> 724 **kwargs
725 )
726 if not ac:
~\AppData\Local\Continuum\anaconda3\lib\site-packages\adlfs\core.py in _open(self, path, mode, block_size, autocommit, cache_options, **kwargs)
552 autocommit=autocommit,
553 cache_options=cache_options,
--> 554 **kwargs,
555 )
556
~\AppData\Local\Continuum\anaconda3\lib\site-packages\adlfs\core.py in __init__(self, fs, path, mode, block_size, autocommit, cache_type, cache_options, **kwargs)
582 cache_type=cache_type,
583 cache_options=cache_options,
--> 584 **kwargs,
585 )
586
~\AppData\Local\Continuum\anaconda3\lib\site-packages\fsspec\spec.py in __init__(self, fs, path, mode, block_size, autocommit, cache_type, cache_options, **kwargs)
954 if mode == "rb":
955 if not hasattr(self, "details"):
--> 956 self.details = fs.info(path)
957 self.size = self.details["size"]
958 self.cache = caches[cache_type](
~\AppData\Local\Continuum\anaconda3\lib\site-packages\fsspec\spec.py in info(self, path, **kwargs)
499 if out:
500 return out[0]
--> 501 out = self.ls(path, detail=True, **kwargs)
502 path = path.rstrip("/")
503 out1 = [o for o in out if o["name"].rstrip("/") == path]
~\AppData\Local\Continuum\anaconda3\lib\site-packages\adlfs\core.py in ls(self, path, detail, invalidate_cache, delimiter, **kwargs)
446 # then return the contents
447 elif self._matches(
--> 448 container_name, path, as_directory=True, delimiter=delimiter
449 ):
450 logging.debug(f"{path} appears to be a directory")
~\AppData\Local\Continuum\anaconda3\lib\site-packages\adlfs\core.py in _matches(self, container_name, path, as_directory, delimiter)
386 prefix=path,
387 delimiter=delimiter,
--> 388 num_results=None,
389 )
390
~\AppData\Local\Continuum\anaconda3\lib\site-packages\azure\storage\blob\baseblobservice.py in list_blob_names(self, container_name, prefix, num_results, include, delimiter, marker, timeout)
1360 '_context': operation_context,
1361 '_converter': _convert_xml_to_blob_name_list}
-> 1362 resp = self._list_blobs(*args, **kwargs)
1363
1364 return ListGenerator(resp, self._list_blobs, args, kwargs)
~\AppData\Local\Continuum\anaconda3\lib\site-packages\azure\storage\blob\baseblobservice.py in _list_blobs(self, container_name, prefix, marker, max_results, include, delimiter, timeout, _context, _converter)
1435 }
1436
-> 1437 return self._perform_request(request, _converter, operation_context=_context)
1438
1439 def get_blob_account_information(self, container_name=None, blob_name=None, timeout=None):
~\AppData\Local\Continuum\anaconda3\lib\site-packages\azure\storage\common\storageclient.py in _perform_request(self, request, parser, parser_args, operation_context, expected_errors)
444 status_code,
445 exception_str_in_one_line)
--> 446 raise ex
447 finally:
448 # If this is a location locked operation and the location is not set,
~\AppData\Local\Continuum\anaconda3\lib\site-packages\azure\storage\common\storageclient.py in _perform_request(self, request, parser, parser_args, operation_context, expected_errors)
372 except AzureException as ex:
373 retry_context.exception = ex
--> 374 raise ex
375 except Exception as ex:
376 retry_context.exception = ex
~\AppData\Local\Continuum\anaconda3\lib\site-packages\azure\storage\common\storageclient.py in _perform_request(self, request, parser, parser_args, operation_context, expected_errors)
358 # and raised as an azure http exception
359 _http_error_handler(
--> 360 HTTPError(response.status, response.message, response.headers, response.body))
361
362 # Parse the response
~\AppData\Local\Continuum\anaconda3\lib\site-packages\azure\storage\common\_error.py in _http_error_handler(http_error)
113 ex.error_code = error_code
114
--> 115 raise ex
116
117
AzureHttpError: Server encountered an internal error. Please try again after some time. ErrorCode: InternalError
<?xml version="1.0" encoding="utf-8"?><Error><Code>InternalError</Code><Message>Server encountered an internal error. Please try again after some time.
RequestId:...
Time:2020-04-15T02:44:06.8611398Z</Message></Error>