0

我有一个 status_changes 的日志文件,每个文件都有一个 driver_id、时间戳和持续时间。使用 driver_id 和时间戳,我想从 S3 获取适当的 GPS 日志。这些 GPS 日志以 bucket_name/yyyy/mm/dd/driver_id.log 形式存储在 S3 存储桶中。

from mrjob.job import MRJob
class Mileage(MRJob):

    def get_s3_gpslog_path(self, driver_id, occurred_at, status):
        s3_path = "s3://gps_logs/{yyyy}/{mm}/{dd}/{driver_id}.log"
        s3_path = s3_path.format(yyyy=occurred_at.year,
                                 mm=occurred_at.month,
                                 dd=occurred_at.day,
                                 driver_id=driver_id)
        return s3_path

    def mapper(self, _, line):
        line = ast.literal_eval(line)
        driver_id = line['driverId']
        occurred_at = line['timestamp']
        status = line['status']
        s3_path = self.get_s3_gpslog_path(driver_id, occurred_at, status)
        # ^^ How do I fetch this file and read it?
        distance = calculate_distance_from_gps_log(s3_path, occurred_at, status)

        yield status, distance


if __name__ == '__main__':
    Mileage.run()

在命令行中,我使用 status_change 日志文件作为输入运行它: $ python mileage.py status_changes.log

我的问题是:给定我构建的 S3 URI 字符串,我如何实际获取该 GPS 日志?

4

1 回答 1

0

您可以使用作为 mrjob一部分的 S3Filesystem。您还可以在脚本中使用boto的 S3 实用程序。您可能需要硬编码(或从每个节点中的 Hadoop 配置中解析)(秘密)访问密钥。但是,映射器正在做的事情可能有点过多,可能会发出过多的请求以获取 S3 资源。您可以重写 MapReduce 算法,通过在 GPS 日志和其他日志中进行流式传输,以资源较少的方式进行连接。

于 2014-04-25T02:54:00.650 回答