Odpowiedzi:
Możesz użyć pakietu sygnału , jeśli pracujesz w systemie UNIX:
In [1]: import signal
# Register an handler for the timeout
In [2]: def handler(signum, frame):
...: print("Forever is over!")
...: raise Exception("end of time")
...:
# This function *may* run for an indetermined time...
In [3]: def loop_forever():
...: import time
...: while 1:
...: print("sec")
...: time.sleep(1)
...:
...:
# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0
# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0
In [6]: try:
...: loop_forever()
...: except Exception, exc:
...: print(exc)
....:
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time
# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0
10 sekund po wywołaniu alarm.alarm(10)
zostaje wywołany przewodnik. Rodzi to wyjątek, który można przechwycić ze zwykłego kodu Pythona.
Ten moduł nie działa dobrze z wątkami (ale w takim razie, kto?)
Pamiętaj, że ponieważ zgłaszamy wyjątek, gdy nastąpi przekroczenie limitu czasu, może on zostać złapany i zignorowany wewnątrz funkcji, na przykład jednej z takich funkcji:
def loop_forever():
while 1:
print('sec')
try:
time.sleep(10)
except:
continue
signal.alarm
i powiązane SIGALRM
nie są dostępne na platformach Windows.
signal.signal
--- czy wszystkie działają poprawnie? Czy każde signal.signal
połączenie nie anuluje „jednoczesnego” jednego?
Możesz użyć multiprocessing.Process
do zrobienia dokładnie tego.
Kod
import multiprocessing
import time
# bar
def bar():
for i in range(100):
print "Tick"
time.sleep(1)
if __name__ == '__main__':
# Start bar as a process
p = multiprocessing.Process(target=bar)
p.start()
# Wait for 10 seconds or until process finishes
p.join(10)
# If thread is still active
if p.is_alive():
print "running... let's kill it..."
# Terminate
p.terminate()
p.join()
join()
. to sprawia, że twoja liczba współbieżnych podprocesów jest uruchomiona, dopóki nie zakończą pracy, lub ilość zdefiniowana w join(10)
. Jeśli masz blokujące operacje we / wy dla 10 procesów, używając join (10) ustawiłeś je tak, aby czekały na maksimum 10 dla KAŻDEGO rozpoczętego procesu. Użyj flagi demona, jak w tym przykładzie stackoverflow.com/a/27420072/2480481 . Oczywiście możesz przekazać flagę daemon=True
bezpośrednio do multiprocessing.Process()
funkcji.
terminate() ... Note that exit handlers and finally clauses, etc., will not be executed. Note that descendant processes of the process will not be terminated – they will simply become orphaned.
Jak wywołać funkcję lub w co ją owinąć, aby skoro anulowanie trwa dłużej niż 5 sekund, skrypt ją anuluje?
Zamieściłem istotę , która rozwiązuje to pytanie / problem z dekorator a threading.Timer
. Oto podział.
Został przetestowany w Pythonie 2 i 3. Powinien także działać w systemach Unix / Linux i Windows.
Najpierw import. Starają się zachować spójność kodu niezależnie od wersji Pythona:
from __future__ import print_function
import sys
import threading
from time import sleep
try:
import thread
except ImportError:
import _thread as thread
Użyj kodu niezależnego od wersji:
try:
range, _print = xrange, print
def print(*args, **kwargs):
flush = kwargs.pop('flush', False)
_print(*args, **kwargs)
if flush:
kwargs.get('file', sys.stdout).flush()
except NameError:
pass
Teraz zaimportowaliśmy naszą funkcjonalność ze standardowej biblioteki.
exit_after
dekoratorNastępnie potrzebujemy funkcji, aby zakończyć main()
wątek potomny:
def quit_function(fn_name):
# print to stderr, unbuffered in Python 2.
print('{0} took too long'.format(fn_name), file=sys.stderr)
sys.stderr.flush() # Python 3 stderr is likely buffered.
thread.interrupt_main() # raises KeyboardInterrupt
A oto sam dekorator:
def exit_after(s):
'''
use as decorator to exit process if
function takes longer than s seconds
'''
def outer(fn):
def inner(*args, **kwargs):
timer = threading.Timer(s, quit_function, args=[fn.__name__])
timer.start()
try:
result = fn(*args, **kwargs)
finally:
timer.cancel()
return result
return inner
return outer
A oto użycie, które bezpośrednio odpowiada na pytanie o wyjściu po 5 sekundach:
@exit_after(5)
def countdown(n):
print('countdown started', flush=True)
for i in range(n, -1, -1):
print(i, end=', ', flush=True)
sleep(1)
print('countdown finished')
Próbny:
>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 11, in inner
File "<stdin>", line 6, in countdown
KeyboardInterrupt
Drugie wywołanie funkcji nie zakończy się, zamiast tego proces powinien zostać zakończony za pomocą śledzenia zwrotnego!
KeyboardInterrupt
nie zawsze zatrzymuje śpiący wątekPamiętaj, że uśpienie nie zawsze będzie przerywane przerwaniem klawiatury, w Pythonie 2 w systemie Windows, np .:
@exit_after(1)
def sleep10():
sleep(10)
print('slept 10 seconds')
>>> sleep10()
sleep10 took too long # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 11, in inner
File "<stdin>", line 3, in sleep10
KeyboardInterrupt
nie jest też w stanie przerwać działania kodu w rozszerzeniach, chyba że wyraźnie to sprawdzi PyErr_CheckSignals()
, zobacz Cython, Python i KeyboardInterrupt ignorowane
W każdym razie unikałbym spania wątku dłużej niż sekundę - to eon czasu procesora.
Jak wywołać funkcję lub w co ją owinąć, aby skoro trwa dłużej niż 5 sekund, skrypt ją anuluje i zrobi coś innego?
Aby go złapać i zrobić coś innego, możesz złapać KeyboardInterrupt.
>>> try:
... countdown(10)
... except KeyboardInterrupt:
... print('do something else')
...
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else
thread.interrupt_main()
, dlaczego nie mogę bezpośrednio zgłosić wyjątku?
multiprocessing.connection.Client
tego? - Próbuję rozwiązać: stackoverflow.com/questions/57817955/…
Mam inną propozycję, która jest czystą funkcją (z tym samym interfejsem API co sugestia wątków) i wydaje się działać dobrze (w oparciu o sugestie dotyczące tego wątku)
def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
import signal
class TimeoutError(Exception):
pass
def handler(signum, frame):
raise TimeoutError()
# set the timeout handler
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout_duration)
try:
result = func(*args, **kwargs)
except TimeoutError as exc:
result = default
finally:
signal.alarm(0)
return result
timeout
. O wiele lepiej jest ustawić wartość domyślną None
i dodać w pierwszym wierszu funkcji kwargs = kwargs or {}
. Args jest w porządku, ponieważ krotek nie można modyfikować.
Natknąłem się na ten wątek, szukając limitu czasu w testach jednostkowych. Nie znalazłem nic prostego w odpowiedziach ani paczkach stron trzecich, więc napisałem poniżej dekorator, który możesz wrzucić do kodu:
import multiprocessing.pool
import functools
def timeout(max_timeout):
"""Timeout decorator, parameter in seconds."""
def timeout_decorator(item):
"""Wrap the original function."""
@functools.wraps(item)
def func_wrapper(*args, **kwargs):
"""Closure for function."""
pool = multiprocessing.pool.ThreadPool(processes=1)
async_result = pool.apply_async(item, args, kwargs)
# raises a TimeoutError if execution exceeds max_timeout
return async_result.get(max_timeout)
return func_wrapper
return timeout_decorator
To jest tak proste, jak przekroczenie limitu czasu testu lub dowolnej funkcji, którą lubisz:
@timeout(5.0) # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
...
Exception
wewnątrz func_wrapper i zrobić pool.close()
po złapaniu, aby upewnić się, że wątek zawsze umiera później, bez względu na wszystko. Następnie możesz rzucić TimeoutError
lub cokolwiek chcesz. Wydaje się dla mnie pracować.
RuntimeError: can't start new thread
. Czy nadal będzie działać, jeśli go zignoruję, czy jest coś innego, co mogę zrobić, aby to obejść? Z góry dziękuję!
stopit
Pakiet, znalezionych na PyPI, wydaje się dobrze obsłużyć limity czasu.
Podoba mi się @stopit.threading_timeoutable
dekorator, który dodaje timeout
parametr do dekorowanej funkcji, która robi to, czego oczekujesz, zatrzymuje funkcję.
Sprawdź to na pypi: https://pypi.python.org/pypi/stopit
Istnieje wiele sugestii, ale żadna z nich nie używa współbieżnych. Przyszłości, co moim zdaniem jest najbardziej czytelnym sposobem na poradzenie sobie z tym.
from concurrent.futures import ProcessPoolExecutor
# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
with ProcessPoolExecutor() as p:
f = p.submit(fnc, *args, **kwargs)
return f.result(timeout=5)
Super prosty do odczytania i utrzymania.
Tworzymy pulę, przesyłamy pojedynczy proces, a następnie czekamy do 5 sekund przed podniesieniem błędu limitu czasu, który można złapać i obsłużyć w razie potrzeby.
Natywny dla Pythona 3.2+ i backportowany do 2.7 (futures instalujacy pip).
Przełączanie pomiędzy gwintem i sposobów jest to tak proste jak wymiana ProcessPoolExecutor
z ThreadPoolExecutor
.
Jeśli chcesz zakończyć Proces po upływie limitu czasu, sugeruję zajrzenie do Pebble .
Świetny, łatwy w użyciu i niezawodny dekorator przekroczenia limitu czasu projektu PyPi ( https://pypi.org/project/timeout-decorator/ )
instalacja :
pip install timeout-decorator
Zastosowanie :
import time
import timeout_decorator
@timeout_decorator.timeout(5)
def mytest():
print "Start"
for i in range(1,10):
time.sleep(1)
print "%d seconds have passed" % i
if __name__ == '__main__':
mytest()
Jestem autorem wrapt_timeout_decorator
Większość przedstawionych tutaj rozwiązań działa na pierwszy rzut oka pod Linuksem - ponieważ mamy fork () i sygnały () - ale w systemie Windows wygląda to nieco inaczej. A jeśli chodzi o wątki w Linuksie, nie możesz już używać Sygnałów.
Aby odrodzić proces w systemie Windows, musi być możliwy do odebrania - a wiele udekorowanych funkcji lub metod klasowych nie.
Musisz więc użyć lepszego piklera, takiego jak koperek i proces wieloprocesowy (nie marynowany i wieloprocesowy) - dlatego nie możesz używać ProcessPoolExecutor (lub tylko z ograniczoną funkcjonalnością).
Dla samego limitu czasu - Musisz zdefiniować, co oznacza limit czasu - ponieważ w systemie Windows zajmie to sporo (i nie można go określić) czasu na odrodzenie procesu. Może to być trudne w przypadku krótkich limitów czasu. Załóżmy, że odrodzenie procesu zajmuje około 0,5 sekundy (łatwo !!!). Jeśli limit czasu wynosi 0,2 sekundy, co powinno się stać? Czy funkcja powinna upłynąć po 0,5 + 0,2 sekundy (więc niech metoda będzie działać przez 0,2 sekundy)? A może wywoływany proces wygaśnie po 0,2 sekundy (w takim przypadku funkcja dekoracyjna ZAWSZE przekroczy limit czasu, ponieważ w tym czasie nie jest nawet odradzana)?
Również zagnieżdżone dekoratory mogą być nieprzyjemne i nie można używać sygnałów w podtytule. Jeśli chcesz stworzyć prawdziwie uniwersalny, wieloplatformowy dekorator, wszystko to należy wziąć pod uwagę (i przetestować).
Inne problemy to przekazywanie wyjątków z powrotem do programu wywołującego, a także problemy z logowaniem (jeśli są używane w funkcji dekorowanej - logowanie do plików w innym procesie NIE jest obsługiwane)
Próbowałem objąć wszystkie przypadki brzegowe. Możesz zajrzeć do pakietu wrapt_timeout_decorator, lub przynajmniej przetestować własne rozwiązania inspirowane najbardziej używanymi tam jednostkami.
@Alexis Eggermont - niestety nie mam wystarczającej liczby punktów do skomentowania - może ktoś inny może Cię powiadomić - myślę, że rozwiązałem problem z importem.
timeout-decorator
nie działają w systemie Windows, ponieważ Windows nie obsługiwał signal
dobrze.
Jeśli użyjesz timeout-decorator w systemie Windows, otrzymasz następujące
AttributeError: module 'signal' has no attribute 'SIGALRM'
Niektórzy sugerowali użycie, use_signals=False
ale nie działali dla mnie.
Autor @bitranox utworzył następujący pakiet:
pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip
Przykładowy kod:
import time
from wrapt_timeout_decorator import *
@timeout(5)
def mytest(message):
print(message)
for i in range(1,10):
time.sleep(1)
print('{} seconds have passed'.format(i))
def main():
mytest('starting')
if __name__ == '__main__':
main()
Daje następujący wyjątek:
TimeoutError: Function mytest timed out after 5 seconds
from wrapt_timeout_decorator import *
wydaje się zabijać część moich innych importów. Na przykład dostaję ModuleNotFoundError: No module named 'google.appengine'
, ale nie dostaję tego błędu, jeśli nie zaimportuję wrapt_timeout_decorator
Możemy użyć sygnałów do tego samego. Myślę, że poniższy przykład będzie dla ciebie przydatny. Jest bardzo prosty w porównaniu do wątków.
import signal
def timeout(signum, frame):
raise myException
#this is an infinite loop, never ending under normal circumstances
def main():
print 'Starting Main ',
while 1:
print 'in main ',
#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)
#change 5 to however many seconds you need
signal.alarm(5)
try:
main()
except myException:
print "whoops"
try: ... except: ...
są zawsze złym pomysłem.
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)
Miałem potrzebę gniazdowa przerwania czasowe (który SIGALARM nie można zrobić), które nie będą blokowane przez time.sleep (których podejście oparte wątek nie może zrobić). Skończyłem kopiowanie i lekką modyfikację kodu stąd: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/
Sam kod:
#!/usr/bin/python
# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/
"""alarm.py: Permits multiple SIGALRM events to be queued.
Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""
import heapq
import signal
from time import time
__version__ = '$Revision: 2539 $'.split()[1]
alarmlist = []
__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))
class TimeoutError(Exception):
def __init__(self, message, id_=None):
self.message = message
self.id_ = id_
class Timeout:
''' id_ allows for nested timeouts. '''
def __init__(self, id_=None, seconds=1, error_message='Timeout'):
self.seconds = seconds
self.error_message = error_message
self.id_ = id_
def handle_timeout(self):
raise TimeoutError(self.error_message, self.id_)
def __enter__(self):
self.this_alarm = alarm(self.seconds, self.handle_timeout)
def __exit__(self, type, value, traceback):
try:
cancel(self.this_alarm)
except ValueError:
pass
def __clear_alarm():
"""Clear an existing alarm.
If the alarm signal was set to a callable other than our own, queue the
previous alarm settings.
"""
oldsec = signal.alarm(0)
oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
if oldsec > 0 and oldfunc != __alarm_handler:
heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))
def __alarm_handler(*zargs):
"""Handle an alarm by calling any due heap entries and resetting the alarm.
Note that multiple heap entries might get called, especially if calling an
entry takes a lot of time.
"""
try:
nextt = __next_alarm()
while nextt is not None and nextt <= 0:
(tm, func, args, keys) = heapq.heappop(alarmlist)
func(*args, **keys)
nextt = __next_alarm()
finally:
if alarmlist: __set_alarm()
def alarm(sec, func, *args, **keys):
"""Set an alarm.
When the alarm is raised in `sec` seconds, the handler will call `func`,
passing `args` and `keys`. Return the heap entry (which is just a big
tuple), so that it can be cancelled by calling `cancel()`.
"""
__clear_alarm()
try:
newalarm = __new_alarm(sec, func, args, keys)
heapq.heappush(alarmlist, newalarm)
return newalarm
finally:
__set_alarm()
def cancel(alarm):
"""Cancel an alarm by passing the heap entry returned by `alarm()`.
It is an error to try to cancel an alarm which has already occurred.
"""
__clear_alarm()
try:
alarmlist.remove(alarm)
heapq.heapify(alarmlist)
finally:
if alarmlist: __set_alarm()
i przykład użycia:
import alarm
from time import sleep
try:
with alarm.Timeout(id_='a', seconds=5):
try:
with alarm.Timeout(id_='b', seconds=2):
sleep(3)
except alarm.TimeoutError as e:
print 'raised', e.id_
sleep(30)
except alarm.TimeoutError as e:
print 'raised', e.id_
else:
print 'nope.'
Oto niewielka poprawa podanego rozwiązania opartego na wątkach.
Poniższy kod obsługuje wyjątki :
def runFunctionCatchExceptions(func, *args, **kwargs):
try:
result = func(*args, **kwargs)
except Exception, message:
return ["exception", message]
return ["RESULT", result]
def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
import threading
class InterruptableThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.result = default
def run(self):
self.result = runFunctionCatchExceptions(func, *args, **kwargs)
it = InterruptableThread()
it.start()
it.join(timeout_duration)
if it.isAlive():
return default
if it.result[0] == "exception":
raise it.result[1]
return it.result[1]
Wywoływanie go z 5-sekundowym limitem czasu:
result = timeout(remote_calculate, (myarg,), timeout_duration=5)
runFunctionCatchExceptions()
niektórych funkcjach Pythona wywoływane są GIL. Np dodaje będzie nigdy, albo za bardzo długi czas powrotu nazywany wewnątrz funkcji: eval(2**9999999999**9999999999)
. Zobacz stackoverflow.com/questions/22138190/…