4

我是并发编程的新手。

我想重复执行三个任务。前两个应该一直运行,第三个应该每隔一小时左右运行一次。前两个任务可以并行运行,但我总是想在第三个任务运行时暂停它们。

这是我尝试过的框架:

import threading
import time

flock = threading.Lock()
glock = threading.Lock()

def f():
    while True:
        with flock:
            print 'f'
            time.sleep(1)

def g():
    while True:
        with glock:
            print 'g'
            time.sleep(1)

def h():
    while True:
        with flock:
            with glock:
                print 'h'
        time.sleep(5)

threading.Thread(target=f).start()
threading.Thread(target=g).start()
threading.Thread(target=h).start()

我希望这段代码每秒打印一个 f 和 ag,大约每五秒打印一个 h。但是,当我运行它时,大约需要 12 个 f 和 12 g 才能开始看到一些 h。看起来前两个线程不断释放并重新获取它们的锁,而第三个线程被排除在循环之外。

  1. 这是为什么?当第三个线程尝试获取当前持有的锁,然后将其释放时,不应该立即成功获取而不是第一个/第二个线程立即再次获取它吗?我可能误解了一些东西。
  2. 什么是实现我想要的好方法?

注意:将time.sleep(1)调用移出 withflock/glock 块适用于这个简单的示例,但显然不适用于线程大部分时间都在执行实际操作的实际应用程序。当前两个线程在每次执行循环体后休眠一秒钟时,随着锁的释放,第三个任务仍然永远不会被执行。

4

5 回答 5

5

如何使用threading.Events来做:

import threading
import time
import logging

logger=logging.getLogger(__name__)

def f(resume,is_waiting,name):
    while True:
        if not resume.is_set():
            is_waiting.set()
            logger.debug('{n} pausing...'.format(n=name))
            resume.wait()
            is_waiting.clear()
        logger.info(name)
        time.sleep(1)

def h(resume,waiters):
    while True:
        logger.debug('halt') 
        resume.clear()
        for i,w in enumerate(waiters):
            logger.debug('{i}: wait for worker to pause'.format(i=i))
            w.wait()
        logger.info('h begin')
        time.sleep(2)
        logger.info('h end')        
        logger.debug('resume')
        resume.set()
        time.sleep(5)

logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

# set means resume; clear means halt
resume = threading.Event()
resume.set()

waiters=[]
for name in 'fg':
    is_waiting=threading.Event()
    waiters.append(is_waiting)
    threading.Thread(target=f,args=(resume,is_waiting,name)).start()    
threading.Thread(target=h,args=(resume,waiters)).start()

产量

[07:28:55 Thread-1] f
[07:28:55 Thread-2] g
[07:28:55 Thread-3] halt
[07:28:55 Thread-3] 0: wait for worker to pause
[07:28:56 Thread-1] f pausing...
[07:28:56 Thread-2] g pausing...
[07:28:56 Thread-3] 1: wait for worker to pause
[07:28:56 Thread-3] h begin
[07:28:58 Thread-3] h end
[07:28:58 Thread-3] resume
[07:28:58 Thread-1] f
[07:28:58 Thread-2] g
[07:28:59 Thread-1] f
[07:28:59 Thread-2] g
[07:29:00 Thread-1] f
[07:29:00 Thread-2] g
[07:29:01 Thread-1] f
[07:29:01 Thread-2] g
[07:29:02 Thread-1] f
[07:29:02 Thread-2] g
[07:29:03 Thread-3] halt

(针对评论中的问题)此代码尝试测量h-thread 从其他工作线程获取每个锁所需的时间。

这似乎表明,即使h正在等待获取锁,其他工作线程也可能以相当高的概率释放并重新获取锁。没有优先考虑h仅仅因为它已经等待了更长的时间。

David Beazley 在 PyCon 上介绍了与线程和 GIL 相关的问题。这是幻灯片的 pdf 文件。这是一本引人入胜的读物,也可能有助于解释这一点。

import threading
import time
import logging

logger=logging.getLogger(__name__)

def f(lock,n):
    while True:
        with lock:
            logger.info(n)
            time.sleep(1)

