ChatGPT解决这个技术问题 Extra ChatGPT

我在 Python 中调用一个函数,我知道它可能会停止并迫使我重新启动脚本。

如何调用该函数或将其包装在什么中,以便如果它花费超过 5 秒的时间,脚本将取消它并执行其他操作?

此库看起来已维护:pypi.org/project/wrapt-timeout-decorator

s
sweden

如果您在 UNIX 上运行,则可以使用 signal 包:

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print("Forever is over!")
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print("sec")
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print(exc)
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

调用 signal.alarm(10) 10 秒后,将调用处理程序。这会引发一个异常,您可以从常规 Python 代码中截取该异常。

这个模块不能很好地使用线程(但是,谁呢?)

请注意,由于我们在超时发生时引发异常,它最终可能会在函数内部被捕获并忽略,例如一个这样的函数:

def loop_forever():
    while 1:
        print('sec')
        try:
            time.sleep(10)
        except:
            continue

我使用 Python 2.5.4。有这样一个错误: Traceback (最近一次调用最后一次): File "aa.py", line 85, in func signal.signal(signal.SIGALRM, handler) AttributeError: 'module' object has no attribute 'SIGALRM'
@flypen 这是因为 signal.alarm 和相关的 SIGALRM 在 Windows 平台上不可用。
如果有很多进程,并且每个都调用 signal.signal --- 它们都可以正常工作吗?每个 signal.signal 调用不会取消“并发”一个吗?
我支持关于线程的警告。 signal.alarm 仅适用于主线程。我尝试在 Django 视图中使用它 - 立即失败,仅包含关于主线程的措辞。
如果您需要:将警报设置回 0 以取消它signal.alarm(0)(请参阅 stackoverflow.com/questions/27013127/…)。
E
Emil

您可以使用 multiprocessing.Process 来做到这一点。

代码

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate - may not work if process is stuck for good
        p.terminate()
        # OR Kill - will work for sure, no chance for process to finish nicely however
        # p.kill()

        p.join()

如何获取目标方法的返回值?
如果被调用的函数卡在 I/O 块上,这似乎不起作用。
@bad_keypoints 请参阅此答案:stackoverflow.com/a/10415215/1384471 基本上,您传递一个列表,将答案放入其中。
@sudo 然后删除 join()。这使您的 x 个并发子进程正在运行,直到它们完成工作,或在 join(10) 中定义的数量。如果您有 10 个进程的阻塞 I/O,使用 join(10) 您已将它们设置为等待所有已启动的每个进程最多等待 10 个进程。像这个例子 stackoverflow.com/a/27420072/2480481 那样使用守护进程标志。当然你可以将标志 daemon=True 直接传递给 multiprocessing.Process() 函数。
@ATOzTOA 这个解决方案的问题,至少就我的目的而言,是它可能不允许儿童自行清洁胎面。来自终止函数 terminate() ... Note that exit handlers and finally clauses, etc., will not be executed. Note that descendant processes of the process will not be terminated – they will simply become orphaned. 的文档
C
Community

如何调用该函数或将其包装在什么中,以便如果它花费的时间超过 5 秒,脚本会取消它?

我发布了一个 gist,它通过一个装饰器和一个 threading.Timer 解决了这个问题/问题。这是一个故障。

导入和设置以实现兼容性

它使用 Python 2 和 3 进行了测试。它也应该在 Unix/Linux 和 Windows 下工作。

首先是进口。无论 Python 版本如何,这些都试图保持代码一致:

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

使用与版本无关的代码:

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

现在我们已经从标准库中导入了我们的功能。

exit_after 装饰器

接下来我们需要一个函数来终止子线程中的 main()

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

这是装饰器本身:

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

用法

这是直接回答您关于 5 秒后退出的问题的用法!:

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

演示:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

第二个函数调用不会完成,而是进程应该退出并回溯!

KeyboardInterrupt 并不总是停止睡眠线程

