Mówienie o async/await
i asyncio
to nie to samo. Pierwsza to fundamentalna konstrukcja niskiego poziomu (coroutines), a druga to biblioteka korzystająca z tych konstrukcji. I odwrotnie, nie ma jednej ostatecznej odpowiedzi.
Poniżej znajduje się ogólny opis działania bibliotek async/await
i asyncio
podobnych. Oznacza to, że na górze mogą być inne sztuczki (są ...), ale nie mają one znaczenia, chyba że sam je zbudujesz. Różnica powinna być znikoma, chyba że wiesz już wystarczająco dużo, aby nie musieć zadawać takiego pytania.
1. Korekty a podprogramy w łupinie orzecha
Podobnie jak podprogramy (funkcje, procedury, ...), procedury (generatory, ...) są abstrakcją stosu wywołań i wskaźnika instrukcji: istnieje stos wykonujących się fragmentów kodu, a każdy znajduje się przy określonej instrukcji.
Rozróżnienie między def
wersjami async def
służy jedynie przejrzystości. Rzeczywista różnica jest w return
porównaniu yield
. Z tego await
lub yield from
weź różnicę z pojedynczych wywołań do całych stacków.
1.1. Podprogramy
Podprocedura reprezentuje nowy poziom stosu, na którym będą przechowywane zmienne lokalne, oraz pojedyncze przejście jego instrukcji, aby osiągnąć koniec. Rozważ podprogram taki jak ten:
def subfoo(bar):
qux = 3
return qux * bar
To znaczy, kiedy go uruchamiasz
- przydziel miejsce na stosie dla
bar
iqux
- rekurencyjnie wykonuje pierwszą instrukcję i przeskakuje do następnej
- raz na raz
return
, umieść jego wartość na stosie wywołań
- wyczyść stos (1.) i wskaźnik instrukcji (2.)
Warto zauważyć, że 4. oznacza, że podprogram zawsze zaczyna się w tym samym stanie. Wszystko, co dotyczy samej funkcji, zostaje utracone po zakończeniu. Funkcji nie można wznowić, nawet jeśli są po niej instrukcje return
.
root -\
: \- subfoo --\
:/--<---return --/
|
V
1.2. Korekty jako trwałe podprogramy
Program jest podobny do podprogramu, ale może wyjść bez niszczenia jego stanu. Rozważmy taki program:
def cofoo(bar):
qux = yield bar # yield marks a break point
return qux
To znaczy, kiedy go uruchamiasz
- przydziel miejsce na stosie dla
bar
iqux
- rekurencyjnie wykonuje pierwszą instrukcję i przeskakuje do następnej
- raz na
yield
, umieść jego wartość na stosie wywołań, ale zapisz stos i wskaźnik instrukcji
- po wywołaniu do
yield
, przywróć stos i wskaźnik instrukcji i wypchnij argumenty doqux
- raz na raz
return
, umieść jego wartość na stosie wywołań
- wyczyść stos (1.) i wskaźnik instrukcji (2.)
Zwróć uwagę na dodanie 2.1 i 2.2 - program można zawiesić i wznowić we wcześniej określonych punktach. Jest to podobne do zawieszenia podprogramu podczas wywoływania innego podprogramu. Różnica polega na tym, że aktywny coroutine nie jest ściśle powiązany ze swoim stosem wywołań. Zamiast tego zawieszony program jest częścią oddzielnego, izolowanego stosu.
root -\
: \- cofoo --\
:/--<+--yield --/
| :
V :
Oznacza to, że zawieszone programy można dowolnie przechowywać lub przenosić między stosami. Każdy stos wywołań, który ma dostęp do programu, może zdecydować o jego wznowieniu.
1.3. Przechodzenie przez stos wywołań
Jak dotąd, nasz coroutine idzie w dół stosu wywołań tylko z yield
. Podprogram może zejść w dół i w górę stosu wywołań za pomocą return
i ()
. W celu zapewnienia kompletności, procedury potrzebują również mechanizmu, aby przejść w górę stosu wywołań. Rozważmy taki program:
def wrap():
yield 'before'
yield from cofoo()
yield 'after'
Po uruchomieniu oznacza to, że nadal alokuje stos i wskaźnik instrukcji jak podprogram. Kiedy się zawiesza, nadal przypomina to zapisywanie podprogramu.
Jednak yield from
robi jedno i drugie . Zawiesza stos i wskaźnik instrukcji wrap
i działa cofoo
. Zauważ, że wrap
pozostaje zawieszony do cofoo
całkowitego zakończenia. Zawsze, gdy cofoo
zawiesza się lub coś jest wysyłane, cofoo
jest bezpośrednio podłączane do stosu wywołań.
1.4. Korekty w dół
Jak ustalono, yield from
umożliwia połączenie dwóch zakresów w innym pośrednim. W przypadku zastosowania rekurencyjnego oznacza to, że góra stosu może być połączona z dnem stosu.
root -\
: \-> coro_a -yield-from-> coro_b --\
:/ <-+------------------------yield ---/
| :
:\ --+-- coro_a.send----------yield ---\
: coro_b <-/
Zwróć na to uwagę root
i coro_b
nie wiedzcie o sobie. To sprawia, że programy są znacznie czystsze niż wywołania zwrotne: programy nadal są zbudowane na relacji 1: 1, podobnie jak podprogramy. Koordynatorzy zawieszają i wznawiają cały istniejący stos wykonania aż do zwykłego punktu wywołania.
W szczególności root
może mieć dowolną liczbę programów do wznowienia. Jednak nigdy nie może wznowić więcej niż jednego w tym samym czasie. Korekty tego samego rdzenia są współbieżne, ale nie równoległe!
1.5. Pythona async
iawait
Wyjaśnieniem tej pory jednoznacznie używany yield
i yield from
słownictwo generatorów - funkcjonalność bazowym jest taka sama. Nowa składnia Pythona 3.5 async
i await
istnieje głównie dla przejrzystości.
def foo(): # subroutine?
return None
def foo(): # coroutine?
yield from foofoo() # generator? coroutine?
async def foo(): # coroutine!
await foofoo() # coroutine!
return None
Instrukcje async for
i async with
są potrzebne, ponieważ przerwałbyś yield from/await
łańcuch instrukcjami gołymi for
i with
.
2. Anatomia prostej pętli zdarzeń
Sam w sobie program nie ma koncepcji poddania kontroli innemu programowi. Może przekazać kontrolę tylko wywołującemu na dole stosu coroutine. Ten wywołujący może następnie przełączyć się na inny program i uruchomić go.
Ten węzeł główny kilku programów jest zwykle pętlą zdarzeń : w przypadku zawieszenia, program generuje zdarzenie, od którego chce wznowić. Z kolei pętla zdarzeń jest w stanie skutecznie czekać na wystąpienie tych zdarzeń. Dzięki temu może zdecydować, który program ma zostać uruchomiony jako następny lub jak czekać przed wznowieniem.
Taki projekt oznacza, że istnieje zestaw predefiniowanych zdarzeń, które rozumie pętla. Kilka programów współpracuje await
ze sobą, aż w końcu odbywa się wydarzenie await
. To zdarzenie może komunikować się bezpośrednio z pętlą zdarzeń przez yield
sterowanie.
loop -\
: \-> coroutine --await--> event --\
:/ <-+----------------------- yield --/
| :
| : # loop waits for event to happen
| :
:\ --+-- send(reply) -------- yield --\
: coroutine <--yield-- event <-/
Kluczem jest to, że standardowe zawieszenie umożliwia bezpośrednią komunikację pętli zdarzeń i zdarzeń. Pośredni stos programu coroutine nie wymaga żadnej wiedzy o tym, która pętla go uruchamia ani jak działają zdarzenia.
2.1.1. Wydarzenia w czasie
Najprostszym zdarzeniem do obsłużenia jest osiągnięcie punktu w czasie. Jest to również podstawowy blok kodu z wątkami: wątek jest powtarzany, sleep
aż warunek zostanie spełniony. Jednak zwykłe sleep
wykonywanie bloków samo w sobie - chcemy, aby inne programy nie były blokowane. Zamiast tego chcemy powiedzieć pętli zdarzeń, kiedy powinna wznowić bieżący stos programu.
2.1.2. Definiowanie wydarzenia
Zdarzenie to po prostu wartość, którą możemy zidentyfikować - czy to poprzez wyliczenie, typ czy inną tożsamość. Możemy to zdefiniować za pomocą prostej klasy, która przechowuje nasz docelowy czas. Oprócz przechowywania informacji o wydarzeniach, możemy pozwolić await
bezpośrednio na zajęcia.
class AsyncSleep:
"""Event to sleep until a point in time"""
def __init__(self, until: float):
self.until = until
# used whenever someone ``await``s an instance of this Event
def __await__(self):
# yield this Event to the loop
yield self
def __repr__(self):
return '%s(until=%.1f)' % (self.__class__.__name__, self.until)
Ta klasa tylko przechowuje zdarzenie - nie mówi, jak właściwie je obsłużyć.
Jedyną specjalną cechą jest __await__
to, czego await
szuka słowo kluczowe. W praktyce jest to iterator, ale nie jest dostępny dla zwykłych maszyn iteracyjnych.
2.2.1. Oczekiwanie na wydarzenie
Teraz, gdy mamy wydarzenie, jak reagują na to programy? Powinniśmy być w stanie wyrazić odpowiednikiem sleep
przez await
ing naszą imprezę. Aby lepiej zobaczyć, co się dzieje, przez połowę czasu czekamy dwa razy:
import time
async def asleep(duration: float):
"""await that ``duration`` seconds pass"""
await AsyncSleep(time.time() + duration / 2)
await AsyncSleep(time.time() + duration / 2)
Możemy bezpośrednio utworzyć instancję i uruchomić tę procedurę. Podobnie jak w przypadku generatora, użycie coroutine.send
powoduje uruchomienie programu aż do uzyskania yield
wyniku.
coroutine = asleep(100)
while True:
print(coroutine.send(None))
time.sleep(0.1)
To daje nam dwa AsyncSleep
zdarzenia, a następnie StopIteration
moment zakończenia programu. Zwróć uwagę, że jedyne opóźnienie pochodzi z time.sleep
pętli! Każdy AsyncSleep
zapisuje tylko przesunięcie od bieżącego czasu.
2.2.2. Wydarzenie + uśpienie
W tym momencie mamy do dyspozycji dwa odrębne mechanizmy:
AsyncSleep
Zdarzenia, które można wywołać z wnętrza programu
time.sleep
które mogą czekać bez wpływu na programy
Warto zauważyć, że te dwa są ortogonalne: żaden z nich nie wpływa ani nie uruchamia drugiego. W rezultacie możemy opracować własną strategię, sleep
aby sprostać opóźnieniu związanemu z plikiem AsyncSleep
.
2.3. Naiwna pętla wydarzeń
Jeśli mamy kilka programów, każdy może nam powiedzieć, kiedy chce się obudzić. Możemy wtedy zaczekać, aż pierwszy z nich będzie chciał wznowić, potem następny i tak dalej. Warto zauważyć, że w każdym punkcie zależy nam tylko na tym, który z nich będzie następny .
To sprawia, że planowanie jest proste:
- posortuj programy według pożądanego czasu przebudzenia
- wybierz pierwszą, która chce się obudzić
- poczekaj do tego momentu
- uruchom ten program
- powtórz od 1.
Banalna implementacja nie wymaga żadnych zaawansowanych koncepcji. A list
umożliwia sortowanie programów według daty. Czekanie jest normalne time.sleep
. Uruchamianie programów działa tak samo jak wcześniej z coroutine.send
.
def run(*coroutines):
"""Cooperatively run all ``coroutines`` until completion"""
# store wake-up-time and coroutines
waiting = [(0, coroutine) for coroutine in coroutines]
while waiting:
# 2. pick the first coroutine that wants to wake up
until, coroutine = waiting.pop(0)
# 3. wait until this point in time
time.sleep(max(0.0, until - time.time()))
# 4. run this coroutine
try:
command = coroutine.send(None)
except StopIteration:
continue
# 1. sort coroutines by their desired suspension
if isinstance(command, AsyncSleep):
waiting.append((command.until, coroutine))
waiting.sort(key=lambda item: item[0])
Oczywiście jest to dużo miejsca na ulepszenia. Możemy użyć sterty dla kolejki oczekiwania lub tabeli wysyłkowej dla zdarzeń. Moglibyśmy również pobrać wartości zwracane z programu StopIteration
i przypisać je do programu. Jednak podstawowa zasada pozostaje ta sama.
2.4. Czekanie w spółdzielni
AsyncSleep
Wydarzenie i run
pętla zdarzenie to wdrożenie w pełni robocze czasowe zdarzeń.
async def sleepy(identifier: str = "coroutine", count=5):
for i in range(count):
print(identifier, 'step', i + 1, 'at %.2f' % time.time())
await asleep(0.1)
run(*(sleepy("coroutine %d" % j) for j in range(5)))
To wspólnie przełącza się między każdym z pięciu programów, zawieszając każdy na 0,1 sekundy. Mimo że pętla zdarzeń jest synchroniczna, nadal wykonuje pracę w 0,5 sekundy zamiast 2,5 sekundy. Każdy program zachowuje stan i działa niezależnie.
3. Pętla zdarzeń we / wy
Pętla zdarzeń, która obsługuje, sleep
jest odpowiednia do sondowania . Jednak oczekiwanie na I / O na uchwycie pliku może być wykonane bardziej wydajnie: system operacyjny implementuje I / O i wie, które uchwyty są gotowe. W idealnym przypadku pętla zdarzeń powinna obsługiwać jawne zdarzenie „gotowe do wejścia / wyjścia”.
3.1. select
wezwanie
Python ma już interfejs do wysyłania zapytań do systemu operacyjnego w celu odczytania uchwytów we / wy. Gdy jest wywoływana z uchwytami do odczytu lub zapisu, zwraca uchwyty gotowe do odczytu lub zapisu:
readable, writeable, _ = select.select(rlist, wlist, xlist, timeout)
Na przykład możemy open
plik do zapisu i poczekać aż będzie gotowy:
write_target = open('/tmp/foo')
readable, writeable, _ = select.select([], [write_target], [])
Po wybraniu zwraca, writeable
zawiera nasz otwarty plik.
3.2. Podstawowe zdarzenie we / wy
Podobnie jak w przypadku AsyncSleep
żądania, musimy zdefiniować zdarzenie dla I / O. Zgodnie z select
logiką bazową zdarzenie musi odnosić się do czytelnego obiektu - powiedzmy do open
pliku. Ponadto przechowujemy, ile danych do odczytania.
class AsyncRead:
def __init__(self, file, amount=1):
self.file = file
self.amount = amount
self._buffer = ''
def __await__(self):
while len(self._buffer) < self.amount:
yield self
# we only get here if ``read`` should not block
self._buffer += self.file.read(1)
return self._buffer
def __repr__(self):
return '%s(file=%s, amount=%d, progress=%d)' % (
self.__class__.__name__, self.file, self.amount, len(self._buffer)
)
Podobnie jak w przypadku AsyncSleep
, przechowujemy głównie dane wymagane dla podstawowego wywołania systemowego. Tym razem __await__
możliwe jest wielokrotne wznawianie - aż do przeczytania naszego pożądanego amount
. Ponadto otrzymujemy return
wynik I / O zamiast po prostu wznawiać.
3.3. Rozszerzanie pętli zdarzeń o odczyt we / wy
Podstawą naszej pętli zdarzeń jest nadal run
zdefiniowana wcześniej. Najpierw musimy śledzić żądania odczytu. To nie jest już uporządkowany harmonogram, tylko mapujemy żądania odczytu do korektorów.
# new
waiting_read = {} # type: Dict[file, coroutine]
Ponieważ select.select
przyjmuje parametr timeout, możemy go użyć zamiast time.sleep
.
# old
time.sleep(max(0.0, until - time.time()))
# new
readable, _, _ = select.select(list(reads), [], [])
W ten sposób otrzymujemy wszystkie czytelne pliki - jeśli takie istnieją, uruchamiamy odpowiedni program. Jeśli ich nie ma, czekaliśmy wystarczająco długo na uruchomienie naszego obecnego programu.
# new - reschedule waiting coroutine, run readable coroutine
if readable:
waiting.append((until, coroutine))
waiting.sort()
coroutine = waiting_read[readable[0]]
Wreszcie, musimy faktycznie nasłuchiwać żądań odczytu.
# new
if isinstance(command, AsyncSleep):
...
elif isinstance(command, AsyncRead):
...
3.4. Składając to razem
Powyższe było trochę uproszczeniem. Musimy trochę przejść, żeby nie głodować śpiących programów, jeśli zawsze potrafimy czytać. Musimy sobie poradzić, nie mając nic do czytania lub nie czekając. Jednak wynik końcowy nadal mieści się w 30 LOC.
def run(*coroutines):
"""Cooperatively run all ``coroutines`` until completion"""
waiting_read = {} # type: Dict[file, coroutine]
waiting = [(0, coroutine) for coroutine in coroutines]
while waiting or waiting_read:
# 2. wait until the next coroutine may run or read ...
try:
until, coroutine = waiting.pop(0)
except IndexError:
until, coroutine = float('inf'), None
readable, _, _ = select.select(list(waiting_read), [], [])
else:
readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
# ... and select the appropriate one
if readable and time.time() < until:
if until and coroutine:
waiting.append((until, coroutine))
waiting.sort()
coroutine = waiting_read.pop(readable[0])
# 3. run this coroutine
try:
command = coroutine.send(None)
except StopIteration:
continue
# 1. sort coroutines by their desired suspension ...
if isinstance(command, AsyncSleep):
waiting.append((command.until, coroutine))
waiting.sort(key=lambda item: item[0])
# ... or register reads
elif isinstance(command, AsyncRead):
waiting_read[command.file] = coroutine
3.5. Współpraca we / wy
Te AsyncSleep
, AsyncRead
i run
implementacje są teraz w pełni funkcjonalny do snu i / lub przeczyta. Tak samo jak w przypadku sleepy
, możemy zdefiniować pomocnika do testowania czytania:
async def ready(path, amount=1024*32):
print('read', path, 'at', '%d' % time.time())
with open(path, 'rb') as file:
result = return await AsyncRead(file, amount)
print('done', path, 'at', '%d' % time.time())
print('got', len(result), 'B')
run(sleepy('background', 5), ready('/dev/urandom'))
Uruchamiając to, widzimy, że nasze I / O są przeplatane z oczekującym zadaniem:
id background round 1
read /dev/urandom at 1530721148
id background round 2
id background round 3
id background round 4
id background round 5
done /dev/urandom at 1530721148
got 1024 B
4. Nieblokujące we / wy
Chociaż operacje we / wy na plikach są zrozumiałe, nie są one odpowiednie dla takich bibliotek, jak asyncio
: select
wywołanie zawsze zwraca pliki i oba open
i read
mogą blokować się na czas nieokreślony . To blokuje wszystkie procedury pętli zdarzeń - co jest złe. Biblioteki lubiąaiofiles
używają wątków i synchronizacji do fałszywego nieblokującego wejścia / wyjścia i zdarzeń w pliku.
Jednak gniazda pozwalają na nieblokujące wejścia / wyjścia - a ich nieodłączne opóźnienie sprawia, że jest to znacznie bardziej krytyczne. W przypadku użycia w pętli zdarzeń oczekiwanie na dane i ponawianie próby można zawinąć bez blokowania czegokolwiek.
4.1. Nieblokujące zdarzenie we / wy
Podobnie jak w naszym przypadku AsyncRead
, możemy zdefiniować zdarzenie wstrzymania i odczytu dla gniazd. Zamiast pobierać plik, bierzemy gniazdo - które musi być nieblokujące. Ponadto nasze __await__
zastosowania socket.recv
zamiast file.read
.
class AsyncRecv:
def __init__(self, connection, amount=1, read_buffer=1024):
assert not connection.getblocking(), 'connection must be non-blocking for async recv'
self.connection = connection
self.amount = amount
self.read_buffer = read_buffer
self._buffer = b''
def __await__(self):
while len(self._buffer) < self.amount:
try:
self._buffer += self.connection.recv(self.read_buffer)
except BlockingIOError:
yield self
return self._buffer
def __repr__(self):
return '%s(file=%s, amount=%d, progress=%d)' % (
self.__class__.__name__, self.connection, self.amount, len(self._buffer)
)
W przeciwieństwie do AsyncRead
, __await__
wykonuje naprawdę nieblokujące operacje we / wy. Kiedy dane są dostępne, zawsze czyta. Gdy żadne dane nie są dostępne, zawiesza się zawsze . Oznacza to, że pętla zdarzeń jest blokowana tylko wtedy, gdy wykonujemy pożyteczną pracę.
4.2. Odblokowanie pętli zdarzeń
Jeśli chodzi o pętlę zdarzeń, nic się nie zmienia. Zdarzenie do nasłuchiwania jest nadal takie samo jak w przypadku plików - deskryptor pliku oznaczony jako gotowy select
.
# old
elif isinstance(command, AsyncRead):
waiting_read[command.file] = coroutine
# new
elif isinstance(command, AsyncRead):
waiting_read[command.file] = coroutine
elif isinstance(command, AsyncRecv):
waiting_read[command.connection] = coroutine
W tym momencie powinno być oczywiste, że AsyncRead
i AsyncRecv
są to tego samego rodzaju wydarzenia. Moglibyśmy łatwo przekształcić je w jedno zdarzenie z wymiennym komponentem I / O. W efekcie pętla zdarzeń, procedury i zdarzenia wyraźnie oddzielają program planujący, dowolny kod pośredni i rzeczywiste operacje wejścia / wyjścia.
4.3. Brzydka strona nieblokujących wejść / wyjść
Zasadniczo to, co powinieneś zrobić w tym momencie, to powtórzyć logikę read
as a recv
for AsyncRecv
. Jednak teraz jest to o wiele bardziej brzydkie - musisz obsługiwać wczesne zwroty, gdy funkcje blokują się wewnątrz jądra, ale dają ci kontrolę. Na przykład otwarcie połączenia w porównaniu z otwarciem pliku jest znacznie dłuższe:
# file
file = open(path, 'rb')
# non-blocking socket
connection = socket.socket()
connection.setblocking(False)
# open without blocking - retry on failure
try:
connection.connect((url, port))
except BlockingIOError:
pass
Krótko mówiąc, pozostaje kilkadziesiąt wierszy obsługi wyjątków. W tym momencie zdarzenia i pętla zdarzeń już działają.
id background round 1
read localhost:25000 at 1530783569
read /dev/urandom at 1530783569
done localhost:25000 at 1530783569 got 32768 B
id background round 2
id background round 3
id background round 4
done /dev/urandom at 1530783569 got 4096 B
id background round 5
Uzupełnienie
Przykładowy kod na github
BaseEventLoop
zaimplementowano CPython : github.com/python/cpython/blob/ ...