3

我有 python TCP 客户端,需要循环发送媒体(.mpg)文件到“C”TCP 服务器。

我有以下代码,在单独的线程中,我正在读取 10K 文件块并将其发送并在循环中重新执行,我认为这是因为我实现了线程模块或 tcp 发送。我正在使用队列在我的 GUI ( Tkinter ) 上打印日志,但经过一段时间后它会耗尽内存。.

更新 1 - 根据要求添加了更多代码

线程类“Sendmpgthread”用于创建线程发送数据

.
. 
def __init__ ( self, otherparams,MainGUI):
    .
    .
    self.MainGUI = MainGUI
    self.lock = threading.Lock()
    Thread.__init__(self)

#This is the one causing leak, this is called inside loop
def pushlog(self,msg):
    self.MainGUI.queuelog.put(msg)

def send(self, mysocket, block):
    size = len(block)
    pos = 0;
    while size > 0:
        try:
            curpos = mysocket.send(block[pos:])
        except socket.timeout, msg:
            if self.over:
                 self.pushlog(Exit Send)
                return False
        except socket.error, msg:
            print 'Exception'     
            return False  
        pos = pos + curpos
        size = size - curpos
    return True

def run(self):
    media_file = None
    mysocket = None 

    try:
        mysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        mysocket.connect((self.ip, string.atoi(self.port)))
        media_file = open(self.file, 'rb') 

        while not self.over:
            chunk = media_file.read(10000)
            if not chunk:   # EOF Reset it
                print 'resetting stream'
                media_file.seek(0, 0)
                continue
            if not self.send(mysocket, chunk): # If some error or thread is killed 
                break;

            #disabling this solves the issue
            self.pushlog('print how much data sent')       

    except socket.error, msg:
        print 'print exception'
    except Exception, msg:
        print 'print exception'

    try:
        if media_file is not None:
            media_file.close()
            media_file = None            
        if mysocket is not None:
            mysocket.close()
            mysocket = None
    finally:
            print 'some cleaning'   

def kill(self):
    self.over = True

我发现这是因为 Queue 的错误实现,因为评论该部分解决了问题

更新 2 - 从上面的 Thread 类调用的 MainGUI 类

class MainGUI(Frame):
    def __init__(self, other args):
       #some code
       .
       .
        #from the above thread class used to send data
        self.send_mpg_status = Sendmpgthread(params)
        self.send_mpg_status.start()     
        self.after(100, self.updatelog)
        self.queuelog = Queue.Queue()

    def updatelog(self):
       try:
           msg = self.queuelog.get_nowait() 

           while msg is not None:
               self.printlog(msg)
               msg = self.queuelog.get_nowait() 
        except Queue.Empty:
           pass

        if self.send_mpg_status: # only continue when sending   
            self.after(100, self.updatelog)

    def printlog(self,msg):
        #print in GUI
4

4 回答 4

5

由于 printlog 正在添加到 tkinter 文本控件,因此该控件占用的内存将随着每条消息而增长(它必须存储所有日志消息才能显示它们)。

除非存储所有日志至关重要,否则一个常见的解决方案是限制显示的最大日志行数。

一个简单的实现是在控件达到最大消息数后从一开始就消除多余的行。添加一个函数来获取控件中的行数,然后在 printlog 中类似于:

while getnumlines(self.edit) > self.maxloglines:
    self.edit.delete('1.0', '1.end')

(以上代码未经测试)

更新:一些一般准则

请记住,看起来像内存泄漏的情况并不总是意味着函数是wrong,或者内存不再可访问。很多时候,正在积累元素的容器缺少清理代码。

此类问题的基本通用方法:

  • 就代码的哪一部分可能导致问题形成意见
  • 通过评论该代码来检查它(或继续评论代码直到找到候选人)
  • 在负责的代码中查找容器,添加代码以打印它们的大小
  • 决定哪些元素可以安全地从容器中移除,以及何时移除
  • 测试结果
于 2013-10-18T15:30:43.163 回答
4

我看不出您的代码片段有任何明显错误。

为了减少 Python 2.7 下的内存使用量,我会buffer(block, pos)使用block[pos:]. 我也会用你的方法mysocket.sendall(block)代替。send