请注意,在 Windows 上的 Python 2 上,键盘中断并不总是会中断睡眠,例如:

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

除非明确检查 PyErr_CheckSignals(),否则它也不会中断在扩展中运行的代码,请参阅 Cython, Python and KeyboardInterrupt ignored

在任何情况下,我都会避免让线程休眠超过一秒 - 这是处理器时间的一个永恒。

如何调用该函数或将其包装在什么中,以便如果它花费超过 5 秒的时间,脚本将取消它并执行其他操作?

要捕获它并执行其他操作,您可以捕获 KeyboardInterrupt。

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else

我还没有阅读您的整个帖子,但我只是想知道:如果flush为0怎么办?在下面的 if 语句中,这将被解释为 False,对吗?
为什么我必须调用thread.interrupt_main(),为什么我不能直接引发异常?
关于用这个包装 multiprocessing.connection.Client 有什么想法吗? - 试图解决:stackoverflow.com/questions/57817955/…
当我尝试不同的功能而不是倒计时时,它会挂在 thread.interrupt_main() 上。例如,我在计数中运行 subprocess(),即使计时器完成也没有终止,我必须按 ^C
如何只停止所有进程但不引发错误 KeyboardInterrupt?
A
Alex

我有一个不同的建议,它是一个纯函数(与线程建议具有相同的 API)并且似乎工作正常(基于此线程的建议)

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result

您还应该恢复原始信号处理程序。请参阅stackoverflow.com/questions/492519/…
还有一点需要注意:Unix 信号方法仅在您将其应用于主线程时才有效。在子线程中应用它会引发异常并且不起作用。
这不是最好的解决方案,因为它只适用于 linux。
Max,不正确 - 适用于任何符合 POSIX 的 unix。我认为您的评论应该更准确,不适用于 Windows。
您应该避免将 kwargs 设置为空字典。一个常见的 Python 陷阱是函数的默认参数是可变的。因此,该字典将在对 timeout 的所有调用中共享。最好将默认值设置为 None,并在函数的第一行添加 kwargs = kwargs or {}。 Args 没问题,因为元组是不可变的。
M
Matt Tardiff

在单元测试中搜索超时调用时,我遇到了这个线程。我在答案或 3rd 方包中没有找到任何简单的东西,所以我在下面编写了装饰器,您可以直接放入代码中:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

然后就像这样简单地使测试或您喜欢的任何功能超时:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...

小心,因为这不会在达到超时后终止函数!
请注意,在 Windows 上,这会产生一个全新的进程 - 这将占用超时时间,如果依赖项需要很长时间来设置,可能会占用很多时间。
是的,这需要一些调整。它使线程永远存在。
IDK 如果这是最好的方法,但您可以在 func_wrapper 中尝试/捕获 Exception 并在捕获后执行 pool.close() 以确保线程无论如何都会在之后死亡。然后你可以抛出 TimeoutError 或任何你想要的。似乎对我有用。
这很有用,但是一旦我做了很多次,我就会得到 RuntimeError: can't start new thread。如果我忽略它,它是否仍然有效,或者我可以做些什么来解决这个问题?提前致谢!
e
egeland

在 pypi 上找到的 stopit 包似乎可以很好地处理超时。

我喜欢 @stopit.threading_timeoutable 装饰器,它向装饰函数添加了一个 timeout 参数,它可以满足您的期望,它会停止该函数。

在 pypi 上查看:https://pypi.python.org/pypi/stopit


库声称,某些功能在 Windows 中不起作用。
对于可能像我一样感到困惑的人:stopit.utils.TimeoutException 不会停止您的代码!此后代码继续正常!我在一个正常运行的程序中花了 30 分钟。真的很好的答案!
使用 stopit-1.1.2 时,基本的超时装饰器:@stopit.threading_timeoutable(default='not finished') 也适用于 Linux 和 Windows。如果您只想要一个简单的超时,那么简单而出色的解决方案。
I
Inaimathi

