2

我正在浏览一个数据集并尝试从数据库(postgres)中计算特征。

问题是程序时不时地卡在某个地方(从我启用的数据库日志确认,很长时间没有新的查询发生),当我按 ctrl+c 时,程序似乎恢复正常(我还没有确认计算是否正确,因为有很多行)。它不会卡在同一个位置,但它似乎有一个随机模式。知道我可能做错了什么吗?

我有 2 个文件 main.py 和 NAC.py。

主要.py:

import NAC
from dateutil.parser import parse
from datetime import timedelta
rows = fc.Read_CSV_to_Dict(input_file) #just a wrapper around csv.Dictreader
i=0
start_time = time.time()
for row in rows : #rows has about 600,000 rows
    ret1,ret2 = NAC.function(row['key1'], ...) #and other parameters
    #new keys
    row['newKey1'],row['newKey2'] = ret1
    row['newKey3'],row['newKey4'] = ret2 #unpacking
    i=i+1
    if(i%10000==0): #progress monitor
        print i
print (time.time()-start_time)/60
NAC.db_close()

NAC.py:

from dateutil.parser import parse
from datetime import timedelta
import psycopg2
import psycopg2.extras

def function(param1, ...):
    """     
    Returns:
        2 element list, each a list by itself
    """ 
    nsclist = [0]*param2_count
    naclist = [0]*param2_count  
    for i in range(param2_count):
        stime = (begintime + timedelta(seconds = 60*intervalPeriod * i))
        etime = (begintime + timedelta(seconds = 60*intervalPeriod * (i+1)))
        table1_query = "select sum(count)from table1 where column1= '{0}' and column2>'{1}'::TIMESTAMP WITH TIME ZONE and column2<='{2}'::TIMESTAMP WITH TIME ZONE"
        cur.execute(sched_query.format(param1,stime,etime))
        nsclist[i] = cur.fetchone()[0]
        if(nsclist[i] == []):
            nsclist[i] = 0
        table2_query = "select sum(count)from table2 where column1 = '{0}' and column2 >'{1}'::TIMESTAMP WITH TIME ZONE and column2 <='{2}'::TIMESTAMP WITH TIME ZONE"
        cur.execute(table2_query .format(param1,stime,etime))
        naclist[i] = cur.fetchone()[0]
        if(naclist[i] == []):
            naclist[i] = 0
    return nsclist, naclist

def db_close():
    cur.close()
    conn.close()

intervalPeriod = 5 #minutes
conn = psycopg2.connect(cs.local_connstr)
cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)

数据库日志时间戳:

2013-07-01 18:26:01 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:26:01 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:26:01 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:26:01 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:26:01 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:26:01 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:26:01 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:26:01 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:29:30 ctl+c pressed (manually added... not in the log)
2013-07-01 18:29:30 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:29:30 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:29:30 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:29:30 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:29:30 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:29:30 PDT LOG:  statement: select sum(count)from ...
4

1 回答 1

0

原来是光标有问题。我只需要在每个函数调用时打开和关闭光标。我不确定为什么。

from dateutil.parser import parse
from datetime import timedelta
import psycopg2
import psycopg2.extras

def function(param1, ...):
    """     
    Returns:
        2 element list, each a list by itself
    """ 
    cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    nsclist = [0]*param2_count
    naclist = [0]*param2_count  
    for i in range(param2_count):
        table1_query = "select sum(count)from table1 where column1= '{0}' and column2>'{1}'::TIMESTAMP WITH TIME ZONE and column2<='{2}'::TIMESTAMP WITH TIME ZONE"
        cur.execute(sched_query.format(param1,stime,etime))
        nsclist[i] = cur.fetchone()[0]
        if(nsclist[i] == []):
            nsclist[i] = 0
        table2_query = "select sum(count)from table2 where column1 = '{0}' and column2 >'{1}'::TIMESTAMP WITH TIME ZONE and column2 <='{2}'::TIMESTAMP WITH TIME ZONE"
        cur.execute(table2_query .format(param1,stime,etime))
        naclist[i] = cur.fetchone()[0]
        if(naclist[i] == []):
            naclist[i] = 0
    cur.close()
    return nsclist, naclist

def db_close():
    conn.close()

intervalPeriod = 5 #minutes
conn = psycopg2.connect(cs.local_connstr)
于 2013-07-02T10:25:49.907 回答