这可以通过将“信号唤醒 fd”设置到管道的写入端,然后创建一个CFFileDescriptor监视管道读取端的活动来实现。
如Python 2.6中的新增功能所述,在 Python 2.6 中,向名为set_wakeup_fd()signal的模块添加了一个新 API 。每当接收到信号时,都会将一个 NUL 字节 ( ) 写入 fd。'\0'
如果将唤醒 fd 设置为管道的写入端,则CFFileDescriptor可以创建 a 来监视管道读取端上的活动(数据可用性),并且可以将此类活动的回调配置为在CFRunLoop.
from __future__ import print_function
from AppKit import * # For development only. This takes a long time to complete as there are many symbols.
import fcntl
import os
import signal
shared_workspace = NSWorkspace.sharedWorkspace()
def on_did_activate_application(notification):
print('on_did_activate_application(notification = %s)' % notification)
notification_center = shared_workspace.notificationCenter()
did_activate_application_observer = notification_center.addObserverForName_object_queue_usingBlock_(
NSWorkspaceDidActivateApplicationNotification,
None,
None,
on_did_activate_application)
def handle_signal(signum, frame):
print('handle_signal(signum = %s, frame = <scrubbed>)' % signum)
if signum == signal.SIGCONT:
signal.signal(signal.SIGTSTP, handle_signal)
elif signum == signal.SIGINT:
notification_center.removeObserver_(did_activate_application_observer)
CFRunLoopStop(CFRunLoopGetCurrent())
else:
# https://stackoverflow.com/questions/13377773/proper-way-to-handle-signals-other-than-sigint-in-python
signal.signal(signum, signal.SIG_DFL)
os.kill(os.getpid(), signum)
r, w = os.pipe()
flags = fcntl.fcntl(r, fcntl.F_GETFL, 0)
fcntl.fcntl(r, fcntl.F_SETFL, flags | os.O_NONBLOCK)
def callout(f, call_back_types, info):
# Note: The signal handler will run first.
print('callout()')
# Clear the pipe of NUL bytes.
n = 0
while True:
try:
n += len(os.read(r, 100))
except OSError:
break
print('read %d byte(s)' % n)
# Per https://developer.apple.com/documentation/corefoundation/cffiledescriptor?language=objc
# "Each call back is one-shot, and must be re-enabled if you want to get another one."
# Thus we need to re-enable call backs.
CFFileDescriptorEnableCallBacks(f, kCFFileDescriptorReadCallBack)
file_descriptor = CFFileDescriptorCreate(None, r, True, callout, None)
CFFileDescriptorEnableCallBacks(file_descriptor, kCFFileDescriptorReadCallBack)
run_loop_source = CFFileDescriptorCreateRunLoopSource(None, file_descriptor, 0)
CFRunLoopAddSource(CFRunLoopGetCurrent(), run_loop_source, kCFRunLoopDefaultMode)
signal.set_wakeup_fd(w)
signal.signal(signal.SIGCONT, handle_signal)
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTSTP, handle_signal)
# For testing, configure a SIGALRM to be received every two seconds.
signal.signal(signal.SIGALRM, lambda _1, _2: print('SIGALRM'))
signal.setitimer(signal.ITIMER_REAL, 2, 2)
print('about to call CFRunLoopRun()')
CFRunLoopRun()
致谢
非常感谢“A GUEST”(无论您是谁)在 Pastebin 上发布此粘贴;感谢 Da_Blitz 撰写Fighting set_wakeup_fd文章;以及在 Python 中处理除 SIGINT 以外的信号的“正确”方式的提问者/回答者?, SIGSTOP 和 SIGTSTP 有什么区别?,以及Stack Overflow 上的 python 非阻塞读取文件。