方法一、使用 signal

  1. import signal
  2. import time
  3. def set_timeout(num, callback):
  4. def wrap(func):
  5. def handle(signum, frame): # 收到信号 SIGALRM 后的回调函数,第一个参数是信号的数字,第二个参数是the interrupted stack frame.
  6. raise RuntimeError
  7. def to_do(*args, **kwargs):
  8. try:
  9. signal.signal(signal.SIGALRM, handle) # 设置信号和回调函数
  10. signal.alarm(num) # 设置 num 秒的闹钟
  11. print('start alarm signal.')
  12. r = func(*args, **kwargs)
  13. print('close alarm signal.')
  14. signal.alarm(0) # 关闭闹钟
  15. return r
  16. except RuntimeError as e:
  17. callback()
  18. return to_do
  19. return wrap
  20. def after_timeout(): # 超时后的处理函数
  21. print("Time out!")
  22. @set_timeout(2, after_timeout) # 限时 2 秒超时
  23. def connect(): # 要执行的函数
  24. time.sleep(3) # 函数执行时间,写大于2的值,可测试超时
  25. print('Finished without timeout.')
  26. if __name__ == '__main__':
  27. connect()

注意:使用的signal有所限制,需要在linux系统上,并且需要在主线程中使用,否则无效

方法二. 使用Thread

  1. from time import sleep, time
  2. import sys, threading
  3. class KThread(threading.Thread):
  4. """A subclass of threading.Thread, with a kill()
  5. method.
  6. Come from:
  7. Kill a thread in Python:
  8. http://mail.python.org/pipermail/python-list/2004-May/260937.html
  9. """
  10. def __init__(self, *args, **kwargs):
  11. threading.Thread.__init__(self, *args, **kwargs)
  12. self.killed = False
  13. def start(self):
  14. """Start the thread."""
  15. self.__run_backup = self.run
  16. self.run = self.__run # Force the Thread to install our trace.
  17. threading.Thread.start(self)
  18. def __run(self):
  19. """Hacked run function, which installs the
  20. trace."""
  21. sys.settrace(self.globaltrace)
  22. self.__run_backup()
  23. self.run = self.__run_backup
  24. def globaltrace(self, frame, why, arg):
  25. if why == 'call':
  26. return self.localtrace
  27. else:
  28. return None
  29. def localtrace(self, frame, why, arg):
  30. if self.killed:
  31. if why == 'line':
  32. raise SystemExit()
  33. return self.localtrace
  34. def kill(self):
  35. self.killed = True
  36. class Timeout(Exception):
  37. """function run timeout"""
  38. def set_timeout(seconds, callback_data):
  39. """超时装饰器,指定超时时间
  40. 若被装饰的方法在指定的时间内未返回,则抛出Timeout异常"""
  41. def timeout_decorator(func):
  42. """真正的装饰器"""
  43. def _new_func(oldfunc, result, oldfunc_args, oldfunc_kwargs):
  44. result.append(oldfunc(*oldfunc_args, **oldfunc_kwargs))
  45. def _(*args, **kwargs):
  46. result = []
  47. new_kwargs = { # create new args for _new_func, because we want to get the func return val to result list
  48. 'oldfunc': func,
  49. 'result': result,
  50. 'oldfunc_args': args,
  51. 'oldfunc_kwargs': kwargs
  52. }
  53. thd = KThread(target=_new_func, args=(), kwargs=new_kwargs)
  54. thd.start()
  55. thd.join(seconds)
  56. alive = thd.isAlive()
  57. thd.kill() # kill the child thread
  58. if alive:
  59. # raise Timeout(u'function run too long, timeout %d seconds.' % seconds)
  60. try:
  61. return callback_data
  62. # raise Timeout(u'function run too long, timeout %d seconds.' % seconds)
  63. finally:
  64. # return u'function run too long, timeout %d seconds.' % seconds
  65. return callback_data
  66. else:
  67. return result[0]
  68. _.__name__ = func.__name__
  69. _.__doc__ = func.__doc__
  70. return _
  71. return timeout_decorator
  72. @set_timeout(2, None) # 限时 2 秒超时
  73. def connect(): # 要执行的函数
  74. time.sleep(3) # 函数执行时间,写大于2的值,可测试超时
  75. print('Finished without timeout.')
  76. if __name__ == '__main__':
  77. connect()

方法二使用线程的版本可以支持多线程