ChatGPT解决这个技术问题 Extra ChatGPT

有没有办法杀死一个线程?

是否可以在不设置/检查任何标志/信号量/等的情况下终止正在运行的线程?


G
Grigory Zhadko

在 Python 和任何语言中,突然终止线程通常是一种不好的模式。考虑以下情况:

线程持有必须正确关闭的关键资源

该线程创建了几个其他线程,这些线程也必须被杀死。

如果您负担得起(如果您正在管理自己的线程),则处理此问题的好方法是有一个 exit_request 标志,每个线程都会定期检查它是否该退出。

例如:

import threading

class StoppableThread(threading.Thread):
    """Thread class with a stop() method. The thread itself has to check
    regularly for the stopped() condition."""

    def __init__(self,  *args, **kwargs):
        super(StoppableThread, self).__init__(*args, **kwargs)
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def stopped(self):
        return self._stop_event.is_set()

在此代码中,您应该在希望线程退出时调用 stop(),并使用 join() 等待线程正确退出。线程应定期检查停止标志。

但是,在某些情况下,您确实需要终止线程。一个例子是,当您包装一个忙于长时间调用的外部库时,您想中断它。

以下代码允许(有一些限制)在 Python 线程中引发异常:

def _async_raise(tid, exctype):
    '''Raises an exception in the threads with id tid'''
    if not inspect.isclass(exctype):
        raise TypeError("Only types can be raised (not instances)")
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid),
                                                     ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # "if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"
        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

class ThreadWithExc(threading.Thread):
    '''A thread class that supports raising an exception in the thread from
       another thread.
    '''
    def _get_my_tid(self):
        """determines this (self's) thread id

        CAREFUL: this function is executed in the context of the caller
        thread, to get the identity of the thread represented by this
        instance.
        """
        if not self.isAlive():
            raise threading.ThreadError("the thread is not active")

        # do we have it cached?
        if hasattr(self, "_thread_id"):
            return self._thread_id

        # no, look for it in the _active dict
        for tid, tobj in threading._active.items():
            if tobj is self:
                self._thread_id = tid
                return tid

        # TODO: in python 2.6, there's a simpler way to do: self.ident

        raise AssertionError("could not determine the thread's id")

    def raiseExc(self, exctype):
        """Raises the given exception type in the context of this thread.

        If the thread is busy in a system call (time.sleep(),
        socket.accept(), ...), the exception is simply ignored.

        If you are sure that your exception should terminate the thread,
        one way to ensure that it works is:

            t = ThreadWithExc( ... )
            ...
            t.raiseExc( SomeException )
            while t.isAlive():
                time.sleep( 0.1 )
                t.raiseExc( SomeException )

        If the exception is to be caught by the thread, you need a way to
        check that your thread has caught it.

        CAREFUL: this function is executed in the context of the
        caller thread, to raise an exception in the context of the
        thread represented by this instance.
        """
        _async_raise( self._get_my_tid(), exctype )

(基于 Tomer Filiba 的 Killable Threads。关于 PyThreadState_SetAsyncExc 的返回值的引用似乎来自 old version of Python。)

如文档中所述,这不是灵丹妙药,因为如果线程在 Python 解释器之外忙,它就不会捕获中断。

此代码的一个良好使用模式是让线程捕获特定异常并执行清理。这样,您可以中断任务并仍然进行适当的清理。


@Bluebird75:此外,我不确定我是否得到了线程不应该被突然终止的论点“因为线程可能持有必须正确关闭的关键资源”:主程序和主程序也是如此可能会被用户突然杀死(例如 Unix 中的 Ctrl-C)——在这种情况下,他们会尝试尽可能好地处理这种可能性。所以,我看不出线程有什么特别之处,以及为什么它们不应该得到与主程序相同的处理(即它们可以被突然终止)。 :) 你能详细说明一下吗?
@EOL:另一方面,如果线程拥有的所有资源都是本地资源(打开的文件、套接字),那么 Linux 在进程清理方面相当出色,并且不会泄漏。虽然我有使用套接字创建服务器的情况,但如果我使用 Ctrl-C 进行粗暴中断,我将无法再启动该程序,因为它无法绑定套接字。我需要等5分钟。正确的解决方案是抓住 Ctrl-C 并进行干净的套接字断开连接。
@Bluebird75:顺便说一句。您可以使用 SO_REUSEADDR 套接字选项来避免 Address already in use 错误。
请注意这个答案:至少对我来说(py2.6),对于 res != 1 情况,我必须传递 None 而不是 0,并且我必须调用 ctypes.c_long(tid) 并将其传递给任何 ctypes 函数,而不是比直接tid。
值得一提的是,_stop 已经被 Python 3 线程库占用了。因此,也许使用不同的变量,否则你会得到一个错误。
k
kolypto

