
I have a very large CSV file. For each row, I need to append that row's name's previous data, taken only from dates earlier than the current date given in column 2. I think the easiest way to present the problem is with a detailed example that resembles my real data, but scaled down considerably:

Datatitle,Date,Name,Score,Parameter
data,01/09/13,george,219,dataa,text
data,01/09/13,fred,219,datab,text
data,01/09/13,tom,219,datac,text
data,02/09/13,george,229,datad,text
data,02/09/13,fred,239,datae,text
data,02/09/13,tom,219,dataf,text
data,03/09/13,george,209,datag,text
data,03/09/13,fred,217,datah,text
data,03/09/13,tom,213,datai,text
data,04/09/13,george,219,dataj,text
data,04/09/13,fred,212,datak,text
data,04/09/13,tom,222,datal,text
data,05/09/13,george,319,datam,text
data,05/09/13,fred,225,datan,text
data,05/09/13,tom,220,datao,text
data,06/09/13,george,202,datap,text
data,06/09/13,fred,226,dataq,text
data,06/09/13,tom,223,datar,text
data,06/09/13,george,219,datas,text

So for the first three rows of this csv there is no previous data. So if we say we want to pull columns 3 and 4 for the last 3 occurrences of george (row 1), restricted to dates before the current date, it would produce:

data,01/09/13,george,219,dataa,text,x,y,x,y,x,y

However, once previous data starts to become available, we want to produce a csv like this:

Datatitle,Date,Name,Score,Parameter,LTscore,LTParameter,LTscore+1,LTParameter+1,LTscore+2,LTParameter+3,
data,01/09/13,george,219,dataa,text,x,y,x,y,x,y
data,01/09/13,fred,219,datab,text,x,y,x,y,x,y
data,01/09/13,tom,219,datac,text,x,y,x,y,x,y
data,02/09/13,george,229,datad,text,219,dataa,x,y,x,y
data,02/09/13,fred,239,datae,text,219,datab,x,y,x,y
data,02/09/13,tom,219,dataf,text,219,datac,x,y,x,y
data,03/09/13,george,209,datag,text,229,datad,219,dataa,x,y
data,03/09/13,fred,217,datah,text,239,datae,219,datab,x,y
data,03/09/13,tom,213,datai,text,219,dataf,219,datac,x,y
data,04/09/13,george,219,dataj,text,209,datag,229,datad,219,dataa
data,04/09/13,fred,212,datak,text,217,datah,239,datae,219,datab
data,04/09/13,tom,222,datal,text,213,datai,219,dataf,219,datac
data,05/09/13,george,319,datam,text,219,dataj,209,datag,229,datad
data,05/09/13,fred,225,datan,text,212,datak,217,datah,239,datae
data,05/09/13,tom,220,datao,text,222,datal,213,datai,219,dataf
data,06/09/13,george,202,datap,text,319,datam,219,dataj,209,datag
data,06/09/13,fred,226,dataq,text,225,datan,212,datak,217,datah
data,06/09/13,tom,223,datar,text,220,datao,222,datal,213,datai
data,06/09/13,george,219,datas,text,319,datam,219,dataj,209,datag

You will notice that george appears twice on 06/09/13, and both times his row has the same string 319,datam,219,dataj,209,datag appended. The second occurrence of george gets the same string because the george row 3 lines above it falls on the same date. (This is just to emphasise "dates before the current date".)

As you can see from the column headings, we are collecting the last 3 scores and the associated 3 parameters and appending them to every row. Please note this is a very simplified example. In reality each date will contain several thousand rows, and in the real data there is no pattern to the names either, so we would not expect to see fred, tom, george next to each other in a repeating pattern. If anyone can help me figure out how best to achieve this (most efficiently) I would be very grateful. If anything is unclear please let me know and I will add more detail. Any constructive comments appreciated. Many thanks.


5 Answers


It looks like your file is in date order. If we take the last entry for each name within each date, push it onto a fixed-size (size 3) deque kept per name, and write out every row as we go, that should do the trick:

import csv
from collections import deque, defaultdict
from itertools import chain, islice, groupby
from operator import itemgetter

# defaultdict whose first access of a key will create a deque of size 3
# defaulting to [['x', 'y'], ['x', 'y'], ['x' ,'y']]
# Since deques are efficient at head/tail manipulation, then an insert to
# the start is efficient, and when the size is fixed it will cause extra
# elements to "fall off" the end... 
names_previous = defaultdict(lambda: deque([['x', 'y']] * 3, 3))
with open('sample.csv', 'rb') as fin, open('sample_new.csv', 'wb') as fout:
    csvin = csv.reader(fin)
    csvout = csv.writer(fout)
    # Use groupby to detect changes in the date column. Since the data is always
    # ascending, the rows within the same date are contiguous in the file. We use
    # this to identify the rows within the *same* date.
    # date=date we're looking at, rows=an iterable of rows that are in that date...
    for date, rows in groupby(islice(csvin, 1, None), itemgetter(1)):
        # After we've processed entries in this date, we need to know what items of data should
        # be considered for the names we've seen inside this date. Currently the data
        # is taken from the last occurring row for the name.
        to_add = {}
        for row in rows:
            # Output the row present in the file with a *flattened* version of the extra data
            # (previous items) that we wish to apply. eg:
            # [['x', 'y'], ['x', 'y'], ['x', 'y']] becomes ['x', 'y', 'x', 'y', 'x', 'y']
            # So we're easily able to store 3 pairs of data, but flatten it into one long
            # list of 6 items...
            # If the name (row[2]) doesn't exist yet, then by trying to do this, defaultdict
            # will automatically create the default key as above.
            csvout.writerow(row + list(chain.from_iterable(names_previous[row[2]])))
            # Here, we store for the name any additional data that should be included for the name
            # on the next date group. In this instance we store the information seen for the last
            # occurrence of that name in this date. eg: If we've seen it more than once, then
            # we only include data from the last occurrence. 
            # NB: If you wanted to include more than one item of data for the name, then you could
            # utilise a deque again by building it within this date group
            to_add[row[2]] = row[3:5]            
        for key, val in to_add.iteritems():
            # We've finished the date, so before processing the next one, update the previous data
            # for the names. In this case, we push a single item of data to the front of the deque.
            # If we were storing multiple items in the date loop, then we could .extendleft() instead
            # to insert > 1 set of data from above.
            names_previous[key].appendleft(val)

This keeps only the names and their last 3 value pairs in memory for the duration of the run.

You will probably want to adjust this to write out a correct new header, rather than just skipping the one on the input.
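One minimal way to do that (a sketch only, assuming the input header looks like the sample above; the extra column names are taken from the question's example, with LTParameter+2 where the question's header says +3) is to consume the header explicitly rather than dropping it with islice:

csvin = csv.reader(fin)
csvout = csv.writer(fout)
# read the input header and write it back out with the six extra column names appended
header = csvin.next()
csvout.writerow(header + ['LTscore', 'LTParameter',
                          'LTscore+1', 'LTParameter+1',
                          'LTscore+2', 'LTParameter+2'])
# ...then let the groupby loop iterate over csvin directly, without islice(csvin, 1, None)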

Answered 2013-11-14T19:09:20.657

Here is a code example that should demonstrate what you are looking for, using the sample data provided with the question. I named the input file 'input.csv' and read/write from the working directory; 'output.csv' goes into the same folder. I use comments in the code to try to explain: previous records are stored in a dictionary, looked up by name, holding a list of scores per name - records for the current date go into a new buffer dictionary, which is merged into the main dictionary each time the date on the input changes. Let me know if you have any questions; the code is a bit rough - just a quick example. The [:6] slice gives the most recent 6 list items (three previous score/parameter pairs) for the current name.

import csv

myInput = open('input.csv','rb')
myOutput = open('output.csv','wb')
myFields = ['Datatitle','Date','Name','Score','Parameter','Text',
            'LTscore','LTParameter','LTscore+1','LTParameter+1',
            'LTscore+2','LTParameter+2']
inCsv = csv.DictReader(myInput,myFields)
outCsv = csv.writer(myOutput)
outCsv.writerow(myFields) # Write header row

previous_dict = dict() # store scores from previous dates
new_dict = dict() # buffer for records on current-date only

def add_new():
    # merge new_dict into previous_dict
    global new_dict, previous_dict
    for k in new_dict:
        if k not in previous_dict:
            previous_dict[k] = list()
        # put new items first
        previous_dict[k] = new_dict[k] + previous_dict[k]
    new_dict = dict() # reset buffer

old_date = '00/00/00' # start with bogus *oldest* date string
inCsv.next() # skip header row
for row in inCsv:
    myTitle = row['Datatitle']
    myDate = row['Date']
    myName = row['Name']
    myScore = row['Score']
    myParameter = row['Parameter']
    myText = row['Text']
    if old_date != myDate:
        add_new() # store new_dict buffer with previous data
        old_date = myDate
    if myName not in new_dict:
        new_dict[myName] = []
    # put new scores first
    new_dict[myName] = [myScore,myParameter] + new_dict[myName]
    if myName not in previous_dict:
        previous_dict[myName] = []
    # pad with 'x'/'y' placeholders so every output row always has three previous pairs
    previous_six = (previous_dict[myName] + ['x', 'y'] * 3)[:6]
    outCsv.writerow([myTitle,myDate,myName,myScore,myParameter,myText]
                    + previous_six)
# end loop for each row

myInput.close()
myOutput.close()

My solution should work for large datasets. If memory consumption is a concern, the length of each name's list can be capped at 3 scores - currently I keep all previous scores and only show 3, in case you need more of them in the future. If the data is unwieldy, you could always use an sqlite file database instead of a dict, so the lookups live on disk rather than all in memory. With 8G of RAM and 2G of data you should be fine with the in-memory python dictionaries used here. Make sure you are running a 64-bit build of Python on a 64-bit OS. My example prints nothing to the screen, but for a large file you may want to add a print statement that reports progress every N rows (say every 100 or 1000 rows, depending on your system speed). Note that screen output slows down the processing of the file data.
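If you do want that cap, a minimal sketch (the name add_new_capped is just for illustration; it assumes the same previous_dict / new_dict structures used above) is to trim each list to the newest six items while merging, and progress printing in the main loop would look something like the commented lines:

def add_new_capped():
    # same merge as add_new(), but keep only the newest six items
    # (three score/parameter pairs) for each name
    global new_dict, previous_dict
    for k in new_dict:
        previous_dict[k] = (new_dict[k] + previous_dict.get(k, []))[:6]
    new_dict = dict()

# optional progress report inside the main row loop; N = 10000 is an arbitrary choice
# if row_count % 10000 == 0:
#     print 'processed %d rows' % row_count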

Answered 2013-11-10T07:24:58.380

My two cents:
- Python 2.7.5
- I use a defaultdict to hold the previous rows for each Name.
- I use limited-length deques to hold the previous rows, because I like the FIFO behaviour of a full deque. It makes it easy to reason about - just keep stuffing things into it.
- I use operator.itemgetter() for the indexing and slicing because it simply reads better.

from collections import deque, defaultdict
import csv
from functools import partial
from operator import itemgetter

# use a 3 item deque to hold the 
# previous three rows for each name
deck3 = partial(deque, maxlen = 3)
data = defaultdict(deck3)


name = itemgetter(2)
date = itemgetter(1)
sixplus = itemgetter(slice(6,None))

fields = ['Datatitle', 'Date', 'Name', 'Score', 'Parameter',
          'LTscore', 'LTParameter', 'LTscore+1', 'LTParameter+1',
          'LTscore+2', 'LTParameter+3']
with open('data.txt') as infile, open('processed.txt', 'wb') as outfile:
    reader = csv.reader(infile)
    writer = csv.writer(outfile)
    writer.writerow(fields)
    # comment out the next line if the data file does not have a header row
    reader.next()
    for row in reader:
        default = deque(['x', 'y', 'x', 'y', 'x', 'y'], maxlen = 6)
        try:
            previous_row = data[name(row)][-1]
            previous_date = date(previous_row)
        except IndexError:
            previous_date = None
        if  previous_date == date(row):
            # use the xtra stuff from last time
            row.extend(sixplus(previous_row))
            # discard the previous row because
            # there is a new row with the same date
            data[name(row)].pop()
        else:
            # add columns 3 and 4 from each previous row
            for deck in data[name(row)]:
                # adding new items to a full deque causes
                # items to drop off the other end
                default.appendleft(deck[4])
                default.appendleft(deck[3])
            row.extend(default)
        writer.writerow(row)
        data[name(row)].append(row)

After thinking this solution over, I realized it is far too complicated - which tends to happen when I try to get fancy. I'm not sure about the etiquette here, so I'll leave it in - it does have the possible advantage of keeping the previous 3 rows for each name.

Here is a solution using slices and an ordinary dictionary. It only keeps the previously processed row for each name. Much simpler. I kept the itemgetters for readability.

import csv
from operator import itemgetter

fields = ['Datatitle', 'Date', 'Name', 'Score', 'Parameter',
          'LTscore', 'LTParameter', 'LTscore+1', 'LTParameter+1',
          'LTscore+2', 'LTParameter+3']

name = itemgetter(2)
date = itemgetter(1)
cols_sixplus = itemgetter(slice(6,None))
cols34 = itemgetter(slice(3, 5))
cols6_9 = itemgetter(slice(6, 10))
data_alt = {}

with open('data.txt') as infile, open('processed_alt.txt', 'wb') as outfile:
    reader = csv.reader(infile)
    writer = csv.writer(outfile)
    writer.writerow(fields)
    # comment out the next line if the data file does not have a header row
    reader.next()
    for row in reader:
        try:
            previous_row = data_alt[name(row)]
        except KeyError:
            # first time this name encountered
            row.extend(['x', 'y', 'x', 'y', 'x', 'y'])
            data_alt[name(row)] = row
            writer.writerow(row)
            continue
        if  date(previous_row) == date(row):
            # use the xtra stuff from last time
            row.extend(cols_sixplus(previous_row))
        else:
            row.extend(cols34(previous_row))
            row.extend(cols6_9(previous_row))
        data_alt[name(row)] = row
        writer.writerow(row)

I have found that, for this kind of processing, accumulating rows and writing them out in chunks rather than one at a time improves performance considerably. Also, if possible, reading the whole data file in at once helps.
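As a rough illustration of the chunked-write idea (a sketch only; CHUNK = 1000 is an arbitrary choice, and the row-building step is the same as in the loop above):

buffered = []
CHUNK = 1000                             # rows to accumulate before each write
for row in reader:
    # ... extend `row` with the previous data exactly as in the loop above ...
    buffered.append(row)
    if len(buffered) >= CHUNK:
        writer.writerows(buffered)       # one write call per chunk instead of per row
        buffered = []
if buffered:
    writer.writerows(buffered)           # flush whatever is left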

Answered 2013-11-11T00:35:39.897

Here is one approach - the exact implementation will depend on your data, but it should give you a good starting point.

You make two passes over the input CSV data.

  1. On the first pass over the input, scan the rows and build a dictionary. The names can be used as keys, e.g. {'Tom' : [(date1, values), (date2, values)], 'George' : [(date1, values), (date2, values)]}. It may turn out to be easier to use a nested dictionary, e.g. {'Tom' : {date1: values, date2: values}, 'George' : {date1: values, date2: values}}. More on the data structure below.

  2. On the second pass over the input, you join the original input data with the historical data from the dictionary to create the output data.

How you select the historical data depends on how regular the input data is. For example, if the dates are sorted in ascending order and you have gone with the dictionary-of-lists approach, it could be as simple as taking a slice of the relevant list, e.g. dataDict['Tom'][i-3:i]. But since you mention that there can be more than one record on the same date, you will probably have to do some extra work. Some possibilities are (a rough sketch follows this list):

  • With the dictionary-of-lists approach, maintain the values as a list so that there are no duplicate date entries, e.g. {'Tom' : [(date1, [val1, val2, val3]), (date2, values)], 'George' : [(date1, values), (date2, values)]}.

  • With the nested-dictionary approach, look up the specific date range you need. In this case you may have to catch KeyError exceptions unless every date is guaranteed to be present. You could also maintain an additional sorted index of the available dates.
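A minimal two-pass sketch along these lines (an illustration only: it assumes the layout from the question's sample - date in column 1 and name in column 2, counting from 0 - that the file is in ascending date order, and that 'input.csv' / 'output.csv' are placeholder file names; the extra column names follow the question's example):

import csv
from collections import defaultdict

history = defaultdict(list)   # name -> [(date, [score, parameter]), ...] in file order

# pass 1: remember the last (score, parameter) seen for each name on each date
with open('input.csv', 'rb') as f:
    reader = csv.reader(f)
    reader.next()                                        # skip the header row
    for row in reader:
        date, name = row[1], row[2]
        if history[name] and history[name][-1][0] == date:
            history[name][-1] = (date, row[3:5])         # same date again: keep the latest
        else:
            history[name].append((date, row[3:5]))

# pass 2: append the 3 most recent pairs from strictly earlier dates to each row
with open('input.csv', 'rb') as f, open('output.csv', 'wb') as out:
    reader = csv.reader(f)
    writer = csv.writer(out)
    writer.writerow(reader.next() + ['LTscore', 'LTParameter', 'LTscore+1',
                                     'LTParameter+1', 'LTscore+2', 'LTParameter+2'])
    for row in reader:
        date, name = row[1], row[2]
        earlier = []
        for d, vals in history[name]:
            if d == date:                                # stop at the current date
                break
            earlier.append(vals)
        extra = []
        for vals in reversed(earlier[-3:]):              # newest pair first
            extra.extend(vals)
        extra.extend(['x', 'y'] * ((6 - len(extra)) // 2))   # pad out to three pairs
        writer.writerow(row + extra)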

Answered 2013-11-08T14:55:32.830

I have kept picking away at this because I have a similar small project of my own, so I'm posting a second answer, this time improved with coroutines. The process is similar to my other answer but faster (although I don't know why). There are three coroutines - a reader, a processor and a writer. The code below includes some brief profiler stats.

"""uses coroutines.

2 gig file, 1M lines, 2K characters/line:
- read and send one line at a time
- process and send one line
- accumulate 720 lines before write
Wed Nov 13 08:04:34 2013    fooprof
    10947682 function calls (9946973 primitive calls) in 82.147 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000   82.147   82.147 <string>:1(<module>)
        1   59.896   59.896   82.147   82.147 optimizations.py:45(reader)
  1000710    8.864    0.000   21.703    0.000 optimizations.py:57(processor)
  1000710    1.506    0.000    6.137    0.000 optimizations.py:94(writer)
  1002098    0.185    0.000    0.185    0.000 {len}
  1000708    0.209    0.000    0.209    0.000 {method 'append' of 'list' objects}
      2/1    0.073    0.036    0.078    0.078 {method 'close' of 'generator' objects}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
  1937129    0.295    0.000    0.295    0.000 {method 'extend' of 'list' objects}
  1002097    3.115    0.000    3.115    0.000 {method 'join' of 'str' objects}
2001416/1000708    0.839    0.000   22.172    0.000 {method 'send' of 'generator' objects}
  1000708    4.305    0.000    4.305    0.000 {method 'split' of 'str' objects}
  1000708    0.823    0.000    0.823    0.000 {method 'strip' of 'str' objects}
     1390    2.033    0.001    2.033    0.001 {method 'write' of 'file' objects}
        1    0.004    0.004    0.004    0.004 {method 'writelines' of 'file' objects}
        2    0.001    0.001    0.001    0.001 {open}

Running a few in a row helps:
Fri Nov 15 22:12:02 2013    fooprof
    10947671 function calls (9946963 primitive calls) in 69.237 seconds
Fri Nov 15 22:13:44 2013    fooprof
    10947671 function calls (9946963 primitive calls) in 64.330 seconds

using a dummy reader that sends the same line 1M times
Wed Nov 13 13:36:57 2013    fooprof
    10004374 function calls (9004373 primitive calls) in 23.013 seconds

using dummy reader AND writer --> processor time
Wed Nov 13 13:45:08 2013    fooprof
    10001730 function calls (9001729 primitive calls) in 10.523 seconds

using a dummy processor and writer --> mostly reader time
Wed Nov 13 22:45:24 2013    fooprof
        6005839 function calls (5005131 primitive calls) in 24.502 seconds

using a dummy reader and processor --> writer time
Wed Nov 13 22:52:12 2013    fooprof
    6004374 function calls (5004373 primitive calls) in 24.326 seconds

"""

import csv
from operator import itemgetter

# data,01/09/13,george,219,dataa,text
# data,01/09/13,george,219,dataa,text,x,y,x,y,x,y
# just keep the previous row

fields = ['Datatitle', 'Date', 'Name', 'Score', 'Parameter',
          'LTscore', 'LTParameter', 'LTscore+1', 'LTParameter+1',
          'LTscore+2', 'LTParameter+3']

def reader(processor, filename = 'data.txt'):
    processor.next()
    with open(filename) as f:
        #skip the header
        f.next()
        for line in f:
            processor.send(line)
    processor.close()
    return 'done'

def processor(writer):
    """Process line and send to writer.

    line --> str, a complete row of data
    sends str
    """
    date = itemgetter(1)
    name = itemgetter(2)
    cols_sixplus = itemgetter(slice(6,None))
    cols34 = itemgetter(slice(3, 5))
    cols6_9 = itemgetter(slice(6, 10))
    data = {}
    writer.next()
    try:
        while True:
            line = yield
            row = line.strip().split(',')
            try:
                previous_row = data[name(row)]
            except KeyError as e:
                # first time this name encountered
                row.extend(['x', 'y', 'x', 'y', 'x', 'y'])
                data[name(row)] = row
                writer.send(','.join(row) + '\n' )
                continue
            if  date(previous_row) == date(row):
                # use the xtra stuff from last time
                row.extend(cols_sixplus(previous_row))
            else:
                row.extend(cols34(previous_row))
                row.extend(cols6_9(previous_row))
            data[name(row)] = row
            writer.send(','.join(row) + '\n')
    except GeneratorExit:
        writer.close()

def writer(filename = 'processed.txt', accum = 1000):
    with open(filename, 'wb') as f:
        f.write('Datatitle,Date,Name,Score,Parameter,LTscore,LTParameter,LTscore+1,LTParameter+1,LTscore+2,LTParameter+3\n')
        try:
            while True:
                dataout = list()
                while len(dataout) < accum:
                    dataout.append((yield))
                f.write(''.join(dataout))
        except GeneratorExit:
            f.writelines(dataout)


if __name__ == '__main__':
    import cProfile, pstats

    cProfile.run("reader(processor(writer(accum = 720)), filename = 'biggerdata.txt')", 'fooprof')
    p = pstats.Stats('fooprof')
    p.strip_dirs().sort_stats(-1).print_stats()

If you look at the profiler times using the dummy (mock?) functions, they don't add up to the times of the three real functions together - I don't understand that either.

I tried using linecache in the reader, but it was slower. I tried mmap in the reader, reading 200M chunks, but that was slower too - probably because I used re.finditer() to pick out the lines. I may revisit the mmap reader for my own purposes.
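For reference, the mmap/finditer variant I mean looks roughly like this (a sketch only, not the exact code I timed; the 200M chunking is left out, the name mmap_reader is just for illustration, and it assumes the file ends with a newline):

import mmap
import re

def mmap_reader(processor, filename = 'data.txt'):
    # pull whole lines out of a memory-mapped file with re.finditer()
    processor.next()
    with open(filename, 'rb') as f:
        mm = mmap.mmap(f.fileno(), 0, access = mmap.ACCESS_READ)
        try:
            lines = re.finditer(r'[^\n]*\n', mm)
            lines.next()                     # skip the header line
            for m in lines:
                processor.send(m.group())
        finally:
            mm.close()
    processor.close()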

Answered 2013-11-16T06:55:19.427