Pula procesów Pythona nie jest demoniczna?


102

Czy byłoby możliwe utworzenie puli Pythona, która nie jest demonem? Chcę, aby pula mogła wywołać funkcję zawierającą inną pulę w środku.

Chcę tego, ponieważ procesy demona nie mogą tworzyć procesu. W szczególności spowoduje to błąd:

AssertionError: daemonic processes are not allowed to have children

Na przykład rozważmy scenariusz, w którym function_ajest uruchomiona pula, function_bktóra ma uruchomioną pulę function_c. Ten łańcuch funkcji nie powiedzie się, ponieważ function_bjest uruchamiany w procesie demona, a procesy demona nie mogą tworzyć procesów.


AFAIK, nie, nie jest możliwe, że wszyscy pracownicy w puli są zdemonizowani i nie jest możliwe wstrzyknięcie zależności , przy okazji nie rozumiem drugiej części twojego pytania I want a pool to be able to call a function that has another pool insidei jak to wpływa na fakt, że pracownicy są zdemonizowani.
mouad

4
Ponieważ jeśli funkcja a ma pulę, która uruchamia funkcję b, która ma pulę, która uruchamia funkcję c, w b występuje problem, że jest ona uruchamiana w procesie demona, a procesy demona nie mogą tworzyć procesów. AssertionError: daemonic processes are not allowed to have children
Maksymalnie

Odpowiedzi:


122

multiprocessing.pool.PoolKlasa tworzy procesy pracownik w jego __init__metodzie, sprawia im demoniczny i uruchamia je, a to nie jest możliwe, aby ponownie ustawić swój daemonatrybut False, zanim zostaną uruchomione (i potem nie wolno już). Możesz jednak utworzyć własną podklasę multiprocesing.pool.Pool( multiprocessing.Pooljest to tylko funkcja opakowująca) i zastąpić własną multiprocessing.Processpodklasę, która jest zawsze nie-demoniczna, do wykorzystania w procesach roboczych.

Oto pełny przykład, jak to zrobić. Ważnymi częściami są dwie klasy NoDaemonProcessi MyPoolna górze oraz do wywołania pool.close()i pool.join()na MyPoolkońcu Twojej instancji.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()

1
Właśnie przetestowałem mój kod ponownie w Pythonie 2.7 / 3.2 (po naprawieniu linii „print”) na Linuksie i Pythonie 2.6 / 2.7 / 3.2 OS X. Linux i Python 2.7 / 3.2 na OS X działają dobrze, ale kod rzeczywiście się zawiesza Python 2.6 na OS X (Lion). Wygląda na to, że jest to błąd w module wieloprocesorowym, który został naprawiony, ale tak naprawdę nie sprawdziłem modułu śledzenia błędów.
Chris Arndt,

1
Dzięki! W multiprocessing.freeze_support()
systemie

2
Dobra robota. Jeśli ktoś ma wyciek pamięci za pomocą tego, spróbuj użyć „z zamknięciem (MyPool (procesy = num_cpu)) jako puli:”, aby prawidłowo pozbyć się puli
Chris Lucian

32
Jakie są wady używania MyPoolzamiast domyślnego Pool? Innymi słowy, jakie koszty ponoszę w zamian za elastyczność uruchamiania procesów potomnych? (Gdyby nie było kosztów, przypuszczalnie standard Poolużywałby procesów innych niż demoniczne).
maks.

4
@machen Tak, niestety to prawda. W Pythonie 3.6 Poolklasa została gruntownie zmieniona, więc Processnie jest już prostym atrybutem, ale metodą, która zwraca instancję procesu, którą pobiera z kontekstu . Próbowałem nadpisać tę metodę, aby zwrócić NoDaemonPoolwystąpienie, ale powoduje to wyjątek, AssertionError: daemonic processes are not allowed to have childrengdy używana jest pula.
Chris Arndt

29

Musiałem zastosować nie-demoniczną pulę w Pythonie 3.7 i ostatecznie dostosowałem kod zamieszczony w zaakceptowanej odpowiedzi. Poniżej znajduje się fragment, który tworzy nie-demoniczną pulę:

import multiprocessing.pool

class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass


class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class NestablePool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(NestablePool, self).__init__(*args, **kwargs)

Ponieważ obecna implementacja multiprocessingzostała gruntownie zreformowana, aby była oparta na kontekstach, musimy zapewnić NoDaemonContextklasę, która ma NoDaemonProcessatrybut as. NestablePoolużyje wtedy tego kontekstu zamiast domyślnego.

To powiedziawszy, powinienem ostrzec, że istnieją co najmniej dwa zastrzeżenia dotyczące tego podejścia:

  1. Nadal zależy od szczegółów implementacji multiprocessingpakietu i dlatego może się zepsuć w dowolnym momencie.
  2. Istnieją ważne powody, dla multiprocessingktórych tak trudno jest używać procesów innych niż demoniczne, z których wiele zostało wyjaśnionych tutaj . Moim zdaniem najbardziej przekonujące jest:

Jeśli chodzi o zezwolenie wątkom podrzędnym na odradzanie się własnych elementów potomnych, przy użyciu podprocesu istnieje ryzyko stworzenia małej armii „wnuków” zombie, jeśli wątek nadrzędny lub podrzędny zostanie zakończony, zanim podproces zakończy się i powróci.


1
Jeśli chodzi o zastrzeżenie: Mój przypadek użycia jest parallelising zadania, ale wnuki zwracają informacje do swoich rodziców, że w informacji kolej powrotną do swoich rodziców po jakiejś wymaganego przetwarzania lokalnego. W konsekwencji każdy poziom / gałąź ma jawne oczekiwanie na wszystkie swoje liście. Czy zastrzeżenie nadal ma zastosowanie, jeśli jawnie musisz czekać na zakończenie uruchomionych procesów?
A_A

Czy zechciałbyś dodać, jak używać tego zamiast multiprocessing.pool?
Kontrolowane radiowo

„Możesz teraz używać zamiennie multiprocessing.Pool i NestablePool”.
Kontrolowane radiowo

22

Wieloprocesorowe moduł ma piękny interfejs do korzystania z basenów z procesów lub wątków. W zależności od twojego obecnego przypadku użycia, możesz rozważyć użycie multiprocessing.pool.ThreadPooldla swojej puli zewnętrznej, co spowoduje powstanie wątków (które pozwolą na uruchamianie procesów od wewnątrz), w przeciwieństwie do procesów.

Może to być ograniczone przez GIL, ale w moim konkretnym przypadku (testowałem oba) , czas uruchamiania procesów z zewnątrz, jakie zostały tutajPool utworzone , znacznie przewyższał rozwiązanie ThreadPool.


To naprawdę łatwe do wymiany Processesdla Threads. Przeczytaj więcej o tym, jak korzystać z ThreadPoolrozwiązania tutaj lub tutaj .


Dzięki - to mi bardzo pomogło - świetne wykorzystanie wątków tutaj (do tworzenia procesów, które faktycznie działają dobrze)
trance_dude

1
Dla osób szukających praktycznego rozwiązania, które prawdopodobnie pasowałoby do ich sytuacji, to jest właśnie to.
abanana

6

W niektórych wersjach Pythona zastępujących standardowe Pool zwyczaju może podnieść błąd: AssertionError: group argument must be None for now.

Tutaj znalazłem rozwiązanie, które może pomóc:

class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, val):
        pass


class NoDaemonProcessPool(multiprocessing.pool.Pool):

    def Process(self, *args, **kwds):
        proc = super(NoDaemonProcessPool, self).Process(*args, **kwds)
        proc.__class__ = NoDaemonProcess

        return proc

4

concurrent.futures.ProcessPoolExecutornie ma tego ograniczenia. Może mieć zagnieżdżoną pulę procesów bez żadnego problemu:

from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time

def pid():
    return current_process().pid

def _square(i):  # Runs in inner_pool
    square = i ** 2
    time.sleep(i / 10)
    print(f'{pid()=} {i=} {square=}')
    return square

def _sum_squares(i, j):  # Runs in outer_pool
    with Pool(max_workers=2) as inner_pool:
        squares = inner_pool.map(_square, (i, j))
    sum_squares = sum(squares)
    time.sleep(sum_squares ** .5)
    print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
    return sum_squares

