
I need to extract the contents of a zip archive to HDFS.

import pathlib
import zipfile

import xmltodict
from hdfs import InsecureClient

def zip_to_hdfs(zip_arch, target_path):
    zf = zipfile.ZipFile(zip_arch)
    zf_list = zf.filelist
    hdfs_uri = 'http://localhost:50070'
    user = 'hive'
    client = InsecureClient(hdfs_uri, user)
    for item in zf_list:
        # rename each archive member from .xml to .jsonl
        new_fn = item.filename.replace('.xml', '.jsonl')
        hdfs_path = pathlib.Path.joinpath(target_path, new_fn)
        with client.write(hdfs_path, encoding='utf-8') as writer:
            with zf.open(item, 'r') as fd:
                # my_handler is defined elsewhere
                my_json = xmltodict.parse(fd, dict_constructor=dict, item_depth=1, item_callback=my_handler)
                writer.write(my_json)

I get an error saying that target_path is not a directory:

hdfs.util.HdfsError: /myhdfs/mydata/arch_folder (is not a directory)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkIsDirectory(FSPermissionChecker.java:539)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkSimpleTraverse(FSPermissionChecker.java:530)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:505)
        at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkTraverse(FSDirectory.java:1612)

So what is the correct way to extract files to HDFS?
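One thing I suspect: the code joins the HDFS path with pathlib, but `pathlib.Path.joinpath` is meant to be called on a `Path` instance, and on a local filesystem it uses the OS's separator rather than the `/` that HDFS always expects. A minimal sketch of how the path could be built with `posixpath` instead (the directory and file name below are hypothetical example values, not from my actual job):

```python
import posixpath

def build_hdfs_path(target_dir, member_name):
    """Join an HDFS directory and an archive member name.

    HDFS paths are always '/'-separated, so posixpath.join is used
    instead of pathlib, which is tied to the local OS separator and
    whose joinpath must be invoked on a Path instance, not a str.
    """
    # rename the member from .xml to .jsonl, as in the original loop
    new_name = member_name.replace('.xml', '.jsonl')
    return posixpath.join(target_dir, new_name)

# Hypothetical example values:
path = build_hdfs_path('/myhdfs/mydata/arch_folder', 'records.xml')
# path == '/myhdfs/mydata/arch_folder/records.jsonl'
```

If the target directory might not exist yet, I assume calling `client.makedirs(target_path)` once before the loop (the `hdfs` library's `InsecureClient` provides `makedirs`) would also be needed, but I'm not sure whether that alone explains the "is not a directory" traversal error.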

