我需要将 zip 存档的内容提取到 hdfs。
def zip_to_hdfs(zip_arch, target_path):
zf = zipfile.ZipFile(zip_arch)
zf_list = zf.filelist
hdfs_uri = 'http://localhost:50070'
user = 'hive'
client = InsecureClient(hdfs_uri, user)
for item in zf_list:
new_fn = item.filename.replace('.xml', '.jsonl')
hdfs_path = pathlib.Path.joinpath(target_path, new_fn)
with client.write(hdfs_path, encoding='utf-8') as writer:
with zf.open(item, 'r') as fd:
my_json = xmltodict.parse(fd, dict_constructor=dict, item_depth=1, item_callback=my_handler)
writer.write(my_json)
我收到一个错误,例如 target_path is not a directory:
hdfs.util.HdfsError: /myhdfs/mydata/arch_folder (is not a directory)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkIsDirectory(FSPermissionChecker.java:539)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkSimpleTraverse(FSPermissionChecker.java:530)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:505)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkTraverse(FSDirectory.java:1612)
那么将文件提取到hdfs的正确方法是什么。