I have used Dask for large telemetry JSON-Lines files (newline-delimited)...
The nice thing about Dask is that it does a lot of the work for you.
With it, you can read the data, process it, and write it back to disk without ever loading it all into memory.
Dask will also parallelize the work for you and use multiple cores (threads)...
More info on Dask Bags here:
https://examples.dask.org/bag.html
import ujson as json #ujson for speed and handling NaNs which are not covered by JSON spec
import dask.bag as db
def update_dict(d):
    """Example per-record transformation: add some keys and compute a derived value."""
    d.update({'new_key': 'new_value', 'a': 1, 'b': 2, 'c': 0})
    d['c'] = d['a'] + d['b']
    return d
def read_jsonl(filepaths):
    """Reads JSON-Lines files into a Dask Bag.

    :param filepaths: list of filepath strings OR a string with a wildcard
    :returns: a Dask Bag of dictionaries, each dict a parsed JSON object
    """
    return db.read_text(filepaths).map(json.loads)
filepaths = ['file1.jsonl.gz', 'file2.jsonl.gz']
# OR
filepaths = 'file*.jsonl.gz'  # wildcard to match multiple files
#(optional) if you want Dask to use multiple processes instead of threads
# from dask.distributed import Client, progress
# client = Client(threads_per_worker=1, n_workers=6) #6 workers for 6 cores
# print(client)
#define bag containing our data with the JSON parser
dask_bag = read_jsonl(filepaths)
# modify our data
# note: this doesn't execute yet, it just adds the step to Dask's task graph
# bags are immutable, so keep the new bag returned by .map()
dask_bag = dask_bag.map(update_dict)
#(optional) if you're only reading one huge file but want to split the data into multiple files you can use repartition on the bag
# dask_bag = dask_bag.repartition(10)
# write our modified data back to disk; this is when Dask actually executes the task graph
dask_bag.map(json.dumps).to_textfiles('file_mod*.jsonl.gz')  # Dask applies gzip compression automatically for the .gz extension
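If you want to sanity-check the transformation before writing everything out, you can preview a few records first; Bag.take() computes only a small slice rather than the whole bag. A minimal sketch, assuming the dask_bag defined above:

# (optional) preview a couple of transformed records without computing the whole bag
# sample = dask_bag.take(2)
# print(sample)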