1

我正在尝试对我们的设备生成的大量 .CSV 文件进行分类,但在分类这一步卡住了。每个文件有 30 多列,行数不限。我需要实现的是:检查在多个列中、跨越多个连续行出现的事件。例如,我需要检查是否出现以下任一情况:

  • 在“Test_Res_1”列中,连续 15 次测试的值小于 12
  • 在“Test_Res_2”列中,连续 10 次测试的值小于 5
  • 在“Test_Div”列中,连续 20 次测试的值小于 15
  • 在“Test_time”列中,连续 10 次测试的值小于 60
  • ......................一些连续测试的其他条件......

然后,只要满足其中任意一个或多个条件,我就把该文件的名称写入一个 .txt 文件。我采用了本论坛一位用户建议的代码,脚本运行良好。但每当我想检查另一个条件时,我只是把执行检查的那段代码块再复制一遍。我确信有更好的方法来实现这段代码,并精简我目前这个庞大的脚本。

以下是该文件的示例: 在此处输入图像描述

我尝试了在该论坛上找到的几个建议,但没有一个完全有效。其中一些只对单个条件有效,而我需要同时检查上面提到的多个条件。如果满足条件,我知道如何打开文件并把文件名保存到 .txt,只是不知道如何在多列、多行中检查多个条件。检查单独一行很容易,但要检查连续的多行就给我带来了很大的麻烦。

import os, os.path, zipfile, csv, datetime
import smtplib, os
# Walk the directory listing of `path` and look for runs of consecutive rows
# whose column value is below a threshold; a report is written to test.txt at
# the end of the script.  The same checking block is pasted once per condition.
# NOTE(review): `deque` is used below but is not imported by the import lines
# visible above this script (presumably `from collections import deque`).
# NOTE(review): matches are appended to `flag`, which is never defined in the
# visible code, while `flagtotal` is never appended to -- as shown, the final
# `if len(flagtotal)>0` branch has nothing to write.  Confirm intended wiring.
f = open("test.txt", "w")
flagtotal=[]
path="datafiles/"  # insert the path to the directory of interest
dirList=os.listdir(path)
for filename in dirList:
    if filename.endswith((".csv")):       
        # NOTE(review): `file` is not used again in the visible code.
        file=os.path.splitext(filename)
        # NOTE(review): opens `filename` without joining it to `path`, whereas
        # the second condition below opens 'processed_data/'+filename.
        reader = csv.reader(open(filename))
        # I GOT STUCK HERE!!!! Although the code seems to work just fine, I
        # create a completely new instance for reader every time I want to
        # add a new condition.
        # NOTE(review): the original comment line ended with
        # "reader.next() # skip header row", so that call is part of the
        # comment and is not executed here (the second condition does call
        # reader.next()) -- looks like a lost line break; confirm.
    # NOTE(review): from here on the code sits at loop level, outside the
    # `if` above, so it runs for every directory entry, not only .csv files.
    GROUP_SIZE = 5
    THRESHOLD = 0.5
    # Sliding window holding the outcome of the last GROUP_SIZE comparisons.
    cond_deque = deque(maxlen=GROUP_SIZE) # *maxlen* requires Python version 2.6+        
    linenum = 0
    # Pre-fill the window with the first GROUP_SIZE-1 results so that the
    # main loop below can test for a full window after every append.
    while len(cond_deque) < GROUP_SIZE-1:
        try:
            row = reader.next()
            linenum += 1
            col0, col1, col4, col5, col6, col23, col24, col25 = (
                float(row[i]) for i in (0, 1, 4, 5, 6, 23, 24, 25))
            cond_deque.append(col1 < THRESHOLD)
        except StopIteration:
            print 'less that {} rows of data in file'.format(GROUP_SIZE)
            break
    # then process any remaining lines
    for row in reader:
        col0, col1, col4, col5, col6, col23, col24, col25 = (
            float(row[i]) for i in (0, 1, 4, 5, 6, 23, 24, 25))
        linenum += 1
        cond_deque.append(col1 < THRESHOLD)
        # The window is full of True values: GROUP_SIZE consecutive rows
        # ending at `linenum` all had col1 < THRESHOLD.
        if cond_deque.count(True) == GROUP_SIZE:
            str1 = 'Condition 1 in cycles {}-{} had {} consecutive cycles  < {}'.format(
                linenum-GROUP_SIZE+1, linenum, GROUP_SIZE, THRESHOLD)
            #print str1
            flag.append(str1)
            break  # stop looking

    #checking for the second condition
    # The file is re-opened and re-read from the start for this condition.
    reader = csv.reader(open('processed_data/'+filename))
    reader.next()        
    GROUP_SIZE = 2
    THRESHOLD = 20
    cond_deque = deque(maxlen=GROUP_SIZE) # *maxlen* requires Python version 2.6+        
    linenum = 0
    while len(cond_deque) < GROUP_SIZE-1:
        try:
            row = reader.next()
            linenum += 1
            col0, col1, col4, col5, col6, col23, col24, col25 = (
                float(row[i]) for i in (0, 1, 4, 5, 6, 23, 24, 25))
            # NOTE(review): the pre-fill tests col1 < THRESHOLD while the
            # main loop below tests col5 < THRESHOLD/60 -- looks like a
            # copy/paste leftover; confirm which comparison is intended.
            cond_deque.append(col1 < THRESHOLD)
        except StopIteration:
            #print 'less that {} rows of data in file'.format(GROUP_SIZE)
            break
    # then process any remaining lines
    for row in reader:
        col0, col1, col4, col5, col6, col23, col24, col25 = (
            float(row[i]) for i in (0, 1, 4, 5, 6, 23, 24, 25))
        linenum += 1
        # NOTE(review): THRESHOLD is the int 20, so under Python 2 (which the
        # print statement above implies) THRESHOLD/60 is integer division
        # and evaluates to 0 -- confirm whether 20/60.0 was intended.
        cond_deque.append(col5 < THRESHOLD/60)
        if cond_deque.count(True) == GROUP_SIZE:
            str1 = 'Condition 2 {}-{} had {} consecutive cycles  < {} minutes'.format(
                linenum-GROUP_SIZE+1, linenum, GROUP_SIZE, THRESHOLD)
            #print str1
            flag.append(str1)
            break  # stop looking
