-1

我需要从 pyspark worker中读取/扫描/写入文件到 hdfs 。

请注意,以下 api 不适用,因为它们会脱离驱动程序

sc.textFile()
sc.saveAsParquetFile()

ETC

最好不要涉及额外的第三方库(例如 pyhadoop)。

一种选择是掏出例如

 os.system('hdfs dfs -ls %(hdfsPath)s' %locals())

但是有没有更原生的 pyspark 方法来实现这一点?

更新这不是广播数据的情况,因为每个工作人员将从 hdfs 读取不同的数据。其中一个用例是在每个工作人员中读取一些大型二进制文件(这显然不是广播的情况)。另一种情况是读取包含指令的“命令”文件。我已经在原生 hadoop 和 scala spark 中成功使用了这种模式。

4

2 回答 2

2

解决方案似乎是子进程(没有直接的python访问)。将接受的答案和来自以下的评论之一拼凑在一起: Python 从 HDFS 读取文件作为流

cat = subprocess.Popen(["hadoop", "fs", "-cat", "/path/to/myfile"], stdout=subprocess.PIPE)
for line in iter(cat.stdout.readline, ''): 
    print line,   # include the comma
于 2015-03-12T20:19:08.227 回答
-1

更本地的 PySpark 方法是使用sc.textFile()或其他读取方法读取驱动程序中的数据,并将其作为 RDD 或广播变量传递给工作人员,如果它足够小以适合每个执行程序的内存。

你能描述一下你的情况吗,我怀疑你真的需要阅读工人中的文件

更新:

简短的摘要:

  1. 直接从大型集群上的工作人员读取一组文件可能会杀死名称节点
  2. 在大多数情况下,不需要直接从工作人员那里读取单独的文件。您可以只为textFile()方法或使用通配符的文件集wholeTextFiles()binaryFiles()方法来读取文件集及其名称
  3. 在具有千兆字节图像的图像处理的特定情况下,只需将它们放入序列文件并使用sequenceFile()方法读取它
  4. 使用 Python 直接从 HSFS 读取而无需额外的库可以通过直接查询 WebHDFS REST API 来实现,考虑到这正是库所实现的,这是一种矫枉过正的做法。另一种选择可能是使用pipe()Spark 方法调用 Java 程序读取 HDFS 文件并将它们以序列化形式返回到标准输出。另一种选择是通过转义到 shell 将文件从 HDFS 复制到临时空间,然后使用标准读取文件功能读取该文件。就我个人而言,我会解雇我的开发人员来实施我在这里提出的任何方法
于 2015-03-12T13:17:01.290 回答