multiprocessing.Process 可以p.terminate()

在我想杀死一个线程但不想使用标志/锁/信号/信号量/事件/什么的情况下,我将线程提升为完整的进程。对于只使用几个线程的代码,开销并没有那么糟糕。

例如,这可以方便地终止执行阻塞 I/O 的辅助“线程”

转换很简单:在相关代码中,将所有 threading.Thread 替换为 multiprocessing.Process,将所有 queue.Queue 替换为 multiprocessing.Queue,并将所需的 p.terminate() 调用添加到想要杀死其子进程 p 的父进程

请参阅 Python documentation for multiprocessing

例子:

import multiprocessing
proc = multiprocessing.Process(target=your_proc_function, args=())
proc.start()
# Terminate the process
proc.terminate()  # sends a SIGTERM

谢谢。我用 multiprocessing.JoinableQueue 替换了 queue.Queue 并遵循了这个答案:stackoverflow.com/a/11984760/911207
multiprocessing 很好,但请注意,参数已被腌制到新流程中。因此,如果其中一个参数是不可选择的(例如 logging.log),则使用 multiprocessing 可能不是一个好主意。
multiprocessing 参数被挑选到 Windows 上的新进程中,但 Linux 使用分叉来复制它们(Python 3.7,不确定还有哪些其他版本)。所以你最终会得到在 Linux 上运行但在 Windows 上引发 pickle 错误的代码。
multiprocessing 记录是一件棘手的事情。需要使用QueueHandler(参见this tutorial)。我很难学会。
可惜我无法监视在多处理中运行的函数...谢谢
M
Martin v. Löwis

没有官方的 API 可以做到这一点,没有。

您需要使用平台 API 来终止线程,例如 pthread_kill 或 TerminateThread。您可以通过 pythonwin 或 ctypes 访问此类 API。

请注意,这本质上是不安全的。如果被杀死的线程在被杀死时具有 GIL,它可能会导致无法收集的垃圾(来自成为垃圾的堆栈帧的局部变量),并且可能导致死锁。


如果有问题的线程持有 GIL,它将导致死锁。
s
schettino72

如果您试图终止整个程序,您可以将线程设置为“守护进程”。见Thread.daemon


这没有任何意义。文档明确指出,“必须在调用 start() 之前设置,否则引发 RuntimeError。”因此,如果我想杀死一个原本不是守护进程的线程,我该如何使用它?
Raffi 我认为他建议你提前设置它,因为你知道当你的主线程退出时,你也希望守护线程退出。
如果您希望线程继续运行,即使您的主程序关闭,您不会将线程设置为守护进程吗?
@MichelePiccolini:恰恰相反:守护线程不会在其他进程消失时保持进程运行。
这对我来说是最好的答案,我只想在父进程关闭时清理线程。谢谢!
J
Jon Coombs

正如其他人所提到的,规范是设置一个停止标志。对于轻量级的东西(没有 Thread 的子类化,没有全局变量),lambda 回调是一种选择。 (注意 if stop() 中的括号。)

import threading
import time

def do_work(id, stop):
    print("I am thread", id)
    while True:
        print("I am thread {} doing something".format(id))
        if stop():
            print("  Exiting loop.")
            break
    print("Thread {}, signing off".format(id))


def main():
    stop_threads = False
    workers = []
    for id in range(0,3):
        tmp = threading.Thread(target=do_work, args=(id, lambda: stop_threads))
        workers.append(tmp)
        tmp.start()
    time.sleep(3)
    print('main: done sleeping; time to stop the threads.')
    stop_threads = True
    for worker in workers:
        worker.join()
    print('Finis.')

if __name__ == '__main__':
    main()

print() 替换为始终刷新 (sys.stdout.flush()) 的 pr() 函数可能会提高 shell 输出的精度。

(仅在 Windows/Eclipse/Python3.3 上测试过)


在 Linux / Python 2.7 上经过验证,就像一个魅力。这应该是官方的回答,简单多了。
在 Linux Ubuntu Server 17.10/Python 3.6.3 上验证并运行它。
pr() 函数是什么?
@alper 您创建了一个新函数,其作用类似于 print 函数,但flush输出并调用它 pr
v
vallentin

在 Python 中,您根本无法直接杀死线程。

如果您真的不需要线程 (!),您可以使用 multiprocessing package 而不是使用 threading package 。在这里,要杀死一个进程,您可以简单地调用该方法:

yourProcess.terminate()  # kill the process!

Python 将终止您的进程(在 Unix 上通过 SIGTERM 信号,而在 Windows 上通过 TerminateProcess() 调用)。注意在使用队列或管道时使用它! (它可能会破坏队列/管道中的数据)

请注意,multiprocessing.Eventmultiprocessing.Semaphore 的工作方式分别与 threading.Eventthreading.Semaphore 完全相同。事实上,第一个是后者的克隆。

如果你真的需要使用一个线程,没有办法直接杀死它。但是,您可以做的是使用“守护线程”。事实上,在 Python 中,可以将 Thread 标记为守护进程:

yourThread.daemon = True  # set the Thread as a "daemon thread"

当没有活着的非守护线程时,主程序将退出。换句话说,当你的主线程(当然是一个非守护线程)完成它的操作时,即使还有一些守护线程在工作,程序也会退出。

请注意,必须在调用 start() 方法之前将 Thread 设置为 daemon

当然,即使与 multiprocessing 一起使用,您也可以而且应该使用 daemon。在这里,当主进程退出时,它会尝试终止其所有守护子进程。

最后,请注意 sys.exit()os.kill() 不是选择。


我不知道为什么人们不投票。这个答案有什么问题?不过这个对我有用。
@fsevenm:进程与线程相同。它们在单独的内存空间中运行,因此不容易共享全局变量。传递参数包括腌制它们并在另一边解开它们。这加上启动和运行单独进程的开销涉及更多其他开销,这些开销只是简单地切换线程所做的。在很多方面都是苹果和橙子,所以这可能就是为什么——回答你的问题。
@martineau 我从未说过它们是相同的东西。我实际上是从“如果你真的不需要线程”开始的,因为情况并非总是如此,然后继续“如果你真的需要使用线程”......
@PaoloRovelli:在我评论的第一部分,我的意思是写“进程与线程不同”。
m
martineau

这基于 thread2 -- killable threads ActiveState 配方。

您需要调用 PyThreadState_SetAsyncExc(),它只能通过 ctypes 模块使用。

这仅在 Python 2.7.3 上进行了测试,但它很可能适用于其他最近的 2.x 版本。 PyThreadState_SetAsyncExc() 仍然存在于 Python 3 中以实现向后兼容性(但我尚未对其进行测试)。

import ctypes

def terminate_thread(thread):
    """Terminates a python thread from another thread.

    :param thread: a threading.Thread instance
    """
    if not thread.isAlive():
        return

    exc = ctypes.py_object(SystemExit)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_long(thread.ident), exc)
    if res == 0:
        raise ValueError("nonexistent thread id")
    elif res > 1:
        # """if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

我正在使用这样的东西给我的线程一个KeyboardInterrupt,以便他们有机会清理。如果在那之后它们仍然挂起,那么 SystemExit 是合适的,或者只是从终端中终止进程。
如果线程当前正在执行,则此方法有效。如果线程在系统调用中,它就不起作用;异常将被静默忽略。
@JohanDahlin您可以稍等片刻(如果您想重试,无论如何都需要这样做),然后进行 isAlive() 测试。无论如何,虽然这可行,但我也不保证它不会留下悬空的引用。虽然理论上可以在 CPython 中使线程终止安全,但通过明智地使用 pthread_cleanup_push()/_pop(),正确实现需要大量工作,并且会显着减慢解释器的速度。
L
Lasse V. Karlsen

你不应该在不合作的情况下强行杀死一个线程。

杀死一个线程会删除任何尝试/最终阻止设置的保证,因此您可能会锁定锁、打开文件等。

您唯一可以争辩说强制终止线程是一个好主意是快速终止程序,但绝不是单个线程。


为什么只告诉一个线程这么难,当你完成当前循环时请自杀......我不明白。
cpu 中没有内置机制来识别“循环”,您可以希望的最好的方法是使用某种信号,当前循环内的代码将在它退出后进行检查。处理线程同步的正确方法是通过协作方式,线程的暂停、恢复和终止是针对调试器和操作系统的函数,而不是应用程序代码。
@Mehdi:如果我(个人)在线程中编写代码,是的,我同意你的看法。但是在某些情况下,我正在运行第三方库,而我无权访问该代码的执行循环。这是请求功能的一个用例。
@DanH 使用第三方代码更糟糕,因为您不知道它会造成什么损害。如果您的第三方库不够健壮以至于需要被杀死,那么您应该执行以下操作之一:(1)要求作者解决问题,(2)使用其他东西。如果您真的别无选择,那么将该代码放在不同的进程中应该更安全,因为某些资源仅在单个进程中共享。
如果我在应用程序中有连接线程并且我想关闭它。它是一个守护进程。我怎么能关闭它呢?我没有关闭应用程序,我只需要取消连接。
S
SCB

