是否有一种 Pythonic 方式可以只运行一个程序实例?
我想出的唯一合理的解决方案是尝试在某个端口上将其作为服务器运行,然后第二个程序尝试绑定到同一端口 - 失败。但这并不是一个好主意,也许还有比这更轻量级的东西?
(考虑到程序有时会失败,即段错误 - 所以像“锁定文件”这样的东西不会起作用)
是否有一种 Pythonic 方式可以只运行一个程序实例?
我想出的唯一合理的解决方案是尝试在某个端口上将其作为服务器运行,然后第二个程序尝试绑定到同一端口 - 失败。但这并不是一个好主意,也许还有比这更轻量级的东西?
(考虑到程序有时会失败,即段错误 - 所以像“锁定文件”这样的东西不会起作用)
以下代码应该可以完成这项工作,它是跨平台的并且在 Python 2.4-3.2 上运行。我在 Windows、OS X 和 Linux 上对其进行了测试。
from tendo import singleton
me = singleton.SingleInstance() # will sys.exit(-1) if other instance is running
最新的代码版本可用singleton.py。请在此处提交错误。
您可以使用以下方法之一安装tend:
easy_install tendo
pip install tendo
import fcntl
import os
import sys
def instance_already_running(label="default"):
"""
Detect if an an instance with the label is already running, globally
at the operating system level.
Using `os.open` ensures that the file pointer won't be closed
by Python's garbage collector after the function's scope is exited.
The lock will be released when the program exits, or could be
released if the file pointer were closed.
"""
lock_file_pointer = os.open(f"/tmp/instance_{label}.lock", os.O_WRONLY)
try:
fcntl.lockf(lock_file_pointer, fcntl.LOCK_EX | fcntl.LOCK_NB)
already_running = False
except IOError:
already_running = True
return already_running
很像 S.Lott 的建议,但带有代码。
此代码是特定于 Linux 的。它使用“抽象”的 UNIX 域套接字,但它很简单,不会留下陈旧的锁定文件。我更喜欢上面的解决方案,因为它不需要专门保留的 TCP 端口。
try:
import socket
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
## Create an abstract socket, by prefixing it with null.
s.bind( '\0postconnect_gateway_notify_lock')
except socket.error as e:
error_code = e.args[0]
error_string = e.args[1]
print "Process already running (%d:%s ). Exiting" % ( error_code, error_string)
sys.exit (0)
postconnect_gateway_notify_lock
可以更改唯一字符串以允许需要强制执行单个实例的多个程序。
我不知道它是否足够 Pythonic,但在 Java 世界中,在定义的端口上侦听是一种非常广泛使用的解决方案,因为它适用于所有主要平台,并且在程序崩溃时没有任何问题。
侦听端口的另一个优点是您可以向正在运行的实例发送命令。例如,当用户第二次启动程序时,您可以向正在运行的实例发送一个命令,告诉它打开另一个窗口(例如,Firefox 就是这样做的。我不知道他们是否使用 TCP 端口或命名管道或类似的东西,'虽然)。
以前从未写过python,但这是我刚刚在mycheckpoint中实现的,以防止它被crond启动两次或更多次:
import os
import sys
import fcntl
fh=0
def run_once():
global fh
fh=open(os.path.realpath(__file__),'r')
try:
fcntl.flock(fh,fcntl.LOCK_EX|fcntl.LOCK_NB)
except:
os._exit(0)
run_once()
在另一个问题 (http://stackoverflow.com/questions/2959474) 中发布后发现了 Slava-N 的建议。这被称为函数,锁定正在执行的脚本文件(不是 pid 文件)并保持锁定直到脚本结束(正常或错误)。
使用 pid 文件。你有一些已知的位置,“/path/to/pidfile”,并且在启动时你会做这样的事情(部分伪代码,因为我是咖啡前,不想那么努力地工作):
import os, os.path
pidfilePath = """/path/to/pidfile"""
if os.path.exists(pidfilePath):
pidfile = open(pidfilePath,"r")
pidString = pidfile.read()
if <pidString is equal to os.getpid()>:
# something is real weird
Sys.exit(BADCODE)
else:
<use ps or pidof to see if the process with pid pidString is still running>
if <process with pid == 'pidString' is still running>:
Sys.exit(ALREADAYRUNNING)
else:
# the previous server must have crashed
<log server had crashed>
<reopen pidfilePath for writing>
pidfile.write(os.getpid())
else:
<open pidfilePath for writing>
pidfile.write(os.getpid())
因此,换句话说,您正在检查 pidfile 是否存在;如果没有,请将您的 pid 写入该文件。如果 pidfile 确实存在,则检查 pid 是否是正在运行的进程的 pid;如果是这样,那么你有另一个正在运行的进程,所以只需关闭。如果不是,那么前一个进程崩溃了,所以记录它,然后将你自己的 pid 写入文件来代替旧的。然后继续。
在 Windows 上解决此问题的最佳解决方案是使用 @zgoda 建议的互斥锁。
import win32event
import win32api
from winerror import ERROR_ALREADY_EXISTS
mutex = win32event.CreateMutex(None, False, 'name')
last_error = win32api.GetLastError()
if last_error == ERROR_ALREADY_EXISTS:
print("App instance already running")
一些答案使用fctnl
(也包含在@sorin tendo 包中)在 Windows 上不可用,如果您尝试使用像pyinstaller
静态导入这样的包来冻结您的 python 应用程序,它会引发错误。
此外,使用锁定文件方法,会导致read-only
数据库文件出现问题(经历过这种情况sqlite3
)。
这可能有效。
尝试在已知位置创建 PID 文件。如果你失败了,有人锁定了文件,你就完成了。
正常完成后,关闭并删除 PID 文件,以便其他人可以覆盖它。
您可以将您的程序包装在一个 shell 脚本中,即使您的程序崩溃,该脚本也会删除 PID 文件。
如果程序挂起,您也可以使用 PID 文件终止程序。
这是我最终的仅限 Windows 的解决方案。将以下内容放入一个模块中,可能称为“onlyone.py”或其他。将该模块直接包含到您的 __ main __ python 脚本文件中。
import win32event, win32api, winerror, time, sys, os
main_path = os.path.abspath(sys.modules['__main__'].__file__).replace("\\", "/")
first = True
while True:
mutex = win32event.CreateMutex(None, False, main_path + "_{<paste YOUR GUID HERE>}")
if win32api.GetLastError() == 0:
break
win32api.CloseHandle(mutex)
if first:
print "Another instance of %s running, please wait for completion" % main_path
first = False
time.sleep(1)
该代码尝试创建一个互斥锁,其名称源自脚本的完整路径。我们使用正斜杠来避免与真实文件系统的潜在混淆。
在 unix 上使用锁定文件是一种非常常见的方法。如果它崩溃,您必须手动清理。您可以将 PID 存储在文件中,并在启动时检查是否存在具有此 PID 的进程,如果没有则覆盖锁定文件。(但是,您还需要锁定 read-file-check-pid-rewrite-file)。您将在os -package 中找到获取和检查 pid 所需的内容。检查是否存在具有给定 pid 的进程的常用方法是向其发送非致命信号。
其他替代方案可以将其与flock 或posix 信号量相结合。
正如 saua 建议的那样,打开一个网络套接字可能是最简单和最便携的。
对于将wxPython用于其应用程序的任何人,您都可以使用wx.SingleInstanceChecker
此处记录的函数 。
我个人使用一个子类,如果有一个现有的应用程序实例已经在执行wx.App
,则使用wx.SingleInstanceChecker
并返回:False
OnInit()
import wx
class SingleApp(wx.App):
"""
class that extends wx.App and only permits a single running instance.
"""
def OnInit(self):
"""
wx.App init function that returns False if the app is already running.
"""
self.name = "SingleApp-%s".format(wx.GetUserId())
self.instance = wx.SingleInstanceChecker(self.name)
if self.instance.IsAnotherRunning():
wx.MessageBox(
"An instance of the application is already running",
"Error",
wx.OK | wx.ICON_WARNING
)
return False
return True
这是一个简单的替代品,用于wx.App
禁止多个实例。要使用它,只需在您的代码中替换wx.App
为:SingleApp
app = SingleApp(redirect=False)
frame = wx.Frame(None, wx.ID_ANY, "Hello World")
frame.Show(True)
app.MainLoop()
我将其发布为答案,因为我是新用户,而 Stack Overflow 还不让我投票。
Sorin Sbarnea 的解决方案适用于我在 OS X、Linux 和 Windows 下,对此我深表感谢。
但是, tempfile.gettempdir() 在 OS X 和 Windows 下以一种方式运行,而在其他 some/many/all(?) *nixes 下以另一种方式运行(忽略 OS X 也是 Unix 的事实!)。区别对这段代码很重要。
OS X 和 Windows 具有特定于用户的临时目录,因此一个用户创建的临时文件对另一个用户不可见。相比之下,在许多版本的 *nix 下(我测试了 Ubuntu 9、RHEL 5、OpenSolaris 2008 和 FreeBSD 8),所有用户的临时目录都是 /tmp。
这意味着当在多用户计算机上创建锁定文件时,它会在 /tmp 中创建,并且只有第一次创建锁定文件的用户才能运行该应用程序。
一种可能的解决方案是将当前用户名嵌入到锁定文件的名称中。
值得注意的是,OP 的抓取端口的解决方案在多用户机器上也会出现异常。
我single_process
在我的gentoo上使用;
pip install single_process
示例:
from single_process import single_process
@single_process
def main():
print 1
if __name__ == "__main__":
main()
基于罗伯托·罗萨里奥的回答,我提出了以下功能:
SOCKET = None
def run_single_instance(uniq_name):
try:
import socket
global SOCKET
SOCKET = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
## Create an abstract socket, by prefixing it with null.
# this relies on a feature only in linux, when current process quits, the
# socket will be deleted.
SOCKET.bind('\0' + uniq_name)
return True
except socket.error as e:
return False
我们需要定义全局SOCKET
变量,因为它只会在整个进程退出时被垃圾收集。如果我们在函数中声明一个局部变量,函数退出后它会超出作用域,从而删除套接字。
所有的功劳都应该归功于 Roberto Rosario,因为我只澄清和详细说明了他的代码。并且此代码仅适用于 Linux,正如以下来自https://troydhanson.github.io/network/Unix_domain_sockets.html的引用文本所述:
Linux 有一个特殊功能:如果 UNIX 域套接字的路径名以空字节 \0 开头,则其名称不会映射到文件系统中。因此它不会与文件系统中的其他名称冲突。此外,当服务器关闭抽象命名空间中的 UNIX 域侦听套接字时,它的文件将被删除;使用常规的 UNIX 域套接字,文件在服务器关闭后仍然存在。
迟到的答案,但对于 Windows,您可以使用:
from win32event import CreateMutex
from win32api import CloseHandle, GetLastError
from winerror import ERROR_ALREADY_EXISTS
import sys
class singleinstance:
""" Limits application to single instance """
def __init__(self):
self.mutexname = "testmutex_{D0E858DF-985E-4907-B7FB-8D732C3FC3B9}"
self.mutex = CreateMutex(None, False, self.mutexname)
self.lasterror = GetLastError()
def alreadyrunning(self):
return (self.lasterror == ERROR_ALREADY_EXISTS)
def __del__(self):
if self.mutex:
CloseHandle(self.mutex)
用法
# do this at beginnig of your application
myapp = singleinstance()
# check is another instance of same program running
if myapp.alreadyrunning():
print ("Another instance of this program is already running")
sys.exit(1)
我一直怀疑应该有一个使用进程组的好的 POSIXy 解决方案,而不必访问文件系统,但我不能完全确定。就像是:
启动时,您的进程会向特定组中的所有进程发送“kill -0”。如果存在任何此类进程,则退出。然后它加入了这个组。没有其他进程使用该组。
但是,这有一个竞争条件 - 多个进程都可以同时执行此操作,并且最终都会加入组并同时运行。当您添加某种互斥锁以使其无懈可击时,您不再需要进程组。
如果您的进程仅由 cron 启动,每分钟或每小时一次,这可能是可以接受的,但是这让我有点紧张,因为它会在您不希望它发生的那一天出错。
我想这毕竟不是一个很好的解决方案,除非有人可以改进它?
上周我遇到了这个确切的问题,虽然我确实找到了一些好的解决方案,但我决定制作一个非常简单干净的 python 包并将其上传到 PyPI。它与tendo 的不同之处在于它可以锁定任何字符串资源名称。虽然你当然可以锁定__file__
来达到同样的效果。
安装:pip install quicklock
使用它非常简单:
[nate@Nates-MacBook-Pro-3 ~/live] python
Python 2.7.6 (default, Sep 9 2014, 15:04:36)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.39)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from quicklock import singleton
>>> # Let's create a lock so that only one instance of a script will run
...
>>> singleton('hello world')
>>>
>>> # Let's try to do that again, this should fail
...
>>> singleton('hello world')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/nate/live/gallery/env/lib/python2.7/site-packages/quicklock/quicklock.py", line 47, in singleton
raise RuntimeError('Resource <{}> is currently locked by <Process {}: "{}">'.format(resource, other_process.pid, other_process.name()))
RuntimeError: Resource <hello world> is currently locked by <Process 24801: "python">
>>>
>>> # But if we quit this process, we release the lock automatically
...
>>> ^D
[nate@Nates-MacBook-Pro-3 ~/live] python
Python 2.7.6 (default, Sep 9 2014, 15:04:36)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.39)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from quicklock import singleton
>>> singleton('hello world')
>>>
>>> # No exception was thrown, we own 'hello world'!
这是我使用 Python 3.7.9 在 Windows Server 2016 和 Ubuntu 20.04 上测试过的跨平台示例:
import os
class SingleInstanceChecker:
def __init__(self, id):
if isWin():
ensure_win32api()
self.mutexname = id
self.lock = win32event.CreateMutex(None, False, self.mutexname)
self.running = (win32api.GetLastError() == winerror.ERROR_ALREADY_EXISTS)
else:
ensure_fcntl()
self.lock = open(f"/tmp/isnstance_{id}.lock", 'wb')
try:
fcntl.lockf(self.lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
self.running = False
except IOError:
self.running = True
def already_running(self):
return self.running
def __del__(self):
if self.lock:
try:
if isWin():
win32api.CloseHandle(self.lock)
else:
os.close(self.lock)
except Exception as ex:
pass
# ---------------------------------------
# Utility Functions
# Dynamically load win32api on demand
# Install with: pip install pywin32
win32api=winerror=win32event=None
def ensure_win32api():
global win32api,winerror,win32event
if win32api is None:
import win32api
import winerror
import win32event
# Dynamically load fcntl on demand
# Install with: pip install fcntl
fcntl=None
def ensure_fcntl():
global fcntl
if fcntl is None:
import fcntl
def isWin():
return (os.name == 'nt')
# ---------------------------------------
这是它的使用:
import time, sys
def main(argv):
_timeout = 10
print("main() called. sleeping for %s seconds" % _timeout)
time.sleep(_timeout)
print("DONE")
if __name__ == '__main__':
SCR_NAME = "my_script"
sic = SingleInstanceChecker(SCR_NAME)
if sic.already_running():
print("An instance of {} is already running.".format(SCR_NAME))
sys.exit(1)
else:
main(sys.argv[1:])
linux例子
此方法基于创建一个临时文件,在您关闭应用程序后自动删除。程序启动我们验证文件的存在;如果文件存在(有待执行),则程序关闭;否则它会创建文件并继续执行程序。
from tempfile import *
import time
import os
import sys
f = NamedTemporaryFile( prefix='lock01_', delete=True) if not [f for f in os.listdir('/tmp') if f.find('lock01_')!=-1] else sys.exit()
YOUR CODE COMES HERE
在 Linux 系统上,还可以询问
pgrep -a
实例的数量,脚本位于进程列表中(选项 -a 显示完整的命令行字符串)。例如
import os
import sys
import subprocess
procOut = subprocess.check_output( "/bin/pgrep -u $UID -a python", shell=True,
executable="/bin/bash", universal_newlines=True)
if procOut.count( os.path.basename(__file__)) > 1 :
sys.exit( ("found another instance of >{}<, quitting."
).format( os.path.basename(__file__)))
-u $UID
如果限制应适用于所有用户,请删除。免责声明:a)假定脚本的(基本)名称是唯一的,b)可能存在竞争条件。
这是带有 contextmanager 和 memcached 的 django 的一个很好的例子: https ://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html
可用于保护不同主机上的同时操作。可用于管理多个任务。也可以更改为简单的 python 脚本。
我对上面代码的修改在这里:
import time
from contextlib import contextmanager
from django.core.cache import cache
@contextmanager
def memcache_lock(lock_key, lock_value, lock_expire):
timeout_at = time.monotonic() + lock_expire - 3
# cache.add fails if the key already exists
status = cache.add(lock_key, lock_value, lock_expire)
try:
yield status
finally:
# memcache delete is very slow, but we have to use it to take
# advantage of using add() for atomic locking
if time.monotonic() < timeout_at and status:
# don't release the lock if we exceeded the timeout
# to lessen the chance of releasing an expired lock owned by someone else
# also don't release the lock if we didn't acquire it
cache.delete(lock_key)
LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes
def main():
lock_name, lock_value = "lock_1", "locked"
with memcache_lock(lock_name, lock_value, LOCK_EXPIRE) as acquired:
if acquired:
# single instance code here:
pass
if __name__ == "__main__":
main()
这是一个跨平台的实现,使用上下文管理器创建一个临时锁定文件。
可用于管理多个任务。
import os
from contextlib import contextmanager
from time import sleep
class ExceptionTaskInProgress(Exception):
pass
# Context manager for suppressing exceptions
class SuppressException:
def __init__(self):
pass
def __enter__(self):
return self
def __exit__(self, *exc):
return True
# Context manager for task
class TaskSingleInstance:
def __init__(self, task_name, lock_path):
self.task_name = task_name
self.lock_path = lock_path
self.lock_filename = os.path.join(self.lock_path, self.task_name + ".lock")
if os.path.exists(self.lock_filename):
raise ExceptionTaskInProgress("Resource already in use")
def __enter__(self):
self.fl = open(self.lock_filename, "w")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.fl.close()
os.unlink(self.lock_filename)
# Here the task is silently interrupted
# if it is already running on another instance.
def main1():
task_name = "task1"
tmp_filename_path = "."
with SuppressException():
with TaskSingleInstance(task_name, tmp_filename_path):
print("The task `{}` has started.".format(task_name))
# The single task instance code is here.
sleep(5)
print("The task `{}` has completed.".format(task_name))
# Here the task is interrupted with a message
# if it is already running in another instance.
def main2():
task_name = "task1"
tmp_filename_path = "."
try:
with TaskSingleInstance(task_name, tmp_filename_path):
print("The task `{}` has started.".format(task_name))
# The single task instance code is here.
sleep(5)
print("Task `{}` completed.".format(task_name))
except ExceptionTaskInProgress as ex:
print("The task `{}` is already running.".format(task_name))
if __name__ == "__main__":
main1()
main2()
import sys,os
# start program
try: # (1)
os.unlink('lock') # (2)
fd=os.open("lock", os.O_CREAT|os.O_EXCL) # (3)
except:
try: fd=os.open("lock", os.O_CREAT|os.O_EXCL) # (4)
except:
print "Another Program running !.." # (5)
sys.exit()
# your program ...
# ...
# exit program
try: os.close(fd) # (6)
except: pass
try: os.unlink('lock')
except: pass
sys.exit()