85

How would you prompt the user for some input but timing out after N seconds?

Google is pointing to a mail thread about it at http://mail.python.org/pipermail/python-list/2006-January/533215.html but it seems not to work. The statement in which the timeout happens, no matter whether it is a sys.input.readline or timer.sleep(), I always get:

<type 'exceptions.TypeError'>: [raw_]input expected at most 1 arguments, got 2

which somehow the except fails to catch.

4

23 回答 23

111

使用 select 调用更短,并且应该更便携

import sys, select

print "You have ten seconds to answer!"

i, o, e = select.select( [sys.stdin], [], [], 10 )

if (i):
  print "You said", sys.stdin.readline().strip()
else:
  print "You said nothing!"
于 2010-05-25T11:18:11.990 回答
43

您链接到的示例是错误的,异常实际上是在调用警报处理程序时发生的,而不是在读取块时发生的。最好试试这个:

import signal
TIMEOUT = 5 # number of seconds your want for timeout

def interrupted(signum, frame):
    "called when read times out"
    print 'interrupted!'
signal.signal(signal.SIGALRM, interrupted)

def input():
    try:
            print 'You have 5 seconds to type in your stuff...'
            foo = raw_input()
            return foo
    except:
            # timeout
            return

# set alarm
signal.alarm(TIMEOUT)
s = input()
# disable the alarm after success
signal.alarm(0)
print 'You typed', s
于 2009-08-26T18:50:00.167 回答
14

不是 Python 解决方案,但是...

我在 CentOS (Linux) 下运行的脚本遇到了这个问题,对我的情况有用的只是在子进程中运行 Bash“read -t”命令。残酷的令人作呕的黑客,我知道,但我对它的效果感到内疚,我想与这里的每个人分享它。

import subprocess
subprocess.call('read -t 30', shell=True)

我所需要的只是等待 30 秒,除非按下 ENTER 键。这很好用。

于 2014-12-01T06:23:49.220 回答
11

如果你不在乎它是如何工作的,只要
pip install inputimeout

from inputimeout import inputimeout, TimeoutOccurred

if __name__ == "__main__":
    try:
        c = inputimeout(prompt='hello\n', timeout=3)
    except TimeoutOccurred:
        c = 'timeout'
    print(c)

如此简单
https://pypi.org/project/inputimeout/

于 2020-12-22T09:38:01.503 回答
10

这是一个适用于 Windows 的

我无法让这些示例中的任何一个在 Windows 上运行,因此我合并了一些不同的 StackOverflow 答案以获得以下结果:


import threading, msvcrt
import sys

def readInput(caption, default, timeout = 5):
    class KeyboardThread(threading.Thread):
        def run(self):
            self.timedout = False
            self.input = ''
            while True:
                if msvcrt.kbhit():
                    chr = msvcrt.getche()
                    if ord(chr) == 13:
                        break
                    elif ord(chr) >= 32:
                        self.input += chr
                if len(self.input) == 0 and self.timedout:
                    break    


    sys.stdout.write('%s(%s):'%(caption, default));
    result = default
    it = KeyboardThread()
    it.start()
    it.join(timeout)
    it.timedout = True
    if len(it.input) > 0:
        # wait for rest of input
        it.join()
        result = it.input
    print ''  # needed to move to next line
    return result

# and some examples of usage
ans = readInput('Please type a name', 'john') 
print 'The name is %s' % ans
ans = readInput('Please enter a number', 10 ) 
print 'The number is %s' % ans 
于 2010-10-12T03:25:58.620 回答
9

保罗的回答不太奏效。下面的修改代码对我有用

  • 视窗 7 x64

  • vanilla CMD shell(例如,不是git-bash 或其他非 M$ shell)

    -msvcrt它出现在 git-bash 中没有任何作用。

  • 蟒蛇3.6