我是wrapt_timeout_decorator的作者

乍一看,这里介绍的大多数解决方案在 Linux 下都能正常工作——因为我们有 fork() 和 signals()——但在 Windows 上看起来有点不同。当涉及到 Linux 上的子线程时,你不能再使用信号了。

为了在 Windows 下生成一个进程,它需要是可挑选的——许多装饰函数或类方法不是。

所以你需要使用更好的pickler,比如dill和multiprocess(而不是pickle和multiprocessing)——这就是为什么你不能使用ProcessPoolExecutor(或者只有有限的功能)。

对于超时本身 - 您需要定义超时的含义 - 因为在 Windows 上生成进程需要相当长的时间(并且无法确定)。这对于短暂的超时可能会很棘手。让我们假设,产生这个过程大约需要 0.5 秒(很容易!!!)。如果你给 0.2 秒的超时应该发生什么?函数是否应该在 0.5 + 0.2 秒后超时(所以让方法运行 0.2 秒)?或者被调用的进程是否应该在 0.2 秒后超时(在这种情况下,装饰函数将始终超时,因为在那个时候它甚至没有产生)?

嵌套装饰器也可能很讨厌,您不能在子线程中使用信号。如果你想创建一个真正通用的、跨平台的装饰器,所有这些都需要考虑(和测试)。

其他问题是将异常传递回调用者,以及日志记录问题(如果在修饰函数中使用 - 不支持记录到另一个进程中的文件)

我试图涵盖所有边缘情况,您可能会查看包 wrapt_timeout_decorator,或者至少测试您自己的解决方案,灵感来自那里使用的单元测试。

@Alexis Eggermont - 不幸的是我没有足够的评论点 - 也许其他人可以通知你 - 我想我解决了你的导入问题。


这对我来说是救命稻草!我的问题有时是多处理工作人员无缘无故停止,并且在睡眠状态下消耗了大量内存和 cpu。尝试了各种用于多处理的包装器,这些包装器具有池超时选项,但每个都给我带来了其他不同的问题,例如在池终止后进程没有被杀死。现在有了这个装饰器,在很长一段时间后,函数就会被杀死,进程会在其中生成。它给了我突然关闭池的 BrokenPipeError,但它解决了我的主要问题。谢谢!有什么建议来处理 BrokenPipeError 吗?
@Arjun Sankarlal:当然,如果工人被杀,管道就会破裂。您需要在调度程序任务中捕获损坏的管道错误并正确清理。
是的,我明白了,我在 try/except 中使用了 BrokenPipeError 但没有被捕获。所以我在网络服务器中使用它。我发现了 BrokenPipeError 和一般异常。所以当超时发生时,我返回了一般异常而不是管道错误。但是几秒钟后,服务器在控制台中打印了 BrokenPipeError 并且它服务于其他请求而没有任何问题。可能是我在检查池是否损坏然后返回之后引入延迟!?
B
Brian

有很多建议,但没有一个使用 concurrent.futures,我认为这是处理这个问题的最清晰的方法。

from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

超级简单的阅读和维护。

我们创建一个池,提交一个进程,然后等待最多 5 秒,然后引发 TimeoutError,您可以根据需要捕获和处理该错误。

原生于 python 3.2+ 并向后移植到 2.7(pip install futures)。

在线程和进程之间切换就像将 ProcessPoolExecutor 替换为 ThreadPoolExecutor 一样简单。

如果您想在超时时终止进程,我建议您查看 Pebble


