2

我有一个顶级函数,它获取一个包含 parquet 文件路径和列名的元组。

该函数仅从文件中加载列,转换为 pandas,然后将其打包/序列化为标准形式。就像是:

import pyarrow as pa
import pyarrow.parquet as pq
from multiprocessing import Pool

def binarizer(file_data_tuple):
   ''' Read a Parquet column a file, binarize and return'''

   path, col_name, col_meta, native = file_data_tuple
   if not native: 
       # Either this or using a top level hdfs_con
       hdfs_con = pa.hdfs.connect(params)     
   read_pq = pq.read_table if native else hdfs_con.read_parquet

   arrow_col = read_pq(filepath, columns = (col_name,))
   bin_col = imported_binarizng_function(arrow_col)
   return bin_col

def read_binarize_parallel(filepaths):
    ''' Setup parallel reading and binarizing of a parquet file'''

    # list of tuples containing the filepath, column name, meta, and mode   
    pool_params = [(),..] 
    pool = Pool()
    for file in filepaths:
        bin_cols = pool.map(binarizer, pool_params)
        chunk =  b''.join(bin_cols)
        send_over_socket(chunk)

这在我使用本机模式时有效,也就是从本地文件系统读取文件。

但是,如果我尝试通过 hdfs 读取,我会得到奇怪的(对我来说)箭头错误,无论是当我在每个进程中打开一个连接还是尝试使用同一个连接时。这是错误的压缩版本:

[libprotobuf 错误 google/protobuf/message_lite.cc:123] 无法解析“Hdfs.Internal.RpcResponseHeaderProto”类型的消息,因为它缺少必填字段:callId、状态 [libprotobuf 错误 google/protobuf/message_lite.cc:123]无法解析“Hdfs.Internal.RpcResponseHeaderProto”类型的消息,因为它缺少必填字段:callId、状态 [libprotobuf ERROR google/protobuf/message_lite.cc:123] 无法解析“Hdfs.Internal.RpcResponseHeaderProto”类型的消息” 因为它缺少必填字段:callId,状态 [libprotobuf 错误 google/protobuf/message_lite.cc:123] 无法解析类型为“Hdfs.Internal.RpcResponseHeaderProto”的消息,因为它缺少必填字段:callId,状态 2018- 01-09 21:41:47.939006, p10007, th139965275871040,错误无法在服务器“192.168.0.101:9000”上调用 RPC 调用“getFileInfo”:RpcChannel.cpp:703:HdfsRpcException:RPC 通道到“192.168.0.101:9000”得到协议不匹配:RPC 通道找不到挂起的调用:id = 3.@未知

@   Unknown
@   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,

_object*, 箭头::io::HdfsPathInfo*) @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_15isfile(_object*, _object*) @ 未知 @ 未知

@   Unknown

2018-01-09 21:41:47.939103,P10007,TH139965275871040,信息重试iDempotent rpc呼叫“ getfileinfo”服务器“ getfileinfo” on Server“ 192.168.0.0.0.101:9000在服务器“192.168.0.101:9000”上调用 RPC 调用“getFileInfo”:RpcChannel.cpp:780:HdfsRpcException:RPC 通道到“192.168.0.101:9000”得到协议不匹配:RPC 通道无法解析响应标头。@未知

@   Unknown
@   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,

_object*, 箭头::io::HdfsPathInfo*) @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*) @ 未知 @ 未知

@   Unknown
@2018-01-09 21:41:47.939406, p10008, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server

“192.168.0.101:9000”:RpcChannel.cpp:780:HdfsRpcException:RPC 通道到“192.168.0.101:9000”得到协议不匹配:RPC 通道无法解析响应标头。@未知

@   Unknown
@   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,

_object*, 箭头::io::HdfsPathInfo*) @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*) @ 未知

@   Unknown 2018-01-09 21:41:47.939422, p10013, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server

“192.168.0.101:9000”:RpcChannel.cpp:780:HdfsRpcException:RPC 通道到“192.168.0.101:9000”得到协议不匹配:RPC 通道无法解析响应标头。@未知

@   Unknown
@   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,

_object*, 箭头::io::HdfsPathInfo*) @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*) @ 未知

@   Unknown
@2018-01-09 21:41:47.939431, p10009, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server

“192.168.0.101:9000”:RpcChannel.cpp:780:HdfsRpcException:RPC 通道到“192.168.0.101:9000”得到协议不匹配:RPC 通道无法解析响应标头。@未知

@   Unknown
@   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,

_object*, 箭头::io::HdfsPathInfo*) @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*) @ 未知

@   Unknown
@   @   Unknown
Unknown 2018-01-09 21:41:47.939457, p10012, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server

“192.168.0.101:9000”:RpcChannel.cpp:780:HdfsRpcException:RPC 通道到“192.168.0.101:9000”得到协议不匹配:RPC 通道无法解析响应标头。@未知

@   Unknown
@   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,

_object*, 箭头::io::HdfsPathInfo*) @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*) @ 未知 @ 未知

@   Unknown
@   Unknown
Unknown
@   Unknown binarizing process filepath: /parquet_430mb/5e6.parquet
@   Unknown
Unknown
@   Unknown

@   Unknown


@   Unknown

2018-01-09 21:41:47.939854,p10010,th139965275871040,INFO 在服务器“192.168.0.101:9000”上重试幂等 RPC 调用“getFileInfo”

2018-01-09 21:41:47.939864,P10013,TH139965275871040,INFO INFO ETRY retry IDEMPOTENT RPC调用“ GetFileInfo”服务器“ ON SERVER” ON SERVER“ 192.168.0.0.0.101:9000服务器“192.168.0.101:9000”上的 RPC 调用“getFileInfo” 2018-01-09 21:41:47.939868,p10012,th139965275871040,INFO 重试服务器“192.168.0.101:9000”上的幂等 RPC 调用“getFileInfo”201-8 09 21:41:47.939868, p10009, th139965275871040, INFO 在服务器 "192.168.0.101:9000" 上重试幂等 RPC 调用 "getFileInfo" 2018-01-09 21:41:47.940813, p10014, th10014, th143996 调用 RPC 失败 "192.168.0.101:9000"服务器“192.168.0.101:9000”上的“getFileInfo”:RpcChannel.cpp:780:HdfsRpcException:RPC 通道到“192.168.0.101:9000" 得到协议不匹配:RPC 通道无法解析响应头。@未知

@   Unknown
@   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,

_object*, 箭头::io::HdfsPathInfo*) @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*) @ 未知

@   Unknown

2018-01-09 21:41:47.940937,P10014,TH139965275871040,INFO INFORETRY IDEMPOTENT RPC CALL“ GetFileInfo” Server“ On Server” On on Server“ 192.168.0.0.0.101:9000在服务器“192.168.0.101:9000”上调用 RPC 调用“getFileInfo”:RpcChannel.cpp:393:HdfsRpcException:无法在服务器“192.168.0.101:9000”上调用 RPC 调用“getFileInfo”@Unknown@Unknown

@   Unknown
@   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,

_object*, 箭头::io::HdfsPathInfo*) @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*) @ 未知

@   Unknown Caused by TcpSocket.cpp: 127: HdfsNetworkException: Write 124 bytes failed to "192.168.0.101:9000": (errno: 32) Broken

管道@未知@未知

@   Unknown
@   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
@   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*,

_object*, 箭头::io::HdfsPathInfo*) @ __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*) @ 未知 @ 未知

@   Unknown

2018-01-09 21:41:47.944519, p10011, th139965275871040, INFO 在服务器 "192.168.0.101:9000" 上重试幂等 RPC 调用 "getFileInfo" ------------------ -------------------------------------------------- ------- ArrowIOError Traceback(最近一次调用最后一次)

/home/parquet_sender.pyc in insert_files_parallel(self) 374 # print ('372 sqparquet filepath:', filepath) 375 params_with_path_and_mode = [col_params+(filepath, native) for col_params in pool_params] --> 376 bin_col = self.pool.map (read_binarize, params_with_path_and_mode) 377 got ('map complete') 378 num_rows = bin_col[0][2]

/usr/lib/python2.7/multiprocessing/pool.pyc in map(self, func, iterable, chunksize) 249 ''' 250 assert self._state == RUN --> 251 return self.map_async(func, iterable, chunksize).get() 252 253 def imap(self, func, iterable, chunksize=1):

/usr/lib/python2.7/multiprocessing/pool.pyc in get(self, timeout) 556 return self._value 557 else: --> 558 raise self._value 559 560 def _set(self, i, obj):

ArrowIOError:HDFS:GetPathInfo 失败

我很高兴收到有关此错误原因的任何反馈,以及我应该如何使用并行镶木地板加载。

4

1 回答 1

14

这是一个与多处理序列化细节相关的错误。我在这里打开了一个错误报告https://issues.apache.org/jira/browse/ARROW-1986

于 2018-01-11T01:19:30.000 回答