(我发布了一个新答案,因为直接编辑 Paul 的答案会将它从 python 2.x-->3.x 更改,这对于编辑来说似乎太多了(py2 仍在使用中)

import sys, time, msvcrt

def readInput( caption, default, timeout = 5):

    start_time = time.time()
    sys.stdout.write('%s(%s):'%(caption, default))
    sys.stdout.flush()
    input = ''
    while True:
        if msvcrt.kbhit():
            byte_arr = msvcrt.getche()
            if ord(byte_arr) == 13: # enter_key
                break
            elif ord(byte_arr) >= 32: #space_char
                input += "".join(map(chr,byte_arr))
        if len(input) == 0 and (time.time() - start_time) > timeout:
            print("timing out, using default value.")
            break

    print('')  # needed to move to next line
    if len(input) > 0:
        return input
    else:
        return default

# and some examples of usage
ans = readInput('Please type a name', 'john') 
print( 'The name is %s' % ans)
ans = readInput('Please enter a number', 10 ) 
print( 'The number is %s' % ans) 
于 2017-01-23T17:33:40.557 回答
5

我在这上面花了大约 20 分钟,所以我认为值得一试把它放在这里。不过,它直接建立在 user137673 的答案之上。我发现做这样的事情最有用:

#! /usr/bin/env python

import signal

timeout = None

def main():
    inp = stdinWait("You have 5 seconds to type text and press <Enter>... ", "[no text]", 5, "Aw man! You ran out of time!!")
    if not timeout:
        print "You entered", inp
    else:
        print "You didn't enter anything because I'm on a tight schedule!"

def stdinWait(text, default, time, timeoutDisplay = None, **kwargs):
    signal.signal(signal.SIGALRM, interrupt)
    signal.alarm(time) # sets timeout
    global timeout
    try:
        inp = raw_input(text)
        signal.alarm(0)
        timeout = False
    except (KeyboardInterrupt):
        printInterrupt = kwargs.get("printInterrupt", True)
        if printInterrupt:
            print "Keyboard interrupt"
        timeout = True # Do this so you don't mistakenly get input when there is none
        inp = default
    except:
        timeout = True
        if not timeoutDisplay is None:
            print timeoutDisplay
        signal.alarm(0)
        inp = default
    return inp

def interrupt(signum, frame):
    raise Exception("")

if __name__ == "__main__":
    main()
于 2014-09-16T04:59:11.703 回答
5

以下代码对我有用。

我使用了两个线程,一个用于获取 raw_Input,另一个用于等待特定时间。如果任何线程退出,则两个线程都将终止并返回。

def _input(msg, q):
    ra = raw_input(msg)
    if ra:
        q.put(ra)
    else:
        q.put("None")
    return

def _slp(tm, q):
    time.sleep(tm)
    q.put("Timeout")
    return

def wait_for_input(msg="Press Enter to continue", time=10):
    q = Queue.Queue()
    th = threading.Thread(target=_input, args=(msg, q,))
    tt = threading.Thread(target=_slp, args=(time, q,))

    th.start()
    tt.start()
    ret = None
    while True:
        ret = q.get()
        if ret:
            th._Thread__stop()
            tt._Thread__stop()
            return ret
    return ret

print time.ctime()    
t= wait_for_input()
print "\nResponse :",t 
print time.ctime()
于 2017-03-23T12:22:52.300 回答
4

这是一个使用线程的可移植且简单的 Python 3 解决方案。这是跨平台时唯一对我有用的。

我尝试的其他事情都有问题:

  • 使用 signal.SIGALRM:不适用于 Windows
  • 使用选择调用:不适用于 Windows
  • 使用进程的强制终止(而不是线程):标准输入不能用于新进程(标准输入自动关闭)
  • 将标准输入重定向到 StringIO 并直接写入标准输入:如果 input() 已被调用,仍将写入先前的标准输入(请参阅https://stackoverflow.com/a/15055639/9624704
    from threading import Thread
    class myClass:
        _input = None

        def __init__(self):
            get_input_thread = Thread(target=self.get_input)
            get_input_thread.daemon = True  # Otherwise the thread won't be terminated when the main program terminates.
            get_input_thread.start()
            get_input_thread.join(timeout=20)

            if myClass._input is None:
                print("No input was given within 20 seconds")
            else:
                print("Input given was: {}".format(myClass._input))


        @classmethod
        def get_input(cls):
            cls._input = input("")
            return
于 2019-01-09T13:13:48.463 回答
3

我的跨平台解决方案

def input_process(stdin_fd, sq, str):
    sys.stdin = os.fdopen(stdin_fd)
    try:
        inp = input (str)
        sq.put (True)
    except:
        sq.put (False)

def input_in_time (str, max_time_sec):
    sq = multiprocessing.Queue()
    p = multiprocessing.Process(target=input_process, args=( sys.stdin.fileno(), sq, str))
    p.start()
    t = time.time()
    inp = False
    while True:
        if not sq.empty():
            inp = sq.get()
            break
        if time.time() - t > max_time_sec:
            break
    p.terminate()
    sys.stdin = os.fdopen( sys.stdin.fileno() )
    return inp
于 2019-04-30T03:45:38.273 回答
3

对于 Linux,我更喜欢select@Pontus 的版本。这里只有一个 python3 函数像read在 shell 中一样工作:

import sys, select

def timeout_input(prompt, timeout=3, default=""):
    print(prompt, end=': ', flush=True)
    inputs, outputs, errors = select.select([sys.stdin], [], [], timeout)
    print()
    return (0, sys.stdin.readline().strip()) if inputs else (-1, default)

In [29]: timeout_input("Continue? (Y/n)", 3, "y")                                                                                                                                                                  
Continue? (Y/n): 
Out[29]: (-1, 'y')

In [30]: timeout_input("Continue? (Y/n)", 3, "y")                                                                                                                                                                  
Continue? (Y/n): n

Out[30]: (0, 'n')

还有一个yes_or_no功能

In [33]: yes_or_no_3 = lambda prompt: 'n' not in timeout_input(prompt + "? (Y/n)", 3, default="y")[1].lower()                                                                                                      

In [34]: yes_or_no_3("Continue")                                                                                                                                                                                   
Continue? (Y/n): 
Out[34]: True

In [35]: yes_or_no_3("Continue")                                                                                                                                                                                   
Continue? (Y/n): no

Out[35]: False
于 2019-12-26T01:56:25.103 回答
3

您可以在 Python >= 3.4 中使用inputtimeout库。麻省理工学院许可证。

$ pip install inputimeout

from inputimeout import inputimeout, TimeoutOccurred
try:
    something = inputimeout(prompt='>>', timeout=5)
except TimeoutOccurred:
    something = 'something'
print(something)
于 2021-07-19T18:21:22.193 回答
2

类似于 Windows 的 Locane:

import subprocess  
subprocess.call('timeout /T 30')
于 2016-09-08T12:23:07.817 回答
1

我正在使用外部工具inputimeout。源代码可在github获得。我知道它是一个外部工具,但它很简单而且非常方便。安装工具后使用以下代码:

from inputimeout import inputimeout, TimeoutOccurred
try:
    something = inputimeout(prompt='>>', timeout=5)
except TimeoutOccurred:
    something = 'No input.'
print(something)
于 2021-02-05T14:30:00.547 回答
1

已经有好几年了,但是如果有人像我最近试图解决这类问题一样碰到这个问题,那么使用这个func-timeout包有一种简单快捷的方法来实现这一点。对于大多数 IDE,它必须在使用前安装;你可以通过安装它pip。上面的链接是不言自明的,但我将举例说明我是如何实现它的。

from func_timeout import FunctionTimedOut, func_timeout

try:
   ans = func_timeout(5, lambda: int(input('What is the sum of 2 and 3?\n')))
   print(ans)
except FunctionTimedOut:
   print(5)

func_timeout在其参数中返回方法的值,question()在这种情况下是函数。它还允许函数所需的其他参数(请参阅文档)。如果设置的时间过去了(这里是 5 秒),它会引发 aTimedOutException并运行except块中的代码。

于 2020-04-09T10:21:37.643 回答
1
from threading import Thread
import time


def get_input():
    while True:
        print(input('> '))


t1 = Thread(target=get_input)
t1.setDaemon(True)
t1.start()
time.sleep(3)
print('program exceeds')

好吧,只需简单地设置一个新的守护线程,并设置一个睡眠时间,无论你想要什么超时。我认为这很容易赶上 XD

于 2020-05-24T11:12:12.307 回答
1

这就是我处理这个问题的方式。我没有彻底测试它,我不确定它没有一些重要的问题,但考虑到其他解决方案也远非完美,我决定分享:

import sys
import subprocess


def switch():
    if len(sys.argv) == 1:
        main()
    elif sys.argv[1] == "inp":
        print(input(''))
    else:
        print("Wrong arguments:", sys.argv[1:])


def main():
    passw = input_timed('You have 10 seconds to enter password:', timeout=10)
    if passw is None:
        print("Time's out! You explode!")
    elif passw == "PasswordShmashword":
        print("H-h-how did you know you h-h-hacker")
    else:
        print("I spare your life because you at least tried")


def input_timed(*args, timeout, **kwargs):
    """
    Print a message and await user input - return None if timedout
    :param args: positional arguments passed to print()
    :param timeout: number of seconds to wait before returning None
    :param kwargs: keyword arguments passed to print()
    :return: user input or None if timed out
    """
    print(*args, **kwargs)
    try:
        out: bytes = subprocess.run(["python", sys.argv[0], "inp"], capture_output=True, timeout=timeout).stdout
    except subprocess.TimeoutExpired:
        return None
    return out.decode('utf8').splitlines()[0]


switch()
于 2019-12-11T13:23:24.710 回答
1

修改后的 iperov 答案对我有用(python3 win10 2019-12-09)

对 iperov 的更改:

  • 用 sstr 替换 str 因为 str 是 python 中的函数

  • 添加进口

  • 添加睡眠以降低 while 循环的 CPU 使用率(?)

  • add if name ==' main ': #windows 上的多处理需要

    导入系统、操作系统、多处理、时间

    def input_process(stdin_fd, sq, sstr):
        sys.stdin = os.fdopen(stdin_fd)
        try:
            inp = input(sstr)
            sq.put(True)
        except:
            sq.put(False)
    
    def input_in_time(sstr, max_time_sec):
        sq = multiprocessing.Queue()
        p = multiprocessing.Process(target=input_process, args=( sys.stdin.fileno(), sq, sstr))
        p.start()
        t = time.time()
        inp = False
        while True:
    
            if not sq.empty():
                inp = sq.get()
                break
            if time.time() - t > max_time_sec:
                break
    
            tleft=int( (t+max_time_sec)-time.time())
            if tleft<max_time_sec-1 and tleft>0:
                print('\n  ...time left '+str(tleft)+'s\ncommand:')
    
            time.sleep(2)
    
        p.terminate()
        sys.stdin = os.fdopen( sys.stdin.fileno() )
        return inp
    
    if __name__=='__main__':
        input_in_time("command:", 17)
    
于 2019-12-09T19:18:36.083 回答
0

这是一个仅使用的 Python 3.8+(尽管它可以适应 Python 3.6+)跨平台方法(因此不使用threadingmultiprocessing或调用 shell 实用程序)。它旨在从命令行运行脚本,不太适合动态使用。

input您可以按如下方式包装内置函数。在这种情况下,我将内置名称重新定义input为包装器,因为此实现需要input通过 this 路由所有调用。(免责声明:这就是为什么它可能不是一个好主意,只是一个不同的主意,为了好玩。)

import atexit
import builtins
import queue
import threading


def _make_input_func():
    prompt_queue = queue.Queue(maxsize=1)
    input_queue = queue.Queue(maxsize=1)

    def get_input():
        while (prompt := prompt_queue.get()) != GeneratorExit:
            inp = builtins.input(prompt)
            input_queue.put(inp)
            prompt_queue.task_done()

    input_thread = threading.Thread(target=get_input, daemon=True)

    last_call_timed_out = False

    def input_func(prompt=None, timeout=None):
        """Mimics :function:`builtins.input`, with an optional timeout

        :param prompt: string to pass to builtins.input
        :param timeout: how long to wait for input in seconds; None means indefinitely

        :return: the received input if not timed out, otherwise None
        """
        nonlocal last_call_timed_out

        if not last_call_timed_out:
            prompt_queue.put(prompt, block=False)
        else:
            print(prompt, end='', flush=True)

        try:
            result = input_queue.get(timeout=timeout)
            last_call_timed_out = False
            return result
        except queue.Empty:
            print(flush=True) # optional: end prompt line if no input received
            last_call_timed_out = True
            return None


    input_thread.start()
    return input_func


input = _make_input_func()
del _make_input_func

(我已经在 one-use-only 中定义了设置,以在其闭包中_make_input_func隐藏input的“静态”变量,以避免污染全局命名空间。)

这里的想法是创建一个单独的线程来处理对 的所有调用builtins.input,并使input包装器管理超时。由于调用builtins.inputalways 会阻塞直到有输入,当超时结束时,特殊线程仍在等待输入,但input包装器返回(带有None)。在下一次调用时,如果上一次调用超时,则不需要builtins.input再次调用(因为输入线程已经在等待输入),它只是打印提示,然后等待该线程返回一些输入,一如既往。

定义上述内容后,尝试运行以下脚本:

import time

if __name__ == '__main__':
    timeout = 2
    start_t = time.monotonic()
    if (inp := input(f"Enter something (you have {timeout} seconds): ", timeout)) is not None:
        print("Received some input:", repr(inp))
    else:
        end_t = time.monotonic()
        print(f"Timed out after {end_t - start_t} seconds")

    inp = input("Enter something else (I'll wait this time): ")
    print("Received some input:", repr(inp))
    
    input(f"Last chance to say something (you have {timeout} seconds): ", timeout)
于 2020-07-25T18:57:23.233 回答
0

受 iperov 的回答启发的解决方案希望更简洁:

import multiprocessing
import sys

def input_with_timeout(prompt, timeout=None):
    """Requests the user to enter a code at the command line."""
    queue = multiprocessing.Queue()
    process = multiprocessing.Process(
        _input_with_timeout_process, args=(sys.stdin.fileno(), queue, prompt),
    )
    process.start()
    try:
        process.join(timeout)
        if process.is_alive():
            raise ValueError("Timed out waiting for input.")
        return queue.get()
    finally:
        process.terminate()


def _input_with_timeout_process(stdin_file_descriptor, queue, prompt):
    sys.stdin = os.fdopen(stdin_file_descriptor)
    queue.put(input(prompt))
于 2020-03-05T01:46:58.007 回答
0

这是 linux 上的 python 3.8+ 的另一个版本,其中包含一个 yes_no 答案,默认返回超时

import signal
def alarm_handler(signum, frame):
    raise TimeoutError
def input_with_timeout(prompt, timeout=30):
    """ get input with timeout

    :param prompt: the prompt to print
    :param timeout: timeout in seconds, or None to disable

    :returns: the input
    :raises: TimeoutError if times out
    """
    # set signal handler
    if timeout is not None:
        signal.signal(signal.SIGALRM, alarm_handler)
        signal.alarm(timeout) # produce SIGALRM in `timeout` seconds
    try:
        return input(prompt)
    except TimeoutError as to:
        raise to
    finally:
        if timeout is not None:
            signal.alarm(0) # cancel alarm

def yes_or_no(question, default='y', timeout=None):
    """ Get y/n answer with default choice and optional timeout

    :param question: prompt
    :param default: the default choice, i.e. 'y' or 'n'
    :param timeout: the timeout in seconds, default is None

    :returns: True or False
    """
    if default is not None and (default!='y' and default!='n'):
        log.error(f'bad option for default: {default}')
        quit(1)
    y='Y' if default=='y' else 'y'
    n='N' if default=='n' else 'n'
    while "the answer is invalid":
        try:
            to_str='' if timeout is None else f'(Timeout {default} in {timeout}s)'
            reply = str(input_with_timeout(f'{question} {to_str} ({y}/{n}): ',timeout=timeout)).lower().strip()
        except TimeoutError:
            log.warning(f'timeout expired, returning default={default} answer')
            reply=''
        if len(reply)==0:
            return True if default=='y' else False
        elif reply[0] == 'y':
            return True
        if reply[0] == 'n':
            return False

代码中的使用示例


if yes_or_no(f'model {latest_model_folder} exists, start from it?', timeout=TIMEOUT):
     log.info(f'initializing model from {latest_model_folder}')
     model = load_model(latest_model_folder)
else:
     log.info('creating new empty model')
     model = create_model()
于 2020-11-03T19:06:21.407 回答
0

有些答案需要Enter在超时发生时按下键才能继续运行您的代码。其他的似乎很复杂,要开机,Enter超时后仍然需要按键。

在另一个线程中找到了答案,效果很好,但我发现了一个警告。我决定将我的代码放在一个class可移植的地方。

笔记

我不得不使用keyboard注入Enter按键,因为我input()的代码中有另一个语句。input()由于某种原因,除非我按下键,否则不会出现后续语句Enter

import threading
import keyboard    # https://github.com/boppreh/keyboard

class Utilities:

    # Class variable
    response = None

    @classmethod
    def user_input(cls, timeout):

        def question():
            cls.response = input("Enter something: ")

        t = threading.Thread(target=question)
        # Daemon property allows the target function to terminate after timeout
        t.daemon = True    
        t.start()
        t.join(timeout)

        if cls.response:
            # Do something
        else:
            # Do something else
            # Optional.  Use if you have other input() statements in your code
            keyboard.send("enter")

用法

Utilities.user_input(3)

这是在 Windows 10 上使用 Python 3.8.3 制作的。

于 2020-10-08T21:57:43.833 回答
-5

一个迟到的答案:)

我会做这样的事情:

from time import sleep

print('Please provide input in 20 seconds! (Hit Ctrl-C to start)')
try:
    for i in range(0,20):
        sleep(1) # could use a backward counter to be preeety :)
    print('No input is given.')
except KeyboardInterrupt:
    raw_input('Input x:')
    print('You, you! You know something.')

我知道这不一样,但许多现实生活中的问题都可以通过这种方式解决。(如果用户目前不在那里,当我希望某些东西继续运行时,我通常需要用户输入超时。)

希望这至少部分有所帮助。(如果有人读它的话:))

于 2012-03-14T15:50:21.353 回答