如果您明确调用 time.sleep() 作为线程的一部分(例如轮询某些外部服务),对 Phillipe 方法的改进是在 sleep() 的任何位置使用 eventwait() 方法中的超时

例如:

import threading

class KillableThread(threading.Thread):
    def __init__(self, sleep_interval=1):
        super().__init__()
        self._kill = threading.Event()
        self._interval = sleep_interval

    def run(self):
        while True:
            print("Do Something")

            # If no kill signal is set, sleep for the interval,
            # If kill signal comes in while sleeping, immediately
            #  wake up and handle
            is_killed = self._kill.wait(self._interval)
            if is_killed:
                break

        print("Killing Thread")

    def kill(self):
        self._kill.set()

然后运行它

t = KillableThread(sleep_interval=5)
t.start()
# Every 5 seconds it prints:
#: Do Something
t.kill()
#: Killing Thread

使用 wait() 而不是 sleep() 并定期检查事件的优点是您可以在更长的睡眠时间间隔内进行编程,线程几乎立即停止(否则您会 sleep())并且在我看来,处理退出的代码要简单得多。


为什么这篇文章被否决了?这篇文章有什么问题?它看起来完全像我需要的......
虽然这篇文章不是我需要的(我需要安全地从孩子中断父母),但我肯定在我的代码的其他部分使用了 time.sleep 并使轮询间隔变小,以便我的脚本反应更快,但是这个解决方案具有制作小的轮询间隔而没有任何缺点(浪费计算)的所有好处。 +1 非常感谢。
J
Jeff

您可以通过将跟踪安装到将退出线程的线程中来终止线程。有关一种可能的实现,请参阅附加链接。

Kill a thread in Python


这里为数不多的真正有效的答案之一
此解决方案存在两个问题:(a) 使用 sys.settrace() 安装跟踪器会使您的线程运行得更慢。如果受计算限制,速度会慢 10 倍。 (b) 在系统调用中不会影响您的线程。
链接配方的另一个问题是它覆盖了 start() 方法,而 current documentation 明确声明“换句话说,覆盖该类的 __init__()run() 方法” (定义子类时)。
G
Giancarlo

最好不要杀死线程。一种方法是在线程的循环中引入一个“try”块,并在您想要停止线程时抛出异常(例如,中断/返回/...停止您的 for/while/...)。我已经在我的应用程序上使用了它并且它有效......


N
Noctis Skytower

绝对可以实现 Thread.stop 方法,如以下示例代码所示:

import sys
import threading
import time


class StopThread(StopIteration):
    pass

threading.SystemExit = SystemExit, StopThread


class Thread2(threading.Thread):

    def stop(self):
        self.__stop = True

    def _bootstrap(self):
        if threading._trace_hook is not None:
            raise ValueError('Cannot run thread with tracing!')
        self.__stop = False
        sys.settrace(self.__trace)
        super()._bootstrap()

    def __trace(self, frame, event, arg):
        if self.__stop:
            raise StopThread()
        return self.__trace


class Thread3(threading.Thread):

    def _bootstrap(self, stop_thread=False):
        def stop():
            nonlocal stop_thread
            stop_thread = True
        self.stop = stop

        def tracer(*_):
            if stop_thread:
                raise StopThread()
            return tracer
        sys.settrace(tracer)
        super()._bootstrap()

###############################################################################


def main():
    test1 = Thread2(target=printer)
    test1.start()
    time.sleep(1)
    test1.stop()
    test1.join()
    test2 = Thread2(target=speed_test)
    test2.start()
    time.sleep(1)
    test2.stop()
    test2.join()
    test3 = Thread3(target=speed_test)
    test3.start()
    time.sleep(1)
    test3.stop()
    test3.join()


def printer():
    while True:
        print(time.time() % 1)
        time.sleep(0.1)


def speed_test(count=0):
    try:
        while True:
            count += 1
    except StopThread:
        print('Count =', count)

if __name__ == '__main__':
    main()

Thread3 类运行代码的速度似乎比 Thread2 类快大约 33%。


这是一种聪明的方法,可以将检查的 self.__stop 注入到线程中。请注意,与此处的大多数其他解决方案一样,它实际上不会中断阻塞调用,因为仅在进入新的本地范围时才调用跟踪函数。另外值得注意的是,sys.settrace 真正用于实现调试器、配置文件等,因此被认为是 CPython 的实现细节,不保证存在于其他 Python 实现中。
@dano:Thread2 类的最大问题之一是它运行代码的速度大约慢了十倍。有些人可能仍然认为这是可以接受的。
+1 会大大减慢代码执行速度。我建议此解决方案的作者在答案中包含此信息。
s
slumtrimpet

