1

这个问题是以下问题的后续:With statement and python threading

我一直在尝试使用 python 线程 api。我有这段代码适用于我想要实现的目标:---->在调用 python 线程上运行之前执行函数。

但是要做到这一点,我总是要在run()方法中调用time.sleep(1),使其继续执行execute()。否则线程退出而没有函数分配和执行。有没有更好的方法来实现这种类型等待?

from __future__ import print_function
import threading
import time
import functools
import contextlib
import thread
from threading import Lock
#import contextlib
#Thread module for dealing with lower level thread operations.Thread is limited use Threading instead.

def timeit(fn):
    '''Timeit function like this doesnot work with the thread calls'''
    def wrapper(*args,**kwargs):
        start = time.time()
        fn(*args,**kwargs)
        end = time.time()
        threadID = ""
        print ("Duration for func %s :%d\n"%(fn.__name__ +"_"+ threading.current_thread().name ,end-start))
    return wrapper

exitFlag = 0

@timeit
def print_time(counter,delay):
    while counter:
        if exitFlag:
            thread.exit()
        time.sleep(delay)
        print("%s : %s_%d"%(threading.current_thread().name,time.ctime(time.time()),counter))
        counter -= 1

class Mythread(threading.Thread):
    def __init__(self,threadID,name):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self._f = None

    def run(self):
        print("Starting%s\n" % self.name)
        time.sleep(1)
        if self._f:
            self._f()
            print("Exiting%s\n" % self.name)
        else:
            print("Exiting%s without function execution\n" % self.name ) 

#     def set_f(self,f):
#         self._f = f

    def execute(self,f,*args,**kwargs):
        self._f=functools.partial(f,*args,**kwargs)

    def __enter__(self):
        self.start()

    def __exit__(self,type,value,traceback):
        self.join()




class ThreadContainer(object):
    def __init__(self,id,name):
        self._t = Mythread(id,name)

    def execute(self,f,*args,**kwargs):
        self._f=functools.partial(f,*args,**kwargs)
        self._t.set_f(self._f)
#        self._t.start()
#         self._t.join()


    def __enter__(self):
        self._t.start()

    def __exit__(self,type,value,traceback):
        self._t.join()




if __name__ == '__main__':
    '''
    print_time(5, 1)
     threadLock = threading.Lock()
     threads = []
     thread1 = Mythread(1,"Thread1",5,1)
     thread2 = Mythread(2,"Thread2",5,2)
     thread1.start()
     thread2.start()
     threads.append(thread1)
     threads.append(thread2)
     for t in threads:
         t.join()
    '''
#     thread1 = Mythread(1,"Thread1")
#     thread2 = Mythread(2,"Thread2")
#     with contextlib.nested(ThreadContainer(1,"Thread1"),ThreadContainer(2,"Thread2")) as (t1,t2):
#         t1.execute(print_time,5,1)
#         t2.execute(print_time,5,2)
    t1 = Mythread(1,"Thread1")
    t2 = Mythread(2,"Thread2")
    with contextlib.nested(t1,t2):
        t1.execute(print_time,5,1)
        t2.execute(print_time,5,2)


    print("Exiting main thread ")
4

1 回答 1

1

这里的问题是你希望run函数等到execute函数被调用。

当然,显而易见的解决方案是在调用execute之前先调用start

t1.execute(print_time,5,1)
t2.execute(print_time,5,2)
with contextlib.nested(t1, t2):
    pass

…或者只是execute调用start,或者将函数传递给构造函数或start调用,或者…</p>

此外,您的预期设计有点奇怪。线程函数旨在处理_f尚未设置的情况……但您希望它等到_f设置完成?


但可以想象,这样的问题在更现实的设计中可能会出现,那么,让我们看看如何解决它。

首先,添加sleep以解决线程问题几乎总是表明您做错了什么。这也是导致可怕的性能问题的好方法(例如:当你sleep在足够多的地方添加足够多的 s 以使一切正常工作时,你的应用程序需要 30 秒而不是 30 毫秒才能启动)——而且,更糟糕的是,竞争条件错误(肯定 1 秒总是足够的时间,对吧?除非计算机正在颠簸交换,或者从休眠状态中醒来,或者忙于使用所有 CPU 的其他程序,或者......)。

如果您尝试跨线程同步操作,则需要使用同步对象。诀窍是知道正确的。阅读整个文档LockEvent和 3.x 添加Barrier),并找到一个关于线程的教程,以更广泛地了解所有这些东西的用途。*

在这种情况下,您的代码正在等待对已保存状态进行一些更改,而其他代码正在进行更改,这是'Condition'的典型用例。所以:

class Mythread(threading.Thread):
    def __init__(self, threadID, name, condition):
        self.condition = condition
        # ... same as before

    def run(self):
        # ... setup before checking for _f

        with self.condition:
            while not self._f:
                self.condition.wait()
        self._f()

        # ... anything else you want

现在,您需要创建Condition,将其传递给线程,然后将其传递给线程notify

您可以使用单个Condition

condition = threading.Condition()
t1 = Mythread(1, "Thread1", condition)
t2 = Mythread(2, "Thread2", condition)
with contextlib.nested(t1,t2):
    with condition:
        t1.execute(print_time, 5, 1)
        t2.execute(print_time, 5, 2)
        condition.notify_all()

或者,您可以为每个线程提供自己的Condition

class Mythread(threading.Thread):
    def __init__(self, threadID, name):
        self.condition = Condition()
        # ... same as before

# ...

t1 = Mythread(1, "Thread1")
t2 = Mythread(2, "Thread2")
with contextlib.nested(t1,t2):
    with t1.condition:
        t1.execute(print_time, 5, 1)
        t1.condition.notify()
    with t2.condition:
        t2.execute(print_time, 5, 1)
        t2.condition.notify()

请注意,这不允许您显式 "not set" _f,但这样做很容易。例如,您可以添加一个_f_set属性,并检查它而不是_f,因此有人可以调用execute(None)(然后notify)唤醒您并让您进入“否_f”情况。


* 警告:部分命名不一致。有一种不同的东西也称为“障碍”,另一种不同的东西也称为“栅栏”,“事件”的许多变体与 Python 完全不同(其中一些更像是条件,但不是实际上可以这样使用),有时“条件变量”是由同步对象而不是同步对象保护的实际共享状态,依此类推……</p>

于 2013-05-07T18:22:33.530 回答