def h(locks):
    while True:
        t=time.time()
        for n,lock in enumerate(locks):
            lock.acquire()
            t2=time.time()
            logger.info('h acquired {n}: {d}'.format(n=n,d=t2-t))
            t=t2
        t2=time.time()
        logger.info('h {d}'.format(d=t2-t))
        t=t2
        for lock in locks:
            lock.release()
        time.sleep(5)

logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

locks=[]
N=5
for n in range(N):
    lock=threading.Lock()
    locks.append(lock)
    t=threading.Thread(target=f,args=(lock,n))
    t.start()

threading.Thread(target=h,args=(locks,)).start()
于 2011-11-12T11:52:13.223 回答
1

最简单的方法是使用 3 个 Python 进程。如果您在 Linux 上执行此操作,则每小时进程可以发送信号以导致其他任务暂停,或者您甚至可以杀死它们,然后在每小时任务完成后重新启动。不需要线程。

但是,如果您决定使用线程,则尝试在线程之间不共享任何数据,只需来回发送消息(也称为数据复制而不是数据共享)。线程很难正确处理。

但是,多个进程迫使您不共享任何内容,因此更容易正确执行。如果您使用像 0MQ http://www.zeromq.org这样的库来进行消息传递,那么很容易从线程模型转移到多进程模型。

于 2011-11-12T10:37:53.703 回答
1

使用通信进行同步:

#!/usr/bin/env python
import threading
import time
from Queue import Empty, Queue

def f(q, c):
    while True:
        try: q.get_nowait(); q.get() # get PAUSE signal      
        except Empty: pass  # no signal, do our thing
        else: q.get()       # block until RESUME signal
        print c,
        time.sleep(1)

def h(queues):
    while True:
        for q in queues:
            q.put_nowait(1); q.put(1) # block until PAUSE received
        print 'h'
        for q in queues:
            q.put(1) # put RESUME
        time.sleep(5)

queues = [Queue(1) for _ in range(2)]
threading.Thread(target=f, args=(queues[0], 'f')).start()
threading.Thread(target=f, args=(queues[1], 'g')).start()
threading.Thread(target=h, args=(queues,)).start()

从您的性能角度来看,它可能不是最佳的,但我发现它更容易遵循。

输出

f g
f g h
f g f g g f f g g f g f f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
于 2011-11-12T13:16:57.920 回答
0

信号量初始化为 2 怎么样?F 和 G 等待并发信号通知 1 个单元,H 等待并发信号通知 2 个单元。

于 2011-11-12T11:12:46.037 回答
0

这种方法怎么样(虽然有争议,因为我知道'全局'变量在线程方面被认为是一个很大的禁忌(新手-所以还在学习)......

import threading, time


import threading, time

def f():
    global BL
    while True:
        BL = 'f' if BL == -1 else BL
        if BL == 'f':
            print('f')
            BL = -1
            ss(0.1)

def g():
    global BL
    while True:
        BL = 'g' if BL == -1 else BL
        if BL == 'g':
            print('g')
            BL = -1
            ss(0.1)

def h():
    global BL
    while True:
        BL = 'h' if BL == -1 and (tt() - start) % delay_3rd <= 0.1 and (tt()-start) > 1 else BL
        if (BL == 'h'):
           print('h')
           print(f' seconds: {round(tt() - start,None)}!!'*100)
           BL = -1
           ss(0.1)


BL, delay_3rd, [ss], [tt]  = -1, 5, [time.sleep], [time.time]
start = tt()

第三个将每秒运行一次(您可以将 delay_3rd = 3600 设置为每小时间隔;而第一个两个始终运行(根据您的请求/意图)

threading.Thread(target=f).start()
threading.Thread(target=g).start()
threading.Thread(target=h).start()

(运行大约 4-5 秒后输出...)

F

h秒:5!!

G

F

G

F

F

G

F

H

G

F

G

h秒:6!!

F

G

F

G

F

G

F

G

F

G

F

G

F

H

秒:7!!

G

F

G

(注意 h 仅每秒出现一次;f 和 g 在整个过程中间歇性出现......)

于 2021-01-28T05:44:37.523 回答