强行停止python子线程最佳方案

发布时间:2019-09-07 08:08:10编辑:auto阅读(2622)

        子线程的强制性终止是我们实际应用时经常需要用到的,然而python官方并没有给出相关的函数来处理这种情况。网上找到一个挺合理的解决方案,这里分享给大家。

    import threading
    import time
    import inspect
    import ctypes
    
    
    def _async_raise(tid, exctype):
        """raises the exception, performs cleanup if needed"""
        tid = ctypes.c_long(tid)
        if not inspect.isclass(exctype):
            exctype = type(exctype)
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
        if res == 0:
            raise ValueError("invalid thread id")
        elif res != 1:
            # """if it returns a number greater than one, you're in trouble,
            # and you should call it again with exc=NULL to revert the effect"""
            ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
            raise SystemError("PyThreadState_SetAsyncExc failed")
    
    
    def stop_thread(thread):
        _async_raise(thread.ident, SystemExit)
    
    
    def test():
        while True:
            print('-------')
            time.sleep(0.5)
    
    
    if __name__ == "__main__":
        t = threading.Thread(target=test)
        t.start()
        time.sleep(5.2)
        print("main thread sleep finish")
        stop_thread(t)

关键字