“警告:如果超时,这不会终止功能”是什么意思?
@ScottStafford 进程/线程不会仅仅因为引发 TimeoutError 而结束。因此,进程或线程仍将尝试运行完成,并且不会在超时时自动将控制权交还给您。
这会让我保存当时的任何中间结果吗?例如,如果我有将超时设置为 5 的递归函数,并且在那段时间我有部分结果,我该如何编写函数以在超时时返回部分结果?
我正在使用这个,但是我有 1000 个任务,每个任务都允许在超时前 5 秒。我的问题是核心被阻塞在永远不会结束的任务上,因为超时仅适用于所有任务而不是单个任务。 concurrent.futures 没有为此 afaik 提供解决方案。
G
Gil

出色、易用且可靠的 PyPi 项目 timeout-decorator (https://pypi.org/project/timeout-decorator/)

安装:

pip install timeout-decorator

用法:

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()

我很欣赏明确的解决方案。但是谁能解释这个库是如何工作的,尤其是在处理多线程时。就我个人而言,我害怕使用未知的机制来处理线程或信号。
@wsysuper lib 有两种操作模式:打开新线程或新子进程(假设是线程安全的)
好像在linux下不能像其他基于signal.SIGALRM的解决方案一样
此解决方案不适用于 Python 3.7.6。我虽然你应该知道!这对我来说太糟糕了。
E
Enginer

在@piro 的基础上构建和增强答案,您可以构建一个上下文管理器。这允许非常易读的代码在成功运行后禁用警报信号(设置 signal.alarm(0))

from contextlib import contextmanager
import signal
import time

@contextmanager
def timeout(duration):
    def timeout_handler(signum, frame):
        raise Exception(f'block timedout after {duration} seconds')
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(duration)
    yield
    signal.alarm(0)

def sleeper(duration):
    time.sleep(duration)
    print('finished')

示例用法:

In [19]: with timeout(2):
    ...:     sleeper(1)
    ...:     
finished

In [20]: with timeout(2):
    ...:     sleeper(3)
    ...:         
---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-20-66c78858116f> in <module>()
      1 with timeout(2):
----> 2     sleeper(3)
      3 

<ipython-input-7-a75b966bf7ac> in sleeper(t)
      1 def sleeper(t):
----> 2     time.sleep(t)
      3     print('finished')
      4 

<ipython-input-18-533b9e684466> in timeout_handler(signum, frame)
      2 def timeout(duration):
      3     def timeout_handler(signum, frame):
----> 4         raise Exception(f'block timedout after {duration} seconds')
      5     signal.signal(signal.SIGALRM, timeout_handler)
      6     signal.alarm(duration)

Exception: block timedout after 2 seconds

这确实是一个很好的方法。只是为了完整起见,为此工作所需的导入:from contextlib import contextmanager
此上下文管理器的当前实现的一个问题是上下文内代码块中的异常可能导致信号警报未被禁用。要修复它,应添加 try + finally。类似于我下面的超时函数装饰器 (stackoverflow.com/a/66515961/1522304)
a
as - if

timeout-decorator 在 windows 系统上不起作用,因为 windows 对 signal 的支持不好。

如果你在 windows 系统中使用 timeout-decorator 你会得到以下

AttributeError: module 'signal' has no attribute 'SIGALRM'

有些人建议使用 use_signals=False 但对我不起作用。

作者@bitranox 创建了以下包:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

代码示例:

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()

给出以下异常:

TimeoutError: Function mytest timed out after 5 seconds

这听起来像是一个非常好的解决方案。奇怪的是,from wrapt_timeout_decorator import * 行似乎扼杀了我的一些其他导入。例如,我得到 ModuleNotFoundError: No module named 'google.appengine',但如果我不导入 wrapt_timeout_decorator,我不会收到此错误
@AlexisEggermont 我正打算将它与appengine 一起使用......所以我很好奇这个错误是否仍然存在?
对此进行测试时,似乎没有从 messageseconds passed 打印出任何内容
“代码示例”在我的 Windows 机器上运行良好。我对代码示例的第一次尝试没有成功,因为我错误地将文件命名为“signal.py”,并得到了这个错误“NameError:name 'timeout' is not defined”。当您将代码示例作为 py 文件运行时,将其命名为“my_signal.py”或“signal.py”以外的任何名称。
A
Alexander McFarlane

强调

引发 TimeoutError 使用异常来提醒超时 - 可以轻松修改

跨平台:Windows & Mac OS X

兼容性:Python 3.6+(我也在 python 2.7 上进行了测试,它可以通过小的语法调整工作)

有关平行地图的完整说明和扩展,请参见此处https://flipdazed.github.io/blog/quant%20dev/parallel-functions-with-timeouts

最小的例子

>>> @killer_call(timeout=4)
... def bar(x):
...        import time
...        time.sleep(x)
...        return x
>>> bar(10)
Traceback (most recent call last):
  ...
__main__.TimeoutError: function 'bar' timed out after 4s

正如预期的那样

>>> bar(2)
2

完整代码

import multiprocessing as mp
import multiprocessing.queues as mpq
import functools
import dill

from typing import Tuple, Callable, Dict, Optional, Iterable, List, Any

class TimeoutError(Exception):

    def __init__(self, func: Callable, timeout: int):
        self.t = timeout
        self.fname = func.__name__

    def __str__(self):
            return f"function '{self.fname}' timed out after {self.t}s"


def _lemmiwinks(func: Callable, args: Tuple, kwargs: Dict[str, Any], q: mp.Queue):
    """lemmiwinks crawls into the unknown"""
    q.put(dill.loads(func)(*args, **kwargs))


def killer_call(func: Callable = None, timeout: int = 10) -> Callable:
    """
    Single function call with a timeout

    Args:
        func: the function
        timeout: The timeout in seconds
    """

    if not isinstance(timeout, int):
        raise ValueError(f'timeout needs to be an int. Got: {timeout}')

    if func is None:
        return functools.partial(killer_call, timeout=timeout)

    @functools.wraps(killer_call)
    def _inners(*args, **kwargs) -> Any:
        q_worker = mp.Queue()
        proc = mp.Process(target=_lemmiwinks, args=(dill.dumps(func), args, kwargs, q_worker))
        proc.start()
        try:
            return q_worker.get(timeout=timeout)
        except mpq.Empty:
            raise TimeoutError(func, timeout)
        finally:
            try:
                proc.terminate()
            except:
                pass
    return _inners

if __name__ == '__main__':
    @killer_call(timeout=4)
    def bar(x):
        import time
        time.sleep(x)
        return x

    print(bar(2))
    bar(10)

笔记

由于 dill 的工作方式,您需要在函数内部导入。

这也意味着如果您的目标函数中有导入,这些函数可能与 doctest 不兼容。您将遇到 __import__ not found 的问题。


您的类 TimeoutError 正在屏蔽现有的内置异常 TimeoutError
A
A R

我们可以使用相同的信号。我认为下面的示例对您有用。与线程相比,它非常简单。

import signal

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"

最好选择一个特定的异常并只捕获它。光秃秃的 try: ... except: ... 总是一个坏主意。
我同意你的观点。
虽然我理解原因,但作为系统管理员/集成商,我不同意 - python 代码因忽略错误处理而臭名昭著,而处理您期望的一件事对于质量软件来说还不够好。您可以处理您计划的 5 件事以及其他事情的通用策略。 “Traceback, None”不是一种策略,而是一种侮辱。
我一点都不懂你。如果我计划为特定功能做一些超时,如何以优雅的风格做呢?当调用函数依赖于不优雅的组件时,我必须计划什么策略?如何完美粘合这个?请用优雅的例子向我解释。
m
mdev

以防万一它对任何人都有帮助,基于@piro 的答案,我制作了一个函数装饰器:

import time
import signal
from functools import wraps


def timeout(timeout_secs: int):
    def wrapper(func):
        @wraps(func)
        def time_limited(*args, **kwargs):
            # Register an handler for the timeout
            def handler(signum, frame):
                raise Exception(f"Timeout for function '{func.__name__}'")

            # Register the signal function handler
            signal.signal(signal.SIGALRM, handler)

            # Define a timeout for your function
            signal.alarm(timeout_secs)

            result = None
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                raise exc
            finally:
                # disable the signal alarm
                signal.alarm(0)

            return result

        return time_limited

    return wrapper

在具有 20 seconds 超时的函数上使用包装器如下所示:

    @timeout(20)
    def my_slow_or_never_ending_function(name):
        while True:
            time.sleep(1)
            print(f"Yet another second passed {name}...")

    try:
        results = my_slow_or_never_ending_function("Yooo!")
    except Exception as e:
        print(f"ERROR: {e}")

它不适用于windwos 10。错误:模块“信号”没有属性“SIGALRM”
@AndyYuan - 这建立在 piro 的回答之上,它指出“信号”只能在 UNIX 上使用
r
raphaelauv

另一个使用 asyncio 的解决方案:

如果您想取消后台任务而不仅仅是在运行的主代码上超时,那么您需要来自主线程的显式通信以要求任务的代码取消,例如 threading.Event()

import asyncio
import functools
import multiprocessing
from concurrent.futures.thread import ThreadPoolExecutor


class SingletonTimeOut:
    pool = None

    @classmethod
    def run(cls, to_run: functools.partial, timeout: float):
        pool = cls.get_pool()
        loop = cls.get_loop()
        try:
            task = loop.run_in_executor(pool, to_run)
            return loop.run_until_complete(asyncio.wait_for(task, timeout=timeout))
        except asyncio.TimeoutError as e:
            error_type = type(e).__name__ #TODO
            raise e

    @classmethod
    def get_pool(cls):
        if cls.pool is None:
            cls.pool = ThreadPoolExecutor(multiprocessing.cpu_count())
        return cls.pool

    @classmethod
    def get_loop(cls):
        try:
            return asyncio.get_event_loop()
        except RuntimeError:
            asyncio.set_event_loop(asyncio.new_event_loop())
            # print("NEW LOOP" + str(threading.current_thread().ident))
            return asyncio.get_event_loop()

# ---------------

TIME_OUT = float('0.2')  # seconds

def toto(input_items,nb_predictions):
    return 1

to_run = functools.partial(toto,
                           input_items=1,
                           nb_predictions="a")

results = SingletonTimeOut.run(to_run, TIME_OUT)


InternalError 未定义 - 可能值得填充该占位符
这不像您预期的那样工作:gist.github.com/coxley/5879f5ceecfbb4624bee23a6cef47510
docs.python.org/3/library/asyncio-task.html#timeouts 如果发生超时,它会尝试取消任务并引发 asyncio.TimeoutError。
H
Hal Canary
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)

