我有数百万个特定类型的实体,我想导出到 csv 文件。以下代码将实体以 1000 个为单位批量写入 blob,同时保持 blob 打开并将下一批实体推迟到任务队列。当没有更多实体要获取时,blob 就完成了。这似乎适用于我的大部分本地测试,但我想知道:
如果我在我的生产数据上运行它之前错过了任何陷阱或极端情况,并为数据存储读取产生了 $s。
如果超过最后期限或在批处理写入 blob 时内存耗尽,则此代码默认为当前批处理的开始以再次运行任务,这可能会导致大量重复。有什么建议可以解决这个问题吗?
def entities_to_csv(entity_type,blob_file_name='',cursor='',batch_size=1000):
more = True
next_curs = None
q = entity_type.query()
results,next_curs,more = q.fetch_page(batch_size,start_cursor=Cursor.from_websafe_string(cursor))
if results:
try:
if not blob_file_name:
blob_file_name = files.blobstore.create(mime_type='text/csv',_blob_uploaded_filename='%s.csv' % entity_type.__name__)
rows = [e.to_dict() for e in results]
with files.open(blob_file_name, 'a') as f:
writer = csv.DictWriter(f,restval='',extrasaction='ignore',fieldnames=results[0].keys())
writer.writerows(rows)
if more:
deferred.defer(entity_type,blob_file_name,next_curs.to_websafe_string())
else:
files.finalize(blob_file_name)
except DeadlineExceededError:
deferred.defer(entity_type,blob_file_name,cursor)
稍后在代码中,类似于:
deferred.defer(entities_to_csv,Song)