我想读取一个 28Gb 的 csv 文件并打印内容。但是,我的代码:
import json
import sys
from datetime import datetime
from hashlib import md5
import dask.dataframe as dd
import dask.multiprocessing
import pandas as pd
from kyotocabinet import *
class IndexInKyoto:
def hash_string(self, string):
return md5(string.encode('utf-8')).hexdigest()
def dbproc(self, db):
db[self.hash_string(self.row)] = self.row
def index_row(self, row):
self.row = row
DB.process(self.dbproc, "index.kch")
start_time = datetime.utcnow()
row_counter = 0
ob = IndexInKyoto()
df = dd.read_csv("/Users/aviralsrivastava/dev/levelsdb-learning/10gb.csv", blocksize=1000000)
df = df.compute(scheduler='processes') # convert to pandas
df = df.to_dict(orient='records')
for row in df:
ob.index_row(row)
print("Total time:")
print(datetime.utcnow-start_time)
不管用。当我运行命令时,htop
我可以看到 dask 正在运行,但没有任何输出。也没有创建任何 index.kch 文件。我在不使用 dask 的情况下咆哮同样的事情,它运行良好;我正在使用 Pandas 流 api ( chunksize
) 但它太慢了,因此我想使用 dask。