1

在 python 社区的帮助下,我开始学习 python 来处理大约 5 亿(40G)的数据并编写了以下脚本。

输入文件格式-

Studentid,Subject,DateTime,Grade
001,Biology,Mon Apr 25 19:32:00 PDT 2013,B
001,Literature,Wed Apr 10 15:31:00 PST 2013,B
001,Math,Mon Apr 22 01:32:00 PDT 2013,A
002,Biology,Mon Apr 25 19:32:00 PDT 2013,A
002,Math,Mon Apr 22 16:31:14 PDT 2013,C
002,Math,Wed Apr 10 15:31:00 PST 2013,C
003,Biology,Mon Apr 22 13:31:00 PDT 2013,A
003,Irdu,Wed Apr 10 15:31:00 PST 2013,A

输出报告

003,Irdu;Wed Apr 10 15:31:00 PST 2013;A#Biology;Mon Apr 22 13:31:00 PDT 2013;A
002,Math;Wed Apr 10 15:31:00 PST 2013;C#Math;Mon Apr 22 16:31:14 PDT 2013;C#Biology;Mon Apr 25 19:32:00 PDT 2013;A
001,Literature;Wed Apr 10 15:31:00 PST 2013;B#Math;Mon Apr 22 01:32:00 PDT 2013;A#Biology;Mon Apr 25 19:32:00 PDT 2013;B

Python 脚本

import csv
import time
import operator
import sys, getopt
import os

from collections import defaultdict
from datetime import datetime
from operator import itemgetter

start = time.time()
def elapsed():
    return time.time() - start

def date_key(row):
  try:
    formatRow = row[1].replace('PDT ','')
    formatRow = formatRow.replace('PST ','')
    return datetime.strptime(formatRow, "%a %b %d %X %Y")
  except Exception, e:
     print ("Error in sorting the date: %s \nRow : %s" % (e, row))
     pass

def processRecords(accountsData, fileName):
  for v in accountsData.itervalues():
    try:
      v.sort(key=date_key)
    except Exception, e:
      pass

  with open(fileName, 'a') as writer:
    for pid,v in accountsData.iteritems():
      csv = '#'.join([';'.join(t) for t in v])
      writer.write("%s,%s\n" % (pid, csv))

def main(argv):
  inputFile = ''
  outputFile = ''
  batchsize = 20000000
  try:
    opts, args = getopt.getopt(argv,"hi:o:b:",["ifile=","ofile=","bsize="])
  except getopt.GetoptError:
    print 'ReportToFileBatches.py -i <inputfile> -o <outputfile> -b<batchsize>[default=20000000]'
    sys.exit(2)
  for opt, arg in opts:
    if opt == '-h':
      print 'ReportToFileBatches.py -i <inputfile> -o <outputfile> -b<batchsize>[default=20000000]'
      sys.exit()
    elif opt in ("-i", "--ifile"):
      inputFile = arg
    elif opt in ("-o", "--ofile"):
      outputFile = arg
    elif opt in ("-b", "--bsize"):
      batchsize = int(arg)

  if not (os.path.isfile(inputFile)):
    print ("\nError : File - %s does not exist." % (inputFile)) 
    sys.exit(2)

  #print "Batch Size %s " % batchsize
  linenumb = 0  
  with open(inputFile,'r') as data:
    accounts = defaultdict(list)

    for line in data:
      linenumb = linenumb + 1
      line = line.rstrip('\r\n')
      try:
        sid, subject, datetime, grade = line.split(',')
        accounts[sid].append((subject, datetime, grade))

        if (linenumb == batchsize):
          linenumb = 0
          processRecords(accounts, outputFile)
          accounts = defaultdict(list)
        else: continue
      except Exception, e:
        print ("Error : %s \nRow : %s" % (e, line))  

  if(linenumb > 0):
    processRecords(accounts, outputFile)

  print("Total time taken - %.3fs" % elapsed())

if __name__ == "__main__":
   main(sys.argv[1:])

您可以看到输出文件(报告)是按日期排序的,也可以是字段的连接。我花更多时间对日期时间列进行排序(也许)。我是 Python 的新手。我非常感谢在改进我的脚本以减少处理时间方面的任何帮助。希望我说得通。

仅供参考:我确保输入文件按 studentid 排序并分批处理。

4

2 回答 2

1

我什至无法想象想要通过任何数量的分析或算法优化来做到这一点。

这让我觉得是一个数据库问题。

  1. 将数据加载到数据库中(sqlite 带有 python)
  2. 添加日期索引
  3. 转储你的输出

python 是进行格式化和解析的胶水语言。不是滚动您自己管理 40G 数据的语言。

于 2013-05-28T19:38:39.207 回答
0

我认为你能做的最好的事情就是鸽巢式。如果您知道时间数据的范围和分布,即最大和最小时间,则此方法有效。分类通过将您的数据分离到离散的分类中来工作。例如,如果您的数据大约在一个月内分布,那么您可以有一个数组,其中每个索引都是一个数组,表示该月每一天的 1 小时。您遍历列表,将数据放入该数组中。当您将数据放入该数组时,您可以将其排序到该子列表中。

如果您正确选择鸽子,此算法将非常有效。例如,如果你的所有数据都在同一天,而你的鸽子洞是白天,那么这个算法的性能并不比你的好。另一个注意事项是你的鸽子不必均匀分布。如果几乎所有数据都在不同日期的同一小时内,则在该小时内添加较小的时间步长,在其余时间添加较大的时间步长。

计数排序是另一种选择(也有更好的运行时间),但由于您的文件太大,我认为您可能会遇到内存问题或因不断的磁盘写入而滞后。值得研究,但要小心这些延误。

编辑:刚刚意识到您的文件有 40g 大,因此要解决该内存问题,而不是数组数组,您可以拥有一系列具有已知名称的文件,其中包含您的分类列表。

于 2013-05-28T19:08:48.087 回答