Udostępnianie kolejki wyników kilku procesom


95

Dokumentacja multiprocessingmodułu pokazuje, jak przekazać kolejkę do procesu uruchomionego multiprocessing.Process. Ale jak mogę współużytkować kolejkę z asynchronicznymi procesami roboczymi, od których rozpoczęto apply_async? Nie potrzebuję dynamicznego łączenia ani niczego innego, tylko sposób, aby pracownicy (wielokrotnie) zgłaszali swoje wyniki z powrotem do bazy.

import multiprocessing
def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    q = multiprocessing.Queue()
    workers = pool.apply_async(worker, (33, q))

To nie powiedzie się z: RuntimeError: Queue objects should only be shared between processes through inheritance. Rozumiem, co to oznacza, i rozumiem rady dotyczące dziedziczenia zamiast wytrawiania / usuwania wytrawiania (i wszystkie specjalne ograniczenia systemu Windows). Ale jak nie mogę przekazać kolejkę w sposób, który działa? Nie mogę znaleźć przykładu, a próbowałem kilku alternatyw, które zawiodły na różne sposoby. Prosimy o pomoc?

Odpowiedzi:


137

Spróbuj użyć multiprocessing.Manager, aby zarządzać kolejką, a także udostępniać ją różnym pracownikom.

import multiprocessing
def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    m = multiprocessing.Manager()
    q = m.Queue()
    workers = pool.apply_async(worker, (33, q))

Zrobiło to, dzięki! Wystąpił niepowiązany problem z wywołaniem asynchronicznym w moim oryginalnym kodzie, więc skopiowałem również poprawkę do Twojej odpowiedzi.
Alexis

17
Jakieś wyjaśnienie, dlaczego queue.Queue()nie jest do tego odpowiednie?
mrgloom

1
@mrgloom: queue.Queuezostał zbudowany do obsługi wątków, przy użyciu blokad w pamięci. W środowisku wieloprocesowym każdy podproces otrzymywałby własną kopię queue.Queue()instancji we własnej przestrzeni pamięci, ponieważ podprocesy nie współużytkują pamięci (w większości).
LeoRochael

1
@alexis Jak pobrać elementy z Managera (). Queue () po tym, jak wielu pracowników wstawi do niego dane?
MSS


14

multiprocessing.Poolma już udostępnioną kolejkę wyników, nie ma potrzeby dodatkowego angażowania pliku Manager.Queue. Manager.Queuejest queue.Queue(kolejką wielowątkową) pod maską, zlokalizowaną na oddzielnym procesie serwerowym i ujawnioną przez serwery proxy. To dodaje dodatkowe obciążenie w porównaniu z kolejką wewnętrzną puli. W przeciwieństwie do natywnej obsługi wyników puli, wyniki Manager.Queuerównież nie mają gwarancji uporządkowania.

Procesy robocze nie są uruchamiane .apply_async(), to już się dzieje podczas tworzenia instancji Pool. Co się zaczęło, kiedy nazywają pool.apply_async()to nowy „praca”. Procesy multiprocessing.pool.workerrobocze puli uruchamiają funkcję pod maską. Ta funkcja zajmuje się przetwarzaniem nowych "zadań" przenoszonych do wewnętrznej puli Pool._inqueuei wysyłaniem wyników z powrotem do rodzica przez Pool._outqueue. Twój określony funczostanie wykonany w ciągu multiprocessing.pool.worker. funcmusi tylko returncoś, a wynik zostanie automatycznie odesłany do rodzica.

.apply_async() natychmiast (asynchronicznie) zwraca AsyncResultobiekt (alias dla ApplyResult). Musisz wywołać .get()(blokuje) ten obiekt, aby otrzymać rzeczywisty wynik. Inną opcją byłoby zarejestrowanie funkcji zwrotnej , która jest uruchamiana, gdy tylko wynik będzie gotowy.

from multiprocessing import Pool

def busy_foo(i):
    """Dummy function simulating cpu-bound work."""
    for _ in range(int(10e6)):  # do stuff
        pass
    return i

if __name__ == '__main__':

    with Pool(4) as pool:
        print(pool._outqueue)  # DEMO
        results = [pool.apply_async(busy_foo, (i,)) for i in range(10)]
        # `.apply_async()` immediately returns AsyncResult (ApplyResult) object
        print(results[0])  # DEMO
        results = [res.get() for res in results]
        print(f'result: {results}')       

Przykładowe dane wyjściowe:

<multiprocessing.queues.SimpleQueue object at 0x7fa124fd67f0>
<multiprocessing.pool.ApplyResult object at 0x7fa12586da20>
result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Uwaga: Określenie timeoutparametru -parametru dla .get()nie zatrzyma rzeczywistego przetwarzania zadania w ramach procesu roboczego, a jedynie odblokuje oczekującego rodzica, podnosząc multiprocessing.TimeoutError.


Ciekawe, spróbuję tego przy pierwszej okazji. Z pewnością nie działało w ten sposób w 2012 roku.
alexis

@alexis Python 2.7 (2010) w tym przypadku brakuje tylko menedżera kontekstu i error_callbackparametru -parametru dla apply_async, więc od tego czasu niewiele się zmieniło.
Darkonaut

Zauważyłem, że funkcja zwrotna jest najbardziej użyteczna, zwłaszcza w połączeniu z funkcją częściową, aby umożliwić używanie zwykłej listy do zbierania wyników asynchronicznych, jak opisano tutaj; gist.github.com/Glench/5789879
user5359531
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.