1

我想使用 mrjob(使用 EMR)在 S3 中高效地处理大量数据。我可以以任何我想要的方式构建数据,但显然我想尽我所能发挥 EMR 在 S3 数据上运行的优势。

我的数据包含数百万个网页(假设每个 50K)。直观地说,创建一组 .tar.gz 文件(简称 .tgz)对我来说是有意义的,每个文件都有数千页,这样 .tgz 文件大小约为 2GB 左右。然后我想将这些 .tgz 文件加载到 S3 上并编写一个 mrjob 任务来处理这些(例如,10 个 EC2 实例)。

我被构建这些 .tgz 文件所吸引,因为它们代表了一种非常压缩的数据形式,因此它们应该最大限度地减少网络流量(大小以及传输延迟)。我也喜欢构建多个 .tgz 文件,因为我显然想利用我计划为这项工作分配的多个 EMR 实例。

如果必须,我可以连接文件,这样我就可以避免归档 (tar) 步骤而只处理 .gz 文件,但是将原始数据压缩然后压缩会更容易。

我是否以正确的方式考虑这个问题,如果是这样,我如何配置/指定 mrjob 以解压缩和解压缩,以便实例仅处理其中一个 .tgz 文件的全部内容?

4

1 回答 1

0

我强烈劝阻您不要将所描述的方法与 EMR 配对,因为它显然违反了 map-reduce 范式。您可能会克服网络流量瓶颈(如果有的话),但会遇到负载平衡映射器的困难。Map-reduce 方法是记录处理,当记录数量无限增长时可以很好地扩展。话虽如此,您的架构是可行的,但更好地与不同的任务处理工具一起使用,例如celery

如果您希望在 map-reduce 范例中处理您的数据(使用 EMR 和 mrjob),例如,您可以压缩和base64编码每个页面,以确保将页面存储为文本文件中的一行。有关工作示例,请参阅以下 mrjob 兼容协议:

import cPickle
import zlib
import base64

from mrjob import protocol

class CompressedPickleProtocol(protocol.PickleProtocol):
    """
    Protocol that compresses pickled `key` and `value` with 
    `zlib.compress` and encodes result with `base64.b64encode`.
    """

    @classmethod
    def encode(cls, value):
        return base64.b64encode(value)

    @classmethod
    def decode(cls, raw_value):
        return base64.b64decode(raw_value)

    @classmethod
    def _writes(cls, value):
        return cls.encode(zlib.compress(cPickle.dumps(value)))

    @classmethod
    def _reads(cls, raw_value):
        return cPickle.loads(zlib.decompress(cls.decode(raw_value)))

    @classmethod
    def read(cls, line):
        raw_key, raw_value = line.split('\t', 1)
        return (cls._reads(raw_key), cls._reads(raw_value))

    @classmethod
    def write(cls, key, value):
        return '%s\t%s' % (cls._writes(key),
                           cls._writes(value))
于 2014-08-10T11:06:56.273 回答