我玩这个游戏已经很晚了,但我一直在与 a similar question 搏斗,以下似乎完美地解决了我的问题,并让我在守护进程子线程退出时进行一些基本的线程状态检查和清理:

import threading
import time
import atexit

def do_work():

  i = 0
  @atexit.register
  def goodbye():
    print ("'CLEANLY' kill sub-thread with value: %s [THREAD: %s]" %
           (i, threading.currentThread().ident))

  while True:
    print i
    i += 1
    time.sleep(1)

t = threading.Thread(target=do_work)
t.daemon = True
t.start()

def after_timeout():
  print "KILL MAIN THREAD: %s" % threading.currentThread().ident
  raise SystemExit

threading.Timer(2, after_timeout).start()

产量:

0
1
KILL MAIN THREAD: 140013208254208
'CLEANLY' kill sub-thread with value: 2 [THREAD: 140013674317568]

为什么在 after_timeout 线程上引发 SystemExit 会对主线程做任何事情(在本例中它只是等待前者退出)?
@DavisHerring 我不确定你在说什么。 SystemExit 杀死了主线程,你为什么认为它不会在主线程上做任何事情?如果没有该调用,程序将继续等待子线程。您也可以 ctrl+c 或使用任何其他方式终止主线程,但这是一个示例。
@slumtrimpet:SystemExit 只有两个特殊属性:它不会产生回溯(当任何线程通过抛出一个退出时),如果 main 线程通过抛出一个退出它会设置退出状态(同时等待其他非守护线程退出)。
-1 主线程继续工作并且没有被子线程引发的 SystemExit 中断。必须使用 kill -9 从终端终止脚本
A
Amit Chahar

以下解决方法可用于终止线程:

kill_threads = False

def doSomething():
    global kill_threads
    while True:
        if kill_threads:
            thread.exit()
        ......
        ......

thread.start_new_thread(doSomething, ())

这甚至可以用于从主线程终止代码在另一个模块中编写的线程。我们可以在该模块中声明一个全局变量,并使用它来终止在该模块中产生的线程。

我通常使用它来终止程序退出时的所有线程。这可能不是终止线程的完美方式,但可能会有所帮助。


'Thread' object has no attribute 'exit' 在 Python 3.6+ 中
而不是线程退出,只需打破循环,您将退出线程
v
vallentin
from ctypes import *
pthread = cdll.LoadLibrary("libpthread-2.15.so")
pthread.pthread_cancel(c_ulong(t.ident))

t 是您的 Thread 对象。

阅读 python 源代码(Modules/threadmodule.cPython/thread_pthread.h),您可以看到 Thread.identpthread_t 类型,因此您可以在 python 使用 libpthread 中执行 pthread 可以执行的任何操作。


你没有;不在 Windows 上,也不在 Linux 上。原因:有问题的线程可能会在您执行此操作时持有 GIL(当您调用 C 时,Python 会释放 GIL)。如果是这样,您的程序将立即死锁。即使没有,finally: 块也不会被执行等等,所以这是一个非常不安全的想法。
s
serg06

这是另一种方法,但代码极其简洁,可在 2021 年的 Python 3.7 中运行:

import ctypes 

def kill_thread(thread):
    """
    thread: a threading.Thread object
    """
    thread_id = thread.ident
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(SystemExit))
    if res > 1:
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0)
        print('Exception raise failure')

改编自此处:https://www.geeksforgeeks.org/python-different-ways-to-kill-a-thread/


如果您使用的是 Python 3.7,则您必须是 2018 年的时间旅行者。如果您指的是 2021 年,请提供使用 Python 3.9 的测试。 PyThreadState_SetAsyncExc 方法只是为线程退出执行“调度”。它确实不会杀死一个线程,尤其是当它正在执行外部 C 库时。尝试用您的方法杀死 sleep(100)。它将在 100 秒后被“杀死”。它与 while flag: -> 一样有效flag = False 方法。
C
Community

我想补充的一件事是,如果您阅读 threading lib Python 中的官方文档,建议避免使用“恶魔”线程,当您不希望线程突然结束时,使用 Paolo Rovelli mentioned 的标志。

来自官方文档:

守护程序线程在关闭时突然停止。它们的资源(如打开的文件、数据库事务等)可能无法正常释放。如果您希望线程优雅地停止,请将它们设为非守护进程并使用合适的信号机制,例如事件。