# Write the report: a date line and a heading followed by the collected
# entries, but only when `flagtotal` is non-empty.
today = datetime.date.today()
datestring='Date of testing: '+today.strftime('%m/%d/%Y')
if len(flagtotal)>0:
    flagtotal.insert(0,datestring)
    flagtotal.insert(1,'The following files met the criteria.\n--------------------------------------------')
    f.write("\n".join(map(lambda x: str(x), flagtotal)))
f.close()
4

2 回答 2

1

我认为以下内容展示了如何实现你想要的功能。它基于 collections.deque 类,基本上是我在回答你另一个问题时所用逻辑的通用版本。通用化的做法是:把每个判定条件(criterion)的全部数据及相关处理封装进一个面向该应用的类中。

结果需要相当多的代码,但比您尝试的方法更紧凑并且可能更快。它只读取每个文件一次,并检查在单次通过期间是否满足可变数量的条件中的任何一个。

import csv
from collections import deque
import datetime
from glob import iglob
import os

class Criterion(object):
    """One "expression holds for N consecutive rows" check.

    in_a_row is the number of consecutive rows for which the expression,
    given as a string of Python source, must evaluate to a true value.

    The expression is compiled once and then run with eval() for every
    row, so it must come from a trusted source (e.g. a literal in the
    script); never build it from external input.
    """

    def __init__(self, in_a_row, expression):
        """Compile *expression* and set up an empty window of *in_a_row*
        results.  Raises SyntaxError if the expression does not compile."""
        self.in_a_row = in_a_row
        self.expression = expression
        self.bytecode = compile(expression, '<string>', 'eval')
        # Sliding window of the most recent results; older ones fall off
        # the left end automatically once maxlen is reached.
        self.deque = deque(maxlen=in_a_row)

    def eval_and_check(self, local_vars):
        """Evaluate the expression with *local_vars* as its local names,
        record the outcome, and return whether the last 'in_a_row'
        outcomes were all true."""
        # Coerce to bool: count(True) below only matches values equal to
        # True, so a truthy non-bool result (a non-zero number, a
        # non-empty string, ...) would otherwise never be counted.
        self.deque.append(bool(eval(self.bytecode, globals(), local_vars)))
        return self.deque.count(True) == self.in_a_row

    def reset(self):
        """Forget all recorded outcomes (call before starting a new file)."""
        self.deque.clear()

    def format_match(self, filename, linenum):
        """Return a message describing a match whose last row is *linenum*
        in *filename*."""
        return 'lines {}-{} in {} had {} consecutive rows with "{}"'.format(
            linenum-self.in_a_row+1, linenum, filename, self.in_a_row,
            self.expression)

