首先,让我们把 JSON 的东西排除在外。
您的文件不是 JSON 结构,而是一堆单独的 JSON 对象。从您的示例中,它看起来像是每行一个对象。因此,让我们将其读入列表:
with open('spam.json') as f:
things = [json.loads(line) for line in f]
然后我们将处理它,并把它写出来:
with open('eggs.json', 'w') as f:
for thing in new_things:
f.write(json.dumps(thing) + '\n')
现在,您没有想要附加内容的 JSON 结构;你有一个字典列表,你想创建一个新的字典列表,将具有相同键的字典合并在一起。
这是一种方法:
new_things = {}
for thing in things:
thing_id = thing['id']
try:
old_thing = new_things[thing_id]
except KeyError:
new_things[thing_id] = thing
else:
old_thing['ua'].extend(thing['ua'])
new_things = new_things.values()
有几种不同的方法可以简化这一点;我之所以这样写,是因为它没有使用新手应该掌握的技巧。例如,您可以通过它进行排序和分组:
def merge(things):
return {'id': things[0]['id'],
'ua': list(itertools.chain.from_iterable(t['ua'] for t in things))}
sorted_things = sorted(things, key=operator.itemgetter('id'))
grouped_things = itertools.groupby(sorted_things, key=operator.itemgetter('id'))
new_things = [merge(list(group)) for key, group in grouped_things]
我没有从您最初的问题中意识到您有数千万行。上述所有步骤都需要将整个原始数据集加载到内存中,使用一些临时存储进行处理,然后将其写回。但是,如果您的数据集太大,您需要找到一种方法一次处理一行,同时尽可能少地保留在内存中。
首先,要一次处理一行,只需将初始列表推导更改为生成器表达式,然后将其余代码移动到with
语句中,如下所示:
with open('spam.json') as f:
things = (json.loads(line) for line in f)
for thing in things:
# blah blah
…在这一点上,像这样重写它可能同样容易:
with open('spam.json') as f:
for line in f:
thing = json.loads(line)
# blah blah
接下来,排序显然会在内存中构建整个排序列表,所以在这里这是不可接受的。但是,如果您不排序和分组,则整个new_things
结果对象必须同时处于活动状态(因为最后一个输入行可能必须合并到第一个输出行中)。
您的示例数据似乎已经具有按 排序的行id
。如果您可以在现实生活中依靠它——或者只依靠总是被分组的行——id
只需跳过排序步骤,这只会浪费时间和内存,并使用分组解决方案。
另一方面,如果您不能指望按 分组的行id
,那么实际上只有两种方法可以进一步减少内存:以某种方式压缩数据,或者将存储返回到磁盘。
首先,Foo Bar User 的解决方案构建了一个更简单、更小的数据结构(一个 dict 将每个 id 映射到它的 uas 列表,而不是一个 dicts 列表,每个都有一个 id 和一个 ua),它应该占用更少的内存,并且我们可以一次将其转换为最终格式。像这样:
with open('spam.json') as f:
new_dict = defaultdict(list)
for row in f:
thing = json.loads(row)
new_dict[thing["id"]].extend(thing["ua"])
with open('eggs.json', 'w') as f:
for id, ua in new_dict.items(): # use iteritems in Python 2.x
thing = {'id': id, 'ua': ua}
f.write(json.dumps(thing) + '\n')
其次,Python 提供了一种很好的方式来使用 dbm 数据库,就好像它是一个字典一样。如果您的值只是字符串,则可以使用anydbm
/dbm
模块(或特定实现之一)。由于您的值是列表,因此您需要shelve
改用。
无论如何,虽然这会减少您的内存使用量,但它可能会减慢速度。在具有 4GB RAM 的机器上,页面文件交换的节省可能会消除通过数据库的额外成本……但是在具有 16GB RAM 的机器上,您可能只是增加了开销而收效甚微。shelve
您可能想先尝试使用较小的文件,看看您的代码与dict
内存不是问题时相比要慢多少。
或者,如果事情超出了您的内存限制,您总是可以使用更强大的数据库,它实际上可以对磁盘上的内容进行排序。例如(未经测试):
db = sqlite3.connect('temp.sqlite')
c = db.cursor()
c.execute('CREATE TABLE Things (tid, ua)')
for thing in things:
for ua in thing['ua']:
c.execute('INSERT INTO Things (tid, ua) VALUES (?, ?)',
thing['id'], ua)
c.commit()
c.execute('SELECT tid, ua FROM Things ORDER BY tid')
rows = iter(c.fetchone, None)
grouped_things = itertools.groupby(rows, key=operator.itemgetter(0))
new_things = (merge(list(group)) for key, group in grouped_things)
with open('eggs.json', 'w') as f:
for thing in new_things:
f.write(json.dumps(thing) + '\n')