我认为创建守护线程取决于您的应用程序,但总的来说(在我看来)最好避免杀死它们或使它们成为守护线程。在多处理中,您可以使用 is_alive() 检查进程状态并“终止”以完成它们(也可以避免 GIL 问题)。但有时,当您在 Windows 中执行代码时,您会发现更多问题。

并且永远记住,如果你有“活动线程”,Python 解释器将运行等待它们。 (因为如果无关紧要突然结束,这个守护进程可以帮助你)。


@Tshepang 这意味着如果您的应用程序中有任何正在运行的非守护线程,Python 解释器将继续运行,直到所有非守护线程都完成。如果您不在乎线程是否在程序终止时结束,那么将它们设置为守护程序可能会很有用。
J
Jason R. Coombs

为此目的构建了一个库,stopit。尽管此处列出的一些相同注意事项仍然适用,但至少该库提供了一种常规的、可重复的技术来实现所述目标。


r
rundekugel

假设您希望拥有多个线程的相同功能,这是恕我直言,通过 id 停止一个最简单的实现:

import time
from threading import Thread

def doit(id=0):
    doit.stop=0
    print("start id:%d"%id)
    while 1:
        time.sleep(1)
        print(".")
        if doit.stop==id:
            doit.stop=0
            break
    print("end thread %d"%id)

t5=Thread(target=doit, args=(5,))
t6=Thread(target=doit, args=(6,))

t5.start() ; t6.start()
time.sleep(2)
doit.stop =5  #kill t5
time.sleep(2)
doit.stop =6  #kill t6

好东西就在这里,您可以拥有多个相同和不同的功能,并通过 functionname.stop 停止它们

如果你只想拥有一个函数线程,那么你不需要记住 id。停止,如果 doit.stop > 0。


没有任何类的纯函数线程
O
Ouroborus

只是为了建立@SCB 的想法(这正是我所需要的)来创建一个具有自定义功能的 KillableThread 子类:

from threading import Thread, Event

class KillableThread(Thread):
    def __init__(self, sleep_interval=1, target=None, name=None, args=(), kwargs={}):
        super().__init__(None, target, name, args, kwargs)
        self._kill = Event()
        self._interval = sleep_interval
        print(self._target)

    def run(self):
        while True:
            # Call custom function with arguments
            self._target(*self._args)

            # If no kill signal is set, sleep for the interval,
            # If kill signal comes in while sleeping, immediately
            #  wake up and handle
            is_killed = self._kill.wait(self._interval)
            if is_killed:
                break

        print("Killing Thread")

    def kill(self):
        self._kill.set()

if __name__ == '__main__':

    def print_msg(msg):
        print(msg)

    t = KillableThread(10, print_msg, args=("hello world"))
    t.start()
    time.sleep(6)
    print("About to kill thread")
    t.kill()

自然,与@SBC 一样,线程不会等待运行一个新循环来停止。在这个例子中,你会看到在“About to kill thread”之后打印出“Killing Thread”消息,而不是再等待 4 秒让线程完成(因为我们已经睡了 6 秒)。

KillableThread 构造函数中的第二个参数是您的自定义函数(此处为 print_msg)。 Args 参数是在此处调用函数 (("hello world")) 时将使用的参数。


w
wp78de

虽然它相当旧,但 this 对于某些人来说可能是一个方便的解决方案:

一个扩展线程模块功能的小模块——允许一个线程在另一个线程的上下文中引发异常。通过提高 SystemExit,您最终可以杀死 python 线程。

import threading
import ctypes     

def _async_raise(tid, excobj):
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(excobj))
    if res == 0:
        raise ValueError("nonexistent thread id")
    elif res > 1:
        # """if it returns a number greater than one, you're in trouble, 
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
        raise SystemError("PyThreadState_SetAsyncExc failed")

class Thread(threading.Thread):
    def raise_exc(self, excobj):
        assert self.isAlive(), "thread must be started"
        for tid, tobj in threading._active.items():
            if tobj is self:
                _async_raise(tid, excobj)
                return

        # the thread was alive when we entered the loop, but was not found 
        # in the dict, hence it must have been already terminated. should we raise
        # an exception here? silently ignore?

    def terminate(self):
        # must raise the SystemExit type, instead of a SystemExit() instance
        # due to a bug in PyThreadState_SetAsyncExc
        self.raise_exc(SystemExit)

因此,它允许“线程在另一个线程的上下文中引发异常”,并且通过这种方式,终止的线程可以处理终止,而无需定期检查中止标志。

但是,根据其original source,此代码存在一些问题。