如果上面的想法不能解决你的问题,那么这个错误很可能在你的代码中的其他地方。您能否发布完整 Python 脚本的最短版本,它仍然会出现内存不足(http://sscce.org/)?这会增加你获得有用帮助的变化。

于 2013-10-15T08:14:39.777 回答
2

内存不足错误表明数据正在生成但未使用或释放​​。浏览您的代码,我猜想这两个领域:

  • 消息被推送到方法Queue.Queue()中的实例上pushlog。他们被消费了吗?
  • MainGui printlog方法可能在某处写文本。例如。它是否在没有任何消息修剪的情况下不断写入某种 GUI 小部件?

从您发布的代码中,我将尝试以下操作:

  1. 在. print_ updatelog如果由于某种原因(例如调用失败)而没有继续after()调用它,那么queuelog它将继续无限制地增长。
  2. 如果updatelog不断被调用,则将注意力转向printlog。注释这个函数的内容,看看是否仍然出现内存不足的错误。如果他们不这样做,那么其中的某些东西printlog可能会保留记录的数据,您需要深入挖掘以找出是什么。

除此之外,代码可以清理一下。 self.queuelog直到线程启动之后才会创建,这会产生竞争条件,线程可能会在创建队列之前尝试写入队列。应该在线程启动之前将创建queuelog移到某个地方。

updatelog也可以重构以消除冗余:

def updatelog(self):
       try:
           while True:
               msg = self.queuelog.get_nowait() 
               self.printlog(msg)
        except Queue.Empty:
           pass

我假设该kill函数是从 GUI 线程调用的。为避免线程竞争条件,self.over应该是线程安全变量,例如threading.Event对象。

def __init__(...):
    self.over = threading.Event()

def kill(self):
    self.over.set()
于 2013-10-17T11:32:35.177 回答
1

TCP 发送循环中没有数据堆积。

内存错误可能是由日志队列引起的,因为您尚未发布完整的代码尝试使用以下类进行日志记录:

from threading import Thread, Event, Lock
from time import sleep, time as now


class LogRecord(object):
    __slots__ = ["txt", "params"]
    def __init__(self, txt, params):
        self.txt, self.params = txt, params

class AsyncLog(Thread):
    DEBUGGING_EMULATE_SLOW_IO = True

    def __init__(self, queue_max_size=15, queue_min_size=5):
        Thread.__init__(self)
        self.queue_max_size, self.queue_min_size = queue_max_size, queue_min_size
        self._queuelock = Lock()
        self._queue = []            # protected by _queuelock
        self._discarded_count = 0   # protected by _queuelock
        self._pushed_event = Event()
        self.setDaemon(True)
        self.start()

    def log(self, message, **params):
        with self._queuelock:
            self._queue.append(LogRecord(message, params))
            if len(self._queue) > self.queue_max_size:
                # empty the queue:
                self._discarded_count += len(self._queue) - self.queue_min_size
                del self._queue[self.queue_min_size:] # empty the queue instead of creating new list (= [])
            self._pushed_event.set()

    def run(self):
        while 1: # no reason for exit condition here
            logs, discarded_count = None, 0
            with self._queuelock:
                if len(self._queue) > 0:
                    # select buffered messages for printing, releasing lock ASAP
                    logs = self._queue[:]
                    del self._queue[:]
                    self._pushed_event.clear()
                    discarded_count = self._discarded_count
                    self._discarded_count = 0
            if not logs:
                self._pushed_event.wait()
                self._pushed_event.clear()
                continue
            else:
                # print logs
                if discarded_count:
                    print ".. {0} log records missing ..".format(discarded_count)
                for log_record in logs:
                    self.write_line(log_record)
                if self.DEBUGGING_EMULATE_SLOW_IO:
                    sleep(0.5)

    def write_line(self, log_record):
        print log_record.txt, " ".join(["{0}={1}".format(name, value) for name, value in log_record.params.items()])



if __name__ == "__main__":
    class MainGUI:
        def __init__(self):
            self._async_log = AsyncLog()
            self.log = self._async_log.log # stored as bound method

        def do_this_test(self):
            print "I am about to log 100 times per sec, while text output frequency is 2Hz (twice per second)"

            def log_100_records_in_one_second(itteration_index):
                for i in xrange(100):
                    self.log("something happened", timestamp=now(), session=3.1415, itteration=itteration_index)
                    sleep(0.01)

            for iter_index in range(3):
                log_100_records_in_one_second(iter_index)

    test = MainGUI()
    test.do_this_test()

我注意到您在发送循环中的任何地方都没有 sleep(),这意味着数据被尽可能快地读取并尽可能快地发送。请注意,在播放媒体文件时这是不可取的行为 - 容器时间戳用于指示数据速率。

于 2013-10-17T12:17:05.613 回答