def main():
    with Pool(max_workers=3) as outer_pool:
        for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
            print(f'{pid()=} {sum_squares=}')

if __name__ == "__main__":
    main()

Powyższy kod demonstracyjny został przetestowany w Pythonie 3.8.

Ograniczeniem jest ProcessPoolExecutorjednak to, że nie ma maxtasksperchild. Jeśli tego potrzebujesz, zamiast tego rozważ odpowiedź Massimiliano .

Kredyt: odpowiedź jfs


1
Jest to obecnie zdecydowanie najlepsze rozwiązanie, ponieważ wymaga minimalnych zmian.
DreamFlasher

1
działa świetnie! ... na marginesie użycie dziecka - multiprocessing.Poolwewnątrz a ProcessPoolExecutor.Pooljest również możliwe!
raphael

4

Problem, który napotkałem, polegał na próbie zaimportowania globali między modułami, powodując, że wiersz ProcessPool () był wielokrotnie oceniany.

globals.py

from processing             import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading       import ThreadPool

class SingletonMeta(type):
    def __new__(cls, name, bases, dict):
        dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
        return super(SingletonMeta, cls).__new__(cls, name, bases, dict)

    def __init__(cls, name, bases, dict):
        super(SingletonMeta, cls).__init__(name, bases, dict)
        cls.instance = None

    def __call__(cls,*args,**kw):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
        return cls.instance

    def __deepcopy__(self, item):
        return item.__class__.instance

class Globals(object):
    __metaclass__ = SingletonMeta
    """     
    This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children
     
    The root cause is that importing this file from different modules causes this file to be reevalutated each time, 
    thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug    
    """
    def __init__(self):
        print "%s::__init__()" % (self.__class__.__name__)
        self.shared_manager      = Manager()
        self.shared_process_pool = ProcessPool()
        self.shared_thread_pool  = ThreadPool()
        self.shared_lock         = Lock()        # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin

Następnie bezpiecznie zaimportuj z innego miejsca w kodzie

from globals import Globals
Globals().shared_manager      
Globals().shared_process_pool
Globals().shared_thread_pool  
Globals().shared_lock         

Napisałem tutaj bardziej rozszerzoną klasę opakowującą pathos.multiprocessing:

Na marginesie, jeśli Twój przypadek użycia wymaga tylko asynchronicznej mapy wieloprocesowej jako optymalizacji wydajności, to joblib będzie zarządzać wszystkimi pulami procesów w tle i zezwoli na tę bardzo prostą składnię:

squares = Parallel(-1)( delayed(lambda num: num**2)(x) for x in range(100) )

3

Widziałem ludzi zajmujących się tym problemem, używając celeryrozwidlenia multiprocessingzwanego bilardem (wieloprocesorowe rozszerzenia puli), które pozwala procesom demonicznym na tworzenie dzieci. Aby obejść ten problem, wystarczy wymienić multiprocessingmoduł na:

import billiard as multiprocessing

0

Stanowi to obejście problemu, gdy błąd jest pozornie fałszywie dodatni. Jak również zauważył James , może się to zdarzyć w przypadku niezamierzonego importu z procesu demonicznego.

Na przykład, jeśli masz następujący prosty kod, WORKER_POOLmożna go nieumyślnie zaimportować od pracownika, co prowadzi do błędu.

import multiprocessing

WORKER_POOL = multiprocessing.Pool()

Prostym, ale niezawodnym sposobem obejścia tego problemu jest:

import multiprocessing
import multiprocessing.pool


class MyClass:

    @property
    def worker_pool(self) -> multiprocessing.pool.Pool:
        # Ref: https://stackoverflow.com/a/63984747/
        try:
            return self._worker_pool  # type: ignore
        except AttributeError:
            # pylint: disable=protected-access
            self.__class__._worker_pool = multiprocessing.Pool()  # type: ignore
            return self.__class__._worker_pool  # type: ignore
            # pylint: enable=protected-access

W powyższym obejściu MyClass.worker_poolmożna użyć bez błędu. Jeśli uważasz, że to podejście można ulepszyć, daj mi znać.

Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.