只有在执行 python 字节码时才会引发异常。如果您的线程调用本机/内置阻塞函数,则仅当执行返回到 python 代码时才会引发异常。如果内置函数在内部调用 PyErr_Clear() 也会出现问题,这将有效地取消您的未决异常。您可以尝试再次提高它。只有异常类型可以安全地引发。异常实例很可能导致意外行为,因此受到限制。例如:t1.raise_exc(TypeError) 而不是 t1.raise_exc(TypeError("blah"))。恕我直言,这是一个错误,我将其报告为一个错误。有关更多信息,http://mail.python.org/pipermail/python-dev/2006-August/068158.html 我要求在内置线程模块中公开此功能,但由于 ctypes 已成为标准库(从 2.5 开始),并且此功能不太可能与实现无关,它可能会保持未公开。


B
Basj

正如@Kozyarchuk 的answer 中所述,安装跟踪是有效的。由于此答案不包含任何代码,因此这是一个现成可用的示例:

import sys, threading, time 

class TraceThread(threading.Thread): 
    def __init__(self, *args, **keywords): 
        threading.Thread.__init__(self, *args, **keywords) 
        self.killed = False
    def start(self): 
        self._run = self.run 
        self.run = self.settrace_and_run
        threading.Thread.start(self) 
    def settrace_and_run(self): 
        sys.settrace(self.globaltrace) 
        self._run()
    def globaltrace(self, frame, event, arg): 
        return self.localtrace if event == 'call' else None
    def localtrace(self, frame, event, arg): 
        if self.killed and event == 'line': 
            raise SystemExit() 
        return self.localtrace 

def f(): 
    while True: 
        print('1') 
        time.sleep(2)
        print('2') 
        time.sleep(2)
        print('3') 
        time.sleep(2)

t = TraceThread(target=f) 
t.start() 
time.sleep(2.5) 
t.killed = True

它在打印 12 后停止。 3 未打印。


林奕忠

Python版本:3.8

使用守护线程来执行我们想要的,如果我们想终止守护线程,我们只需要让父线程退出,然后系统将终止父线程创建的守护线程。

还支持协程和协程功能。

def main():
    start_time = time.perf_counter()
    t1 = ExitThread(time.sleep, (10,), debug=False)
    t1.start()
    time.sleep(0.5)
    t1.exit()
    try:
        print(t1.result_future.result())
    except concurrent.futures.CancelledError:
        pass
    end_time = time.perf_counter()
    print(f"time cost {end_time - start_time:0.2f}")

下面是 ExitThread 源代码

import concurrent.futures
import threading
import typing
import asyncio


class _WorkItem(object):
    """ concurrent\futures\thread.py

    """

    def __init__(self, future, fn, args, kwargs, *, debug=None):
        self._debug = debug
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        if self._debug:
            print("ExitThread._WorkItem run")
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            coroutine = None
            if asyncio.iscoroutinefunction(self.fn):
                coroutine = self.fn(*self.args, **self.kwargs)
            elif asyncio.iscoroutine(self.fn):
                coroutine = self.fn
            if coroutine is None:
                result = self.fn(*self.args, **self.kwargs)
            else:
                result = asyncio.run(coroutine)
            if self._debug:
                print("_WorkItem done")
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            self = None
        else:
            self.future.set_result(result)


class ExitThread:
    """ Like a stoppable thread

    Using coroutine for target then exit before running may cause RuntimeWarning.

    """

    def __init__(self, target: typing.Union[typing.Coroutine, typing.Callable] = None
                 , args=(), kwargs={}, *, daemon=None, debug=None):
        #
        self._debug = debug
        self._parent_thread = threading.Thread(target=self._parent_thread_run, name="ExitThread_parent_thread"
                                               , daemon=daemon)
        self._child_daemon_thread = None
        self.result_future = concurrent.futures.Future()
        self._workItem = _WorkItem(self.result_future, target, args, kwargs, debug=debug)
        self._parent_thread_exit_lock = threading.Lock()
        self._parent_thread_exit_lock.acquire()
        self._parent_thread_exit_lock_released = False  # When done it will be True
        self._started = False
        self._exited = False
        self.result_future.add_done_callback(self._release_parent_thread_exit_lock)

    def _parent_thread_run(self):
        self._child_daemon_thread = threading.Thread(target=self._child_daemon_thread_run
                                                     , name="ExitThread_child_daemon_thread"
                                                     , daemon=True)
        self._child_daemon_thread.start()
        # Block manager thread
        self._parent_thread_exit_lock.acquire()
        self._parent_thread_exit_lock.release()
        if self._debug:
            print("ExitThread._parent_thread_run exit")

    def _release_parent_thread_exit_lock(self, _future):
        if self._debug:
            print(f"ExitThread._release_parent_thread_exit_lock {self._parent_thread_exit_lock_released} {_future}")
        if not self._parent_thread_exit_lock_released:
            self._parent_thread_exit_lock_released = True
            self._parent_thread_exit_lock.release()

    def _child_daemon_thread_run(self):
        self._workItem.run()

    def start(self):
        if self._debug:
            print(f"ExitThread.start {self._started}")
        if not self._started:
            self._started = True
            self._parent_thread.start()

    def exit(self):
        if self._debug:
            print(f"ExitThread.exit exited: {self._exited} lock_released: {self._parent_thread_exit_lock_released}")
        if self._parent_thread_exit_lock_released:
            return
        if not self._exited:
            self._exited = True
            if not self.result_future.cancel():
                if self.result_future.running():
                    self.result_future.set_exception(concurrent.futures.CancelledError())

