1

我尝试使用多处理模块中的 Pool 来加快读取大型 csv 文件的速度。为此,我改编了一个示例(来自 py2k),但似乎 csv.dictreader 对象没有长度。这是否意味着我只能迭代它?有没有办法把它分块?

这些问题似乎相关,但并没有真正回答我的问题: csv.DictReader 中的行数如何在 Python 3 中分块列表?

我的代码试图这样做:

source = open('/scratch/data.txt','r')
def csv2nodes(r):
    strptime = time.strptime
    mktime = time.mktime
    l = []
    ppl = set()
    for row in r:
        cell = int(row['cell'])
        id = int(row['seq_ei'])
        st = mktime(strptime(row['dat_deb_occupation'],'%d/%m/%Y'))
        ed = mktime(strptime(row['dat_fin_occupation'],'%d/%m/%Y'))
        # collect list
        l.append([(id,cell,{1:st,2: ed})])
        # collect separate sets
        ppl.add(id)
    return (l,ppl)


def csv2graph(source):
    r = csv.DictReader(source,delimiter=',')
    MG=nx.MultiGraph()
    l = []
    ppl = set()
    # Remember that I use integers for edge attributes, to save space! Dic above.
    # start: 1
    # end: 2
    p = Pool(processes=4)
    node_divisor = len(p._pool)*4
    node_chunks = list(chunks(r,int(len(r)/int(node_divisor))))
    num_chunks = len(node_chunks)
    pedgelists = p.map(csv2nodes,
                       zip(node_chunks))
    ll = []
    for l in pedgelists:
        ll.append(l[0])
        ppl.update(l[1])
    MG.add_edges_from(ll)
    return (MG,ppl)
4

1 回答 1

1

csv.DictReader文档(及其csv.reader子类)中,该类返回一个迭代器。当TypeError您调用len().

您仍然可以将数据分块,但您必须将其完全读入内存。如果您担心内存,您可以切换csv.DictReadercsv.reader并跳过字典csv.DictReader创建的开销。为了提高 中的可读性csv2nodes(),您可以分配常量来处理每个字段的索引:

CELL = 0
SEQ_EI = 1
DAT_DEB_OCCUPATION = 4
DAT_FIN_OCCUPATION = 5

我还建议使用与 不同的变量id,因为这是一个内置函数名。

于 2011-11-08T16:30:49.207 回答