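I have a status_changes log file in which each entry has a driver_id, a timestamp, and a duration. Using the driver_id and the timestamp, I want to fetch the matching GPS log from S3. These GPS logs are stored in an S3 bucket as bucket_name/yyyy/mm/dd/driver_id.log.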
import ast

from mrjob.job import MRJob


class Mileage(MRJob):

    def get_s3_gpslog_path(self, driver_id, occurred_at, status):
        # Build the S3 URI for this driver's GPS log on the given day.
        # occurred_at must expose .year, .month and .day.
        s3_path = "s3://gps_logs/{yyyy}/{mm}/{dd}/{driver_id}.log"
        s3_path = s3_path.format(yyyy=occurred_at.year,
                                 mm=occurred_at.month,
                                 dd=occurred_at.day,
                                 driver_id=driver_id)
        return s3_path

    def mapper(self, _, line):
        # Each input line is the repr of a Python dict.
        line = ast.literal_eval(line)
        driver_id = line['driverId']
        occurred_at = line['timestamp']
        status = line['status']
        s3_path = self.get_s3_gpslog_path(driver_id, occurred_at, status)
        # ^^ How do I fetch this file and read it?
        # calculate_distance_from_gps_log is defined elsewhere (not shown).
        distance = calculate_distance_from_gps_log(s3_path, occurred_at, status)
        yield status, distance


if __name__ == '__main__':
    Mileage.run()
On the command line, I run it with the status_changes log file as input: $ python mileage.py status_changes.log
My question is: given the S3 URI string I have constructed, how do I actually fetch that GPS log?
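To make the question concrete, here is a rough sketch of the kind of helper I am after. It is only one possible approach and rests on assumptions: that boto3 is installed wherever the mapper runs and that AWS credentials are already configured there. The names split_s3_uri and read_s3_text are hypothetical helpers made up for illustration, not part of mrjob or boto3; the only boto3 calls used are boto3.client("s3") and the client's get_object.

```python
from urllib.parse import urlparse


def split_s3_uri(s3_uri):
    """Split 's3://bucket/key' into (bucket, key)."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError("not an S3 URI: %r" % (s3_uri,))
    return parsed.netloc, parsed.path.lstrip("/")


def read_s3_text(s3_uri, client=None):
    """Return the contents of the object at s3_uri as text.

    `client` is any object with a boto3-style get_object(Bucket=..., Key=...)
    method. When omitted, a real boto3 S3 client is created, which assumes
    boto3 is installed and AWS credentials are configured.
    """
    if client is None:
        import boto3  # imported lazily so the helper can be tested without it
        client = boto3.client("s3")
    bucket, key = split_s3_uri(s3_uri)
    response = client.get_object(Bucket=bucket, Key=key)
    # The whole object is read into memory and assumed to be UTF-8 text.
    return response["Body"].read().decode("utf-8")
```

With something like this, the mapper could call read_s3_text(s3_path) and hand the resulting text to calculate_distance_from_gps_log instead of the path, though I am not sure whether that is the recommended way to do it inside an mrjob job.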