z
zzart

这似乎适用于 Windows 7 上的 pywin32

my_thread = threading.Thread()
my_thread.start()
my_thread._Thread__stop()

k
kernstock

Pieter Hintjens - ØMQ-项目的创始人之一 - 说,使用 ØMQ 并避免同步原语,如锁、互斥体、事件等,是编写多线程程序的最明智和最安全的方法:

http://zguide.zeromq.org/py:all#Multithreading-with-ZeroMQ

这包括告诉子线程,它应该取消它的工作。这将通过为线程配备一个ØMQ-socket并在该socket上轮询一条消息来说明它应该取消来完成。

该链接还提供了一个使用 ØMQ 的多线程 python 代码示例。


r
reubano

另一种方法是使用 signal.pthread_kill 发送停止信号。

from signal import pthread_kill, SIGTSTP
from threading import Thread
from itertools import count
from time import sleep

def target():
    for num in count():
        print(num)
        sleep(1)

thread = Thread(target=target)
thread.start()
sleep(5)
pthread_kill(thread.ident, SIGTSTP)

结果

0
1
2
3
4

[14]+  Stopped

M
Matthias Urlichs

如果您确实需要终止子任务的能力,请使用替代实现。 multiprocessinggevent 都支持不分青红皂白地杀死“线程”。

Python 的线程不支持取消。想都别想。您的代码很可能会死锁、损坏或泄漏内存,或者具有其他意想不到的“有趣”难以调试的影响,这些影响很少发生且不确定。


......是的,我知道两者都不是严格的“线程”,但如果您的代码适合(或可以适合)他们的模型,它们都可以工作。
u
user1942887

您可以在进程中执行命令,然后使用进程 ID 将其终止。我需要在两个线程之间同步,其中一个线程不会自行返回。

processIds = []

def executeRecord(command):
    print(command)

    process = subprocess.Popen(command, stdout=subprocess.PIPE)
    processIds.append(process.pid)
    print(processIds[0])

    #Command that doesn't return by itself
    process.stdout.read().decode("utf-8")
    return;


def recordThread(command, timeOut):

    thread = Thread(target=executeRecord, args=(command,))
    thread.start()
    thread.join(timeOut)

    os.kill(processIds.pop(), signal.SIGINT)

    return;

D
David Lador

最简单的方法是这样的:

from threading import Thread
from time import sleep

def do_something():
    global thread_work
    while thread_work:
        print('doing something')
        sleep(5)
    print('Thread stopped')

thread_work = True
Thread(target=do_something).start()
sleep(5)
thread_work = False

它没有停止线程,而是通过它
R
Russia Must Remove Putin

这是一个不好的答案,请参阅评论

这是如何做到的:

from threading import *

...

for thread in enumerate():
    if thread.isAlive():
        try:
            thread._Thread__stop()
        except:
            print(str(thread.getName()) + ' could not be terminated'))

给它几秒钟然后你的线程应该停止。还要检查 thread._Thread__delete() 方法。

为方便起见,我建议使用 thread.quit() 方法。例如,如果您的线程中有一个套接字,我建议在您的套接字句柄类中创建一个 quit() 方法,终止该套接字,然后在您的 quit() 中运行一个 thread._Thread__stop()


有关“这并不能真正停止线程”的更多详细信息会有所帮助。
基本上,调用 _Thread__stop 方法除了告诉 Python 线程已停止之外没有任何作用。它实际上可以继续运行。有关示例,请参见 gist.github.com/2787191
这是完全错误的。 _Thread__stop() 只是将线程标记为已停止,它实际上并没有停止线程!永远不要这样做。 Have a read