虽然此代码可能会回答问题,但提供有关它如何和/或为什么解决问题的额外上下文将提高答案的长期价值
我不认为这确实回答了这个问题,因为 subprocess.Popen(sys.argv[2:]) 将用于运行 a command 而不是 Python 函数调用。除非打算将其他 Python 脚本包装在这个脚本中,但这可能不会最容易从停顿中恢复。
J
James

我需要不会被 time.sleep 阻塞的 nestable 定时中断(SIGALARM 无法做到)(这是基于线程的方法无法做到的)。我最终从这里复制并稍微修改了代码:http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

代码本身:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

和一个用法示例:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'

这也使用信号,因此如果从线程调用将不起作用。
D
Dozy Sun

我遇到了同样的问题,但我的情况是需要在子线程上工作,信号对我不起作用,所以我写了一个 python 包: timeout-timer 来解决这个问题,支持用作上下文或装饰器,使用信号或子线程模块触发超时中断:

from timeout_timer import timeout, TimeoutInterrupt

class TimeoutInterruptNested(TimeoutInterrupt):
    pass

def test_timeout_nested_loop_both_timeout(timer="thread"):
    cnt = 0
    try:
        with timeout(5, timer=timer):
            try:
                with timeout(2, timer=timer, exception=TimeoutInterruptNested):
                    sleep(2)
            except TimeoutInterruptNested:
                cnt += 1
            time.sleep(10)
    except TimeoutInterrupt:
        cnt += 1
    assert cnt == 2

