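We are running a fairly large Python code to randomly scan the parameter space of some physics models (so it is hard to give a minimal example, sorry). Evaluating one parameter point takes about 300 ms, but sometimes (I don't know why) an evaluation suddenly takes several hours, which eats up our CPU budget on the computing cluster.

So my idea was to use threading to give each evaluation of a parameter point a maximum run time. If an evaluation takes longer than that, I can discard the point as unphysical. However, this does not seem to work. I start the calculation in a new thread and join it to the main thread with a timeout of 1 second, but the main thread still keeps waiting for the calculation to terminate (which takes much longer than 1 second).

How is this possible? How does threading measure the time the new thread has already been running? I should add that while evaluating a parameter point I make heavy use of nlopt, numpy and scipy. Most of that, I assume, is not written directly in Python but uses compiled binaries to speed up the computation. Does this affect threading (since those functions are a "black box" to it)?

Thanks!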

1 Answer

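Short answer:

I don't think Thread.join() itself tells you whether the timeout expired. You have to check for yourself whether it timed out.

Either way, a minimal code snippet would help in getting to a working solution. This is mostly a guess, but if the main thread is not checking for the timeout, it will simply carry on regardless.

Longer answer:

Let's see where the timeout parameter goes: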

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L1060

self._wait_for_tstate_lock(timeout=max(timeout, 0))
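
One consequence of the max(timeout, 0) clamp is that a negative timeout acts as a non-blocking poll rather than "wait forever". A minimal sketch of that behaviour, assuming CPython's standard threading module; the worker function and the durations are made up for illustration:

```python
import threading
import time

def worker():
    # Hypothetical stand-in for a slow evaluation.
    time.sleep(0.5)

t = threading.Thread(target=worker)
t.start()

start = time.monotonic()
t.join(timeout=-5)            # clamped to 0, so this only polls the thread
elapsed = time.monotonic() - start
still_running = t.is_alive()  # the worker has not finished yet

t.join()                      # no timeout: wait until the worker is done
finished = not t.is_alive()
```

Only timeout=None (the default) waits without limit.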

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L1062-L1074

def _wait_for_tstate_lock(self, block=True, timeout=-1):
    # Issue #18808: wait for the thread state to be gone.
    # At the end of the thread's life, after all knowledge of the thread
    # is removed from C data structures, C code releases our _tstate_lock.
    # This method passes its arguments to _tstate_lock.acquire().
    # If the lock is acquired, the C code is done, and self._stop() is
    # called.  That sets ._is_stopped to True, and ._tstate_lock to None.
    lock = self._tstate_lock
    if lock is None:  # already determined that the C code is done
        assert self._is_stopped
    elif lock.acquire(block, timeout):
        lock.release()
        self._stop()

If there is no lock, assert that the thread has already stopped. Otherwise, acquire the lock with the given block and timeout arguments.
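
The acquire(block, timeout) call is where the waiting actually happens. As a rough illustration of its return value, here is a sketch using an ordinary threading.Lock instead of the internal _tstate_lock (that substitution, and the 0.2 second timeout, are assumptions made for the example):

```python
import threading
import time

lock = threading.Lock()
lock.acquire()                    # hold the lock so the next acquire has to wait

start = time.monotonic()
got_it = lock.acquire(True, 0.2)  # blocking=True, timeout=0.2 seconds
waited = time.monotonic() - start # roughly the timeout; got_it is False

lock.release()
got_it_after_release = lock.acquire(True, 0.2)  # lock is free now: returns True
lock.release()
```

A False return means the timeout elapsed. In _wait_for_tstate_lock that result is simply dropped, which is why join() gives the caller nothing to inspect.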

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L117

def acquire(self, blocking=True, timeout=-1):
    """Acquire a lock, blocking or non-blocking.
    When invoked without arguments: if this thread already owns the lock,
    increment the recursion level by one, and return immediately. Otherwise,
    if another thread owns the lock, block until the lock is unlocked. Once
    the lock is unlocked (not owned by any thread), then grab ownership, set
    the recursion level to one, and return. If more than one thread is
    blocked waiting until the lock is unlocked, only one at a time will be
    able to grab ownership of the lock. There is no return value in this
    case.
    When invoked with the blocking argument set to true, do the same thing
    as when called without arguments, and return true.
    When invoked with the blocking argument set to false, do not block. If a
    call without an argument would block, return false immediately;
    otherwise, do the same thing as when called without arguments, and
    return true.
    When invoked with the floating-point timeout argument set to a positive
    value, block for at most the number of seconds specified by timeout
    and as long as the lock cannot be acquired.  Return true if the lock has
    been acquired, false if the timeout has elapsed.
    """
    me = get_ident()
    if self._owner == me:
        self._count += 1
        return 1
    rc = self._block.acquire(blocking, timeout)
    if rc:
        self._owner = me
        self._count = 1
    return rc

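To acquire the lock, get the identity of the current thread. If this thread already owns the lock, just increment the count.

Otherwise, actually acquire the underlying lock.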
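
The owner and count bookkeeping in that listing can be observed from Python. The sketch below uses threading.RLock; on CPython this is normally a C implementation rather than the pure-Python class quoted above, but the documented behaviour is the same. The other_thread helper and the results dictionary are hypothetical names for the example:

```python
import threading

rlock = threading.RLock()
results = {}

def other_thread():
    # A different thread cannot take the lock while the owner holds it,
    # so this returns False once the 0.1 second timeout elapses.
    results["other"] = rlock.acquire(timeout=0.1)
    if results["other"]:
        rlock.release()

first = rlock.acquire()    # the owner takes the lock
second = rlock.acquire()   # same owner: recursion level goes up, no blocking

t = threading.Thread(target=other_thread)
t.start()
t.join()

rlock.release()
rlock.release()            # one release per acquire
```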

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L98

self._block = _allocate_lock()

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L33

_allocate_lock = _thread.allocate_lock

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L4

import _thread

https://github.com/python/cpython/blob/7b90e3674be86479c51faf872d0b9367c9fc2f96/Modules/_threadmodule.c#L1300-L1301

static PyMethodDef thread_methods[] = {
    {"start_new_thread",        (PyCFunction)thread_PyThread_start_new_thread,
     METH_VARARGS, start_new_doc},
    {"start_new",               (PyCFunction)thread_PyThread_start_new_thread,
     METH_VARARGS, start_new_doc},
    {"allocate_lock",           (PyCFunction)thread_PyThread_allocate_lock,
     METH_NOARGS, allocate_doc},
    {"allocate",                (PyCFunction)thread_PyThread_allocate_lock,
     METH_NOARGS, allocate_doc},
    {"exit_thread",             (PyCFunction)thread_PyThread_exit_thread,
     METH_NOARGS, exit_doc},
    {"exit",                    (PyCFunction)thread_PyThread_exit_thread,
     METH_NOARGS, exit_doc},
    {"interrupt_main",          (PyCFunction)thread_PyThread_interrupt_main,
     METH_NOARGS, interrupt_doc},
    {"get_ident",               (PyCFunction)thread_get_ident,
     METH_NOARGS, get_ident_doc},
    {"_count",                  (PyCFunction)thread__count,
     METH_NOARGS, _count_doc},
    {"stack_size",              (PyCFunction)thread_stack_size,
     METH_VARARGS, stack_size_doc},
    {"_set_sentinel",           (PyCFunction)thread__set_sentinel,
     METH_NOARGS, _set_sentinel_doc},
    {NULL,                      NULL}           /* sentinel */
};

allocate_lock is registered in the method table as a PyCFunction named thread_PyThread_allocate_lock.

https://github.com/python/cpython/blob/7b90e3674be86479c51faf872d0b9367c9fc2f96/Modules/_threadmodule.c#L1128-L1131

static PyObject *
thread_PyThread_allocate_lock(PyObject *self)
{
    return (PyObject *) newlockobject();
}

https://github.com/python/cpython/blob/7b90e3674be86479c51faf872d0b9367c9fc2f96/Modules/_threadmodule.c#L538-L553

static lockobject *
newlockobject(void)
{
    lockobject *self;
    self = PyObject_New(lockobject, &Locktype);
    if (self == NULL)
        return NULL;
    self->lock_lock = PyThread_allocate_lock();
    self->locked = 0;
    self->in_weakreflist = NULL;
    if (self->lock_lock == NULL) {
        Py_DECREF(self);
        PyErr_SetString(ThreadError, "can't allocate lock");
        return NULL;
    }
    return self;
}

Allocate a new lock object and the underlying lock.

https://github.com/python/cpython/blob/2d264235f6e066611b412f7c2e1603866e0f7f1b/Python/thread_pthread.h#L276

PyThread_type_lock
PyThread_allocate_lock(void)
{
    sem_t *lock;
    int status, error = 0;

    dprintf(("PyThread_allocate_lock called\n"));
    if (!initialized)
        PyThread_init_thread();

    lock = (sem_t *)PyMem_RawMalloc(sizeof(sem_t));

    if (lock) {
        status = sem_init(lock,0,1);
        CHECK_STATUS("sem_init");

        if (error) {
            PyMem_RawFree((void *)lock);
            lock = NULL;
        }
    }

    dprintf(("PyThread_allocate_lock() -> %p\n", lock));
    return (PyThread_type_lock)lock;
}

https://github.com/python/cpython/blob/2d264235f6e066611b412f7c2e1603866e0f7f1b/Python/thread.c#L60-L77

void
PyThread_init_thread(void)
{
#ifdef Py_DEBUG
    char *p = Py_GETENV("PYTHONTHREADDEBUG");

    if (p) {
        if (*p)
            thread_debug = atoi(p);
        else
            thread_debug = 1;
    }
#endif /* Py_DEBUG */
    if (initialized)
        return;
    initialized = 1;
    dprintf(("PyThread_init_thread called\n"));
    PyThread__init_thread();
}

https://github.com/python/cpython/blob/2d264235f6e066611b412f7c2e1603866e0f7f1b/Python/thread_pthread.h#L170-L176

static void
PyThread__init_thread(void)
{
#if defined(_AIX) && defined(__GNUC__)
    extern void pthread_init(void);
    pthread_init();
#endif
}

https://github.com/python/cpython/blob/f243de2bc8d940316ce8da778ec02a7bbe594de1/configure.ac#L3416

AC_CHECK_FUNCS(alarm accept4 setitimer getitimer bind_textdomain_codeset chown \
 clock confstr ctermid dup3 execv faccessat fchmod fchmodat fchown fchownat \
 fexecve fdopendir fork fpathconf fstatat ftime ftruncate futimesat \
 futimens futimes gai_strerror getentropy \
 getgrouplist getgroups getlogin getloadavg getpeername getpgid getpid \
 getpriority getresuid getresgid getpwent getspnam getspent getsid getwd \
 if_nameindex \
 initgroups kill killpg lchmod lchown lockf linkat lstat lutimes mmap \
 memrchr mbrtowc mkdirat mkfifo \
 mkfifoat mknod mknodat mktime mremap nice openat pathconf pause pipe2 plock poll \
 posix_fallocate posix_fadvise pread \
 pthread_init pthread_kill putenv pwrite readlink readlinkat readv realpath renameat \
 select sem_open sem_timedwait sem_getvalue sem_unlink sendfile setegid seteuid \
 setgid sethostname \
 setlocale setregid setreuid setresuid setresgid setsid setpgid setpgrp setpriority setuid setvbuf \
 sched_get_priority_max sched_setaffinity sched_setscheduler sched_setparam \
 sched_rr_get_interval \
 sigaction sigaltstack siginterrupt sigpending sigrelse \
 sigtimedwait sigwait sigwaitinfo snprintf strftime strlcpy symlinkat sync \
 sysconf tcgetpgrp tcsetpgrp tempnam timegm times tmpfile tmpnam tmpnam_r \
 truncate uname unlinkat unsetenv utimensat utimes waitid waitpid wait3 wait4 \
 wcscoll wcsftime wcsxfrm wmemcmp writev _getpty)

http://man7.org/linux/man-pages/man7/pthreads.7.html

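All of this is to ask two things: is the timeout a float? And are you checking is_alive() (isAlive() in older versions)? From the documentation:

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened: if the thread is still alive, the join() call timed out.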
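
Put together, the check might look like the sketch below. This is only an illustration under stated assumptions: run_with_timeout and evaluate are hypothetical names, and evaluate just sleeps in place of the real parameter-point evaluation.

```python
import threading
import time

def run_with_timeout(func, args=(), timeout=1.0):
    """Run func(*args) in a thread; return (finished, result)."""
    box = {}

    def target():
        box["result"] = func(*args)

    # daemon=True so an abandoned worker does not keep the interpreter alive
    t = threading.Thread(target=target, daemon=True)
    t.start()
    t.join(timeout)        # returns None whether or not it timed out
    if t.is_alive():       # still running, so the join timed out
        return False, None
    return True, box.get("result")

def evaluate(point, delay):
    # Hypothetical stand-in for evaluating one parameter point.
    time.sleep(delay)
    return point * 2

fast = run_with_timeout(evaluate, (21, 0.01), timeout=1.0)  # (True, 42)
slow = run_with_timeout(evaluate, (21, 2.0), timeout=0.1)   # (False, None)
```

Note that this only stops waiting; it does not stop the worker, which keeps running in the background. Also, and this is a guess rather than something confirmed for nlopt, numpy or scipy: if the worker spends its time inside compiled code that does not release the GIL, the main thread may not get to run again until that call returns, which could make join() appear to ignore its timeout.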

Answered 2016-09-17T16:29:34.303