# Checks applied to every file; a file is flagged as soon as any one of
# them is satisfied.
criteria = [Criterion(5, 'Test_Res_2 < 40'),
            Criterion(3, '13 <= Test_Res_4 <= 15'), ]
flagtotal = []  # base names of the csv files that met a criterion
datapath = "datafiles"  # directory path to location of csv files

for filename in iglob(os.path.join(datapath, '*.csv')):
    with open(filename) as csvfile:
        reader = csv.reader(csvfile, skipinitialspace=True)
        # Skip over the initial fieldnames row.  The next() builtin works
        # on both Python 2.6+ and Python 3 (reader.next() is Python 2
        # only), and the default avoids StopIteration on an empty file.
        if next(reader, None) is None:
            continue  # empty file: nothing to check
        for criterion in criteria:  # initialize all before processing file
            criterion.reset()
        condition_satisfied = False
        for linenum, row in enumerate(reader, start=1):
            if not row:
                continue  # csv.reader yields [] for a blank line
            # define local vars for use in criterion expression evaluation
            # (a list comprehension instead of map(): on Python 3 map()
            # returns an iterator that cannot be added to a list)
            (Test_num, Test_Res_1, Test_Res_2, Test_Res_3, Test_Res_4,
             Test_div, Test_time) = [int(row[0])] + [float(v) for v in row[1:]]
            for criterion in criteria:
                # At module level locals() is the module namespace, so the
                # names unpacked above are visible to the expression.
                if criterion.eval_and_check(locals()):
                    #print criterion.format_match(filename, linenum)
                    flagtotal.append(os.path.basename(filename))
                    condition_satisfied = True
                    break  # quit criterion checking for this row
            if condition_satisfied:
                break  # quit processing rows of this csv file

# Write the report: a dated heading followed by either the flagged file
# names (one per line) or a note that nothing matched.  The same body text
# is echoed to stdout.
with open('test.txt', 'w') as f:
    report_date = datetime.date.today().strftime('%m/%d/%Y')
    heading = ('Date of testing: {}\n'.format(report_date) +
               'The following files met the criteria:\n'
               '-------------------------------------\n')
    f.write(heading)
    report_body = '\n'.join(flagtotal) if flagtotal else 'no files met the criteria'
    print(report_body)
    f.write(report_body + '\n')
于 2013-07-16T10:05:34.623 回答
0

我不熟悉 csv 模块,但我们假设你可以得到一个按列名索引的字典。那么你可以像下面这样在某一列中查找连续满足条件的条目:

import itertools

# Goal: detect whether column "Test_Res_1" had values less than 12 for at
# least 15 consecutive tests.
# NOTE(review): assumes `reader` maps a column name to that column's values
# in row order -- csv.reader does not provide this; build it first.
col = reader["Test_Res_1"] # get the column as a list

# Lengths of every run of consecutive values < 12.  groupby() starts a new
# group each time the key (x < 12) changes; only the True groups are kept.
consec_lt_12 = [len(list(run))
                for is_lt_12, run in itertools.groupby(col, lambda x: x < 12)
                if is_lt_12]

# Check whether any run is >= 15 long.  any() is used rather than max()
# because max() raises ValueError when no value was below 12 (empty list).
if any(run_length >= 15 for run_length in consec_lt_12):
    # ok! found it
    print('found at least 15 consecutive values < 12')

如果你能让它运行起来,那么只需对你想检查的每一列和每个阈值重复这一做法,然后按你需要的方式把各个结果组合起来(例如,你是需要 A 列“并且”B 列都满足条件,还是 A 列“或者”B 列满足条件,等等)。

于 2013-07-15T15:05:48.353 回答