查看更多:https://github.com/dozysun/timeout-timer


线程定时器机制在子线程中工作正常,它将创建另一个子线程作为定时器,超时秒后子线程将调用父线程的停止,这将引发 TimeoutInterrupt 异常并在父线程中捕获
d
diemacht

这是对给定的基于线程的解决方案的轻微改进。

下面的代码支持异常:

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

以 5 秒超时调用它:

result = timeout(remote_calculate, (myarg,), timeout_duration=5)

这将引发一个隐藏原始回溯的新异常。请参阅下面的我的版本...
这也是不安全的,好像在 runFunctionCatchExceptions() 中调用了某些获取 GIL 的 Python 函数。例如,如果在函数中调用,以下内容将永远不会或很长时间返回:eval(2**9999999999**9999999999)。请参阅stackoverflow.com/questions/22138190/…
T
Troels

这是一个 POSIX 版本,它结合了许多以前的答案以提供以下功能:

阻止执行的子进程。在类成员函数上使用超时函数。对终止时间的严格要求。

这是代码和一些测试用例:

import threading
import signal
import os
import time

class TerminateExecution(Exception):
    """
    Exception to indicate that execution has exceeded the preset running time.
    """


def quit_function(pid):
    # Killing all subprocesses
    os.setpgrp()
    os.killpg(0, signal.SIGTERM)

    # Killing the main thread
    os.kill(pid, signal.SIGTERM)


def handle_term(signum, frame):
    raise TerminateExecution()


def invoke_with_timeout(timeout, fn, *args, **kwargs):
    # Setting a sigterm handler and initiating a timer
    old_handler = signal.signal(signal.SIGTERM, handle_term)
    timer = threading.Timer(timeout, quit_function, args=[os.getpid()])
    terminate = False

    # Executing the function
    timer.start()
    try:
        result = fn(*args, **kwargs)
    except TerminateExecution:
        terminate = True
    finally:
        # Restoring original handler and cancel timer
        signal.signal(signal.SIGTERM, old_handler)
        timer.cancel()

    if terminate:
        raise BaseException("xxx")

    return result

### Test cases
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        time.sleep(1)
    print('countdown finished')
    return 1337


def really_long_function():
    time.sleep(10)


def really_long_function2():
    os.system("sleep 787")


# Checking that we can run a function as expected.
assert invoke_with_timeout(3, countdown, 1) == 1337

# Testing various scenarios
t1 = time.time()
try:
    print(invoke_with_timeout(1, countdown, 3))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)

t1 = time.time()
try:
    print(invoke_with_timeout(1, really_long_function2))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)


t1 = time.time()
try:
    print(invoke_with_timeout(1, really_long_function))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)

# Checking that classes are referenced and not
# copied (as would be the case with multiprocessing)


class X:
    def __init__(self):
        self.value = 0

    def set(self, v):
        self.value = v


x = X()
invoke_with_timeout(2, x.set, 9)
assert x.value == 9

M
Martin Alexandersson

这是一个简单的例子,运行一个超时的方法,如果成功也检索它的值。

import multiprocessing
import time

ret = {"foo": False}


def worker(queue):
    """worker function"""

    ret = queue.get()

    time.sleep(1)

    ret["foo"] = True
    queue.put(ret)


if __name__ == "__main__":
    queue = multiprocessing.Queue()
    queue.put(ret)

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    p.join(timeout=10)

    if p.exitcode is None:
        print("The worker timed out.")
    else:
        print(f"The worker completed and returned: {queue.get()}")