29

这是我的问题:我在 HDFS 中有一个文件可能很大(= 不足以容纳所有内存)

我想做的是避免将此文件缓存在内存中,并且只像处理常规文件一样逐行处理它:

for line in open("myfile", "r"):
    # do some processing

我正在寻找是否有一种简单的方法可以在不使用外部库的情况下正确完成这项工作。我可能可以使它与libpyhdfspython-hdfs一起使用,但如果可能的话,我希望避免在系统中引入新的依赖项和未经测试的库,特别是因为这两个似乎都没有得到大量维护并且声明它们不应该是用于生产。

我正在考虑使用使用 Pythonsubprocess模块的标准“hadoop”命令行工具来做到这一点,但我似乎无法做我需要的事情,因为没有命令行工具可以做我的处理,我想以流方式为每一行执行 Python 函数。

有没有办法使用 subprocess 模块将 Python 函数应用为管道的正确操作数?或者更好的是,像文件一样打开它作为生成器,这样我就可以轻松处理每一行?

cat = subprocess.Popen(["hadoop", "fs", "-cat", "/path/to/myfile"], stdout=subprocess.PIPE)

如果有另一种方法可以在不使用外部库的情况下实现我上面描述的内容,我也很开放。

谢谢你的帮助 !

4

4 回答 4

44

你想要xreadlines,它从文件中读取行而不将整个文件加载到内存中。

编辑

现在我看到了你的问题,你只需要从你的Popen对象中获取标准输出管道:

cat = subprocess.Popen(["hadoop", "fs", "-cat", "/path/to/myfile"], stdout=subprocess.PIPE)
for line in cat.stdout:
    print line
于 2012-09-18T22:04:22.797 回答
32

如果您想不惜一切代价避免添加外部依赖项,Keith 的答案就是要走的路。 另一方面,Pydoop可以让你的生活更轻松:

import pydoop.hdfs as hdfs
with hdfs.open('/user/myuser/filename') as f:
    for line in f:
        do_something(line)

关于您的担忧,Pydoop 正在积极开发中,并已在CRS4用于生产多年,主要用于计算生物学应用。

西蒙娜

于 2013-01-15T10:15:33.553 回答
1

在过去的两年里,Hadoop-Streaming 发生了很多变化。根据 Cloudera,这非常快:http: //blog.cloudera.com/blog/2013/01/a-guide-to-python-frameworks-for-hadoop/ 我已经取得了很好的成功。

于 2014-10-03T16:00:39.137 回答
0

您可以使用 WebHDFS Python 库(建立在 urllib3 之上):

from hdfs import InsecureClient
client_hdfs = InsecureClient('http://host:port', user='root')
with client_hdfs.write(access_path) as writer:
    dump(records, writer)  # tested for pickle and json (doesnt work for joblib)

或者你可以在 python 中使用 requests 包:

import requests
from json import dumps
params = (('op', 'CREATE')
('buffersize', 256))
data = dumps(file)  # some file or object - also tested for pickle library
response = requests.put('http://host:port/path', params=params, data=data)  # response 200 = successful

希望这可以帮助!

于 2020-02-09T16:57:36.500 回答