5

我想在 PySpark 中有效地将 numpy 数组从工作机器(函数)保存/读取到 HDFS。我有两台机器A和B。A有主人和工人。B 有一名工人。例如,我想实现以下目标:

if __name__ == "__main__":
    conf = SparkConf().setMaster("local").setAppName("Test")
    sc = SparkContext(conf = conf)
    sc.parallelize([0,1,2,3], 2).foreachPartition(func)

def func(iterator):
    P = << LOAD from HDFS or Shared Memory as numpy array>>
    for x in iterator:
        P = P + x

    << SAVE P (numpy array) to HDFS/ shared file system >>

什么是快速有效的方法呢?

4

1 回答 1

1

我偶然发现了同样的问题。最后使用HdfsCli 模块和带有 Python3.4 的临时文件使用了一种解决方法。

  1. 进口:
from hdfs import InsecureClient
from tempfile import TemporaryFile
  1. 创建一个 hdfs 客户端。在大多数情况下,最好在脚本中的某处使用实用程序函数,例如:
def get_hdfs_client():
    return InsecureClient("<your webhdfs uri>", user="<hdfs user>",
         root="<hdfs base path>")
  1. 在工作函数中加载并保存您的 numpy:
hdfs_client = get_hdfs_client()

# load from file.npy
path = "/whatever/hdfs/file.npy"
tf = TemporaryFile()

with hdfs_client.read(path) as reader:
    tf.write(reader.read())
    tf.seek(0) # important, set cursor to beginning of file

np_array = numpy.load(tf)

...

# save to file.npy
tf = TemporaryFile()
numpy.save(tf, np_array)
tf.seek(0) # important ! set the cursor to the beginning of the file
# with overwrite=False, an exception is thrown if the file already exists
hdfs_client.write("/whatever/output/file.npy", tf.read(),  overwrite=True) 

笔记:

  • 用于创建 hdfs 客户端的 uri 以 开头http://,因为它使用 hdfs 文件系统的 Web 界面;
  • 确保您传递给 hdfs 客户端的用户具有读写权限
  • 以我的经验,开销并不大(至少在执行时间方面)
  • 使用临时文件(与 中的常规文件相比/tmp)的优点是确保脚本结束后没有垃圾文件留在集群机器中,无论是否正常
于 2016-05-24T08:49:34.650 回答