Wieloprocesorowość: jak używać Pool.map na funkcji zdefiniowanej w klasie?


179

Kiedy biegnę coś takiego:

from multiprocessing import Pool

p = Pool(5)
def f(x):
     return x*x

p.map(f, [1,2,3])

to działa dobrze. Jednak umieszczając to jako funkcję klasy:

class calculate(object):
    def run(self):
        def f(x):
            return x*x

        p = Pool()
        return p.map(f, [1,2,3])

cl = calculate()
print cl.run()

Daje mi następujący błąd:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/sw/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Widziałem post Alexa Martelliego dotyczący tego samego problemu, ale nie był wystarczająco jasny.


1
„to jako funkcja klasy”? Czy możesz opublikować kod, który faktycznie wyświetla rzeczywisty błąd. Bez właściwego kodu możemy tylko zgadywać, co robisz źle.
S.Lott,

Ogólnie rzecz biorąc, istnieją moduły do ​​wytrawiania o większej mocy niż standardowy moduł Pythona (jak wspomniany w tej odpowiedzi moduł picloud ).
klaus se

1
Miałem podobny problem z zamknięciami w IPython.Parallelśrodku, ale tam można było obejść ten problem, popychając obiekty do węzłów. Obejście tego problemu z przetwarzaniem wieloprocesowym wydaje się dość denerwujące.
Alex S

Tutaj calculatejest picklable, a więc wydaje się, że może to zostać rozwiązane przez: 1) tworzenia obiektów funkcji z konstruktora kopie nad calculateprzykład, a następnie 2) przechodzącego wystąpienie tego obiektu funkcyjnej, Pooljest mapsposobem. Nie?
rd11

1
@math Nie wierzę, że żadna z „ostatnich zmian” w Pythonie będzie pomocna. Niektóre ograniczenia multiprocessingmodułu wynikają z jego celu, jakim jest implementacja międzyplatformowa, oraz braku fork(2)podobnego wywołania systemowego w systemie Windows. Jeśli nie zależy Ci na obsłudze Win32, może istnieć prostsze obejście oparte na procesach. Lub jeśli jesteś przygotowany do korzystania wątki zamiast procesów, można zastąpić from multiprocessing import Poolz from multiprocessing.pool import ThreadPool as Pool.
Aya

Odpowiedzi:


69

Denerwowały mnie również ograniczenia dotyczące tego, jakie funkcje może akceptować pool.map. Napisałem następujące informacje, aby to obejść. Wydaje się, że działa, nawet przy rekurencyjnym użyciu parmap.

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f, X):
    pipe = [Pipe() for x in X]
    proc = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in izip(X, pipe)]
    [p.start() for p in proc]
    [p.join() for p in proc]
    return [p.recv() for (p, c) in pipe]

if __name__ == '__main__':
    print parmap(lambda x: x**x, range(1, 5))

1
Bardzo mi się to udało, dziękuję. Znalazłem jedną słabość: próbowałem użyć parmap w niektórych funkcjach, które przekazywały defaultdict i ponownie otrzymałem PicklingError. Nie znalazłem rozwiązania tego problemu, po prostu przerobiłem kod, aby nie używać domyślnego słowa.
bez

2
To nie działa w Pythonie 2.7.2 (domyślnie, 12 czerwca 2011, 15:08:59) [MSC v.1500 32 bit (Intel)] na win32
ubershmekel

3
To działa w Pythonie 2.7.3 1 sierpnia 2012, 05:14:39. To nie działa w przypadku gigantycznych iterable -> powoduje błąd OSError: [Errno 24] Zbyt wiele otwartych plików ze względu na liczbę otwieranych potoków.
Eiyrioü von Kauyf

To rozwiązanie powoduje utworzenie procesu dla każdego elementu roboczego. Poniższe rozwiązanie „klaus se” jest bardziej wydajne.
ypnos

85

Nie mogłem użyć dotychczas opublikowanych kodów, ponieważ kody używające "multiprocessing.Pool" nie działają z wyrażeniami lambda, a kody nie używające "multiprocessing.Pool" generują tyle procesów, ile jest elementów pracy.

Dostosowałem kod tak, aby tworzył predefiniowaną liczbę pracowników i iterował listę wejściową tylko wtedy, gdy istnieje bezczynny pracownik. Włączyłem również tryb "demona" dla pracowników. Ctrl-c działa zgodnie z oczekiwaniami.

import multiprocessing


def fun(f, q_in, q_out):
    while True:
        i, x = q_in.get()
        if i is None:
            break
        q_out.put((i, f(x)))


def parmap(f, X, nprocs=multiprocessing.cpu_count()):
    q_in = multiprocessing.Queue(1)
    q_out = multiprocessing.Queue()

    proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
            for _ in range(nprocs)]
    for p in proc:
        p.daemon = True
        p.start()

    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [q_in.put((None, None)) for _ in range(nprocs)]
    res = [q_out.get() for _ in range(len(sent))]

    [p.join() for p in proc]

    return [x for i, x in sorted(res)]


if __name__ == '__main__':
    print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))

2
Jak sprawić, by pasek postępu poprawnie działał z tą parmapfunkcją?
Shockburner,

2
Pytanie - skorzystałem z tego rozwiązania, ale zauważyłem, że uruchomione przeze mnie procesy Pythona pozostają aktywne w pamięci. Jakieś szybkie przemyślenie, jak zabić tych, gdy twoja parmapa wyjdzie?
CompEcon

1
@ klaus-se Wiem, że odradza się nam tylko podziękowanie w komentarzach, ale Twoja odpowiedź jest dla mnie zbyt cenna, nie mogłem się oprzeć. Chciałbym móc dać ci więcej niż tylko jedną reputację ...
deshtop

2
@greole przekazane (None, None)jako ostatni element wskazuje fun, że osiągnął koniec sekwencji elementów dla każdego procesu.
aganders3

4
@deshtop: możesz z nagrodą, jeśli sam masz wystarczającą reputację :-)
Mark

57

Wieloprocesowe przetwarzanie i wytrawianie jest zepsute i ograniczone, chyba że wyskoczysz poza standardową bibliotekę.

Jeśli używasz rozwidlenia multiprocessingwywołanego pathos.multiprocesssing, możesz bezpośrednio używać klas i metod klas w mapfunkcjach wieloprocesorowych . Dzieje się tak, ponieważ dilljest używany zamiast picklelub cPicklei dillmoże serializować prawie wszystko w Pythonie.

pathos.multiprocessingzapewnia również asynchroniczną funkcję mapy… i może mapdziałać z wieloma argumentami (np. map(math.pow, [1,2,3], [4,5,6]))

Zobacz dyskusje: Co może zrobić wieloprocesorowość i koperek?

i: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization

Obsługuje nawet kod, który napisałeś na początku, bez modyfikacji i od interpretera. Po co cokolwiek, co jest bardziej delikatne i specyficzne dla pojedynczego przypadku?

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> class calculate(object):
...  def run(self):
...   def f(x):
...    return x*x
...   p = Pool()
...   return p.map(f, [1,2,3])
... 
>>> cl = calculate()
>>> print cl.run()
[1, 4, 9]

Pobierz kod tutaj: https://github.com/uqfoundation/pathos

I żeby pokazać, co potrafi:

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> 
>>> p = Pool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> res = p.amap(t.plus, x, y)
>>> res.get()
[4, 6, 8, 10]

1
pathos.multiprocessing ma również funkcję map asynchronous ( amap), która umożliwia korzystanie z pasków postępu i innego programowania asynchronicznego.
Mike McKerns

Podoba mi się pathos.multiprocessing, który może służyć prawie do bezpośredniego zastąpienia mapy nierównoległej, jednocześnie ciesząc się przetwarzaniem wieloprocesowym. Mam proste opakowanie pathos.multiprocessing.map, które zapewnia większą wydajność pamięci podczas przetwarzania dużej struktury danych tylko do odczytu na wielu rdzeniach. Zobacz to repozytorium git .
Fashandge

Wydaje się interesujące, ale się nie instaluje. Oto wiadomość, którą przekazuje pip:Could not find a version that satisfies the requirement pp==1.5.7-pathos (from pathos)
xApple

1
Tak. Od jakiegoś czasu nie publikowałem, ponieważ dzieliłem funkcjonalność na osobne pakiety, a także konwertowałem na kod zgodny z 2/3. Wiele z powyższych zostało zmodularyzowanych, multiprocessco jest kompatybilne w 2/3. Zobacz stackoverflow.com/questions/27873093/… i pypi.python.org/pypi/multiprocess .
Mike McKerns

3
@xApple: Jako kontynuacja, pathosdoczekał się nowej stabilnej wersji i jest również kompatybilny z 2.x i 3.x.
Mike McKerns

40

O ile wiem, obecnie nie ma rozwiązania twojego problemu: funkcja, którą dajesz, map()musi być dostępna poprzez import twojego modułu. Dlatego działa kod Roberta: funkcję f()można uzyskać, importując następujący kod:

def f(x):
    return x*x

class Calculate(object):
    def run(self):
        p = Pool()
        return p.map(f, [1,2,3])

if __name__ == '__main__':
    cl = Calculate()
    print cl.run()

Właściwie dodałem sekcję „main”, ponieważ jest ona zgodna z zaleceniami dla platformy Windows („Upewnij się, że moduł główny można bezpiecznie zaimportować przez nowy interpreter Pythona bez powodowania niezamierzonych efektów ubocznych”).

Dodałem również wielką literę przed Calculate, aby zastosować się do PEP 8 . :)


18

Rozwiązanie mrule jest poprawne, ale zawiera błąd: jeśli dziecko odeśle dużą ilość danych, może wypełnić bufor potoku, blokując bufor dziecka pipe.send(), podczas gdy rodzic czeka, aż dziecko wyjdzie pipe.join(). Rozwiązaniem jest odczytanie danych dziecka przed join()nim. Ponadto dziecko powinno zamknąć koniec rury rodzica, aby zapobiec zakleszczeniu. Poniższy kod rozwiązuje ten problem. Należy również pamiętać, że parmaptworzy to jeden proces na element w X. Bardziej zaawansowanym rozwiązaniem jest multiprocessing.cpu_count()podzielenie Xna kilka fragmentów, a następnie scalenie wyników przed zwróceniem. Zostawiam to czytelnikowi jako ćwiczenie, aby nie zepsuć zwięzłości ładnej odpowiedzi muła. ;)

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(ppipe, cpipe,x):
        ppipe.close()
        cpipe.send(f(x))
        cpipe.close()
    return fun

def parmap(f,X):
    pipe=[Pipe() for x in X]
    proc=[Process(target=spawn(f),args=(p,c,x)) for x,(p,c) in izip(X,pipe)]
    [p.start() for p in proc]
    ret = [p.recv() for (p,c) in pipe]
    [p.join() for p in proc]
    return ret

if __name__ == '__main__':
    print parmap(lambda x:x**x,range(1,5))

Jak dobierasz liczbę procesów?
patapouf_ai

Jednak umiera dość szybko z powodu błędu OSError: [Errno 24] Too many open files. Myślę, że muszą istnieć jakieś ograniczenia liczby procesów, aby działał poprawnie ...
patapouf_ai

13

Ja też się z tym zmagałem. Miałem funkcje jako członkowie danych klasy, jako uproszczony przykład:

from multiprocessing import Pool
import itertools
pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # Needed to do something like this (the following line won't work)
        return pool.map(self.f,list1,list2)  

Musiałem użyć funkcji self.f w wywołaniu Pool.map () z tej samej klasy, a self.f nie przyjęło krotki jako argumentu. Ponieważ ta funkcja została osadzona w klasie, nie było dla mnie jasne, jak napisać typ opakowania, które sugerowały inne odpowiedzi.

Rozwiązałem ten problem, używając innego opakowania, które przyjmuje krotkę / listę, gdzie pierwszym elementem jest funkcja, a pozostałe elementy to argumenty tej funkcji, zwanej eval_func_tuple (f_args). Używając tego, problematyczna linia może zostać zastąpiona przez return pool.map (eval_func_tuple, itertools.izip (itertools.repeat (self.f), list1, list2)). Oto pełny kod:

Plik: util.py

def add(a, b): return a+b

def eval_func_tuple(f_args):
    """Takes a tuple of a function and args, evaluates and returns result"""
    return f_args[0](*f_args[1:])  

Plik: main.py

from multiprocessing import Pool
import itertools
import util  

pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # The following line will now work
        return pool.map(util.eval_func_tuple, 
            itertools.izip(itertools.repeat(self.f), list1, list2)) 

if __name__ == '__main__':
    myExample = Example(util.add)
    list1 = [1, 2, 3]
    list2 = [10, 20, 30]
    print myExample.add_lists(list1, list2)  

Uruchomienie main.py da [11, 22, 33]. Możesz to poprawić, na przykład eval_func_tuple można również zmodyfikować tak, aby przyjmował argumenty słów kluczowych.

Z drugiej strony, w innych odpowiedziach, funkcja „parmap” może być bardziej wydajna w przypadku większej liczby Procesów niż liczba dostępnych procesorów. Kopiuję edytowaną wersję poniżej. To jest mój pierwszy post i nie byłem pewien, czy powinienem bezpośrednio edytować oryginalną odpowiedź. Zmieniłem też nazwy niektórych zmiennych.

from multiprocessing import Process, Pipe  
from itertools import izip  

def spawn(f):  
    def fun(pipe,x):  
        pipe.send(f(x))  
        pipe.close()  
    return fun  

def parmap(f,X):  
    pipe=[Pipe() for x in X]  
    processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]  
    numProcesses = len(processes)  
    processNum = 0  
    outputList = []  
    while processNum < numProcesses:  
        endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses)  
        for proc in processes[processNum:endProcessNum]:  
            proc.start()  
        for proc in processes[processNum:endProcessNum]:  
            proc.join()  
        for proc,c in pipe[processNum:endProcessNum]:  
            outputList.append(proc.recv())  
        processNum = endProcessNum  
    return outputList    

if __name__ == '__main__':  
    print parmap(lambda x:x**x,range(1,5))         

8

Wziąłem odpowiedź klausa se i aganders3 i stworzyłem udokumentowany moduł, który jest bardziej czytelny i przechowuje w jednym pliku. Możesz go po prostu dodać do swojego projektu. Ma nawet opcjonalny pasek postępu!

"""
The ``processes`` module provides some convenience functions
for using parallel processes in python.

Adapted from http://stackoverflow.com/a/16071616/287297

Example usage:

    print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True)

Comments:

"It spawns a predefined amount of workers and only iterates through the input list
 if there exists an idle worker. I also enabled the "daemon" mode for the workers so
 that KeyboardInterupt works as expected."

Pitfalls: all the stdouts are sent back to the parent stdout, intertwined.

Alternatively, use this fork of multiprocessing: 
https://github.com/uqfoundation/multiprocess
"""

# Modules #
import multiprocessing
from tqdm import tqdm

################################################################################
def apply_function(func_to_apply, queue_in, queue_out):
    while not queue_in.empty():
        num, obj = queue_in.get()
        queue_out.put((num, func_to_apply(obj)))

################################################################################
def prll_map(func_to_apply, items, cpus=None, verbose=False):
    # Number of processes to use #
    if cpus is None: cpus = min(multiprocessing.cpu_count(), 32)
    # Create queues #
    q_in  = multiprocessing.Queue()
    q_out = multiprocessing.Queue()
    # Process list #
    new_proc  = lambda t,a: multiprocessing.Process(target=t, args=a)
    processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)]
    # Put all the items (objects) in the queue #
    sent = [q_in.put((i, x)) for i, x in enumerate(items)]
    # Start them all #
    for proc in processes:
        proc.daemon = True
        proc.start()
    # Display progress bar or not #
    if verbose:
        results = [q_out.get() for x in tqdm(range(len(sent)))]
    else:
        results = [q_out.get() for x in range(len(sent))]
    # Wait for them to finish #
    for proc in processes: proc.join()
    # Return results #
    return [x for i, x in sorted(results)]

################################################################################
def test():
    def slow_square(x):
        import time
        time.sleep(2)
        return x**2
    objs    = range(20)
    squares = prll_map(slow_square, objs, 4, verbose=True)
    print "Result: %s" % squares

EDYCJA : Dodano sugestię @ alexander-mcfarlane i funkcję testową


jeden problem z paskiem postępu ... Pasek mierzy tylko, jak nieefektywnie obciążenie zostało podzielone między procesory. Jeśli obciążenie jest idealnie podzielone, wszystkie procesory będą join()w tym samym czasie, a na wyświetlaczu pojawi się błysk 100%zakończenia tqdm. Jedyny przypadek, w którym będzie to przydatne, to sytuacja, gdy każdy procesor ma tendencyjne obciążenie pracą
Alexander McFarlane,

1
przesuń się, tqdm()aby zawinąć linię: result = [q_out.get() for _ in tqdm(sent)]i działa o wiele lepiej - wielki wysiłek, ale naprawdę to doceniam, więc +1
Alexander McFarlane

Dzięki za radę, spróbuję i zaktualizuję odpowiedź!
xApple,

Odpowiedź jest aktualizowana, a pasek postępu działa znacznie lepiej!
xApple

8

Wiem, że zadawano to ponad 6 lat temu, ale chciałem tylko dodać moje rozwiązanie, ponieważ niektóre z powyższych sugestii wydają się strasznie skomplikowane, ale moje rozwiązanie było w rzeczywistości bardzo proste.

Wszystko, co musiałem zrobić, to zawinąć wywołanie pool.map () do funkcji pomocniczej. Przekazanie obiektu klasy wraz z argumentami dla metody jako krotki, która wyglądała trochę tak.

def run_in_parallel(args):
    return args[0].method(args[1])

myclass = MyClass()
method_args = [1,2,3,4,5,6]
args_map = [ (myclass, arg) for arg in method_args ]
pool = Pool()
pool.map(run_in_parallel, args_map)

7

Funkcje zdefiniowane w klasach (nawet w funkcjach wewnątrz klas) tak naprawdę nie trawią. Jednak to działa:

def f(x):
    return x*x

class calculate(object):
    def run(self):
        p = Pool()
    return p.map(f, [1,2,3])

cl = calculate()
print cl.run()

15
dzięki, ale uważam, że definiowanie funkcji poza klasą jest trochę brudne. Klasa powinna zawierać wszystko, czego potrzebuje do wykonania danego zadania.
Mermoz

3
@Memoz: "Klasa powinna zawierać wszystko, czego potrzebuje" Naprawdę? Nie mogę znaleźć na to wielu przykładów. Większość klas zależy od innych klas lub funkcji. Po co nazywać zależność klasową „brudną”? Co jest złego w zależności?
S.Lott

Cóż, funkcja nie powinna modyfikować istniejących danych klasy - ponieważ modyfikowałaby wersję w innym procesie - więc mogłaby to być metoda statyczna. Możesz posortować metodę statyczną: stackoverflow.com/questions/1914261/… Lub, w przypadku czegoś tak trywialnego, możesz użyć lambdy.
robert

6

Wiem, że to pytanie padło 8 lat i 10 miesięcy temu, ale chcę Wam przedstawić moje rozwiązanie:

from multiprocessing import Pool

class Test:

    def __init__(self):
        self.main()

    @staticmethod
    def methodForMultiprocessing(x):
        print(x*x)

    def main(self):
        if __name__ == "__main__":
            p = Pool()
            p.map(Test.methodForMultiprocessing, list(range(1, 11)))
            p.close()

TestObject = Test()

Musisz tylko przekształcić swoją klasę w metodę statyczną. Ale jest to również możliwe z metodą klasową:

from multiprocessing import Pool

class Test:

    def __init__(self):
        self.main()

    @classmethod
    def methodForMultiprocessing(cls, x):
        print(x*x)

    def main(self):
        if __name__ == "__main__":
            p = Pool()
            p.map(Test.methodForMultiprocessing, list(range(1, 11)))
            p.close()

TestObject = Test()

Przetestowano w Pythonie 3.7.3


3

Zmodyfikowałem metodę klaus se, ponieważ podczas pracy z małymi listami zawieszała się, gdy liczba pozycji wynosiła ~ 1000 lub więcej. Zamiast przesuwać zadania pojedynczo z Nonewarunkiem zatrzymania, ładuję kolejkę wejściową naraz i po prostu pozwalam procesom miażdżyć ją, aż będzie pusta.

from multiprocessing import cpu_count, Queue, Process

def apply_func(f, q_in, q_out):
    while not q_in.empty():
        i, x = q_in.get()
        q_out.put((i, f(x)))

# map a function using a pool of processes
def parmap(f, X, nprocs = cpu_count()):
    q_in, q_out   = Queue(), Queue()
    proc = [Process(target=apply_func, args=(f, q_in, q_out)) for _ in range(nprocs)]
    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [p.start() for p in proc]
    res = [q_out.get() for _ in sent]
    [p.join() for p in proc]

    return [x for i,x in sorted(res)]

Edycja: niestety teraz napotykam ten błąd w moim systemie: Limit maksymalnego rozmiaru kolejki wieloprocesowej wynosi 32767 , mam nadzieję, że obejścia tam pomogą.


1

Możesz uruchomić kod bez żadnych problemów, jeśli w jakiś sposób ręcznie zignorujesz Poolobiekt z listy obiektów w klasie, ponieważ nie jest on w picklestanie, jak mówi błąd. Możesz to zrobić za pomocą __getstate__funkcji (patrz również tutaj ) w następujący sposób. PoolObiekt będzie starał się znaleźć __getstate__i __setstate__funkcje i wykonać je, jeśli uzna to po uruchomieniu map, map_asyncitp:

class calculate(object):
    def __init__(self):
        self.p = Pool()
    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['p']
        return self_dict
    def __setstate__(self, state):
        self.__dict__.update(state)

    def f(self, x):
        return x*x
    def run(self):
        return self.p.map(self.f, [1,2,3])

Następnie wykonaj:

cl = calculate()
cl.run()

da ci wynik:

[1, 4, 9]

Przetestowałem powyższy kod w Pythonie 3.xi działa.


0

Nie jestem pewien, czy przyjęto takie podejście, ale obejście, którego używam, jest:

from multiprocessing import Pool

t = None

def run(n):
    return t.f(n)

class Test(object):
    def __init__(self, number):
        self.number = number

    def f(self, x):
        print x * self.number

    def pool(self):
        pool = Pool(2)
        pool.map(run, range(10))

if __name__ == '__main__':
    t = Test(9)
    t.pool()
    pool = Pool(2)
    pool.map(run, range(10))

Wynik powinien być:

0
9
18
27
36
45
54
63
72
81
0
9
18
27
36
45
54
63
72
81

0
class Calculate(object):
  # Your instance method to be executed
  def f(self, x, y):
    return x*y

if __name__ == '__main__':
  inp_list = [1,2,3]
  y = 2
  cal_obj = Calculate()
  pool = Pool(2)
  results = pool.map(lambda x: cal_obj.f(x, y), inp_list)

Istnieje możliwość, że zechcesz zastosować tę funkcję dla każdej innej instancji klasy. Oto rozwiązanie również na to

class Calculate(object):
  # Your instance method to be executed
  def __init__(self, x):
    self.x = x

  def f(self, y):
    return self.x*y

if __name__ == '__main__':
  inp_list = [Calculate(i) for i in range(3)]
  y = 2
  pool = Pool(2)
  results = pool.map(lambda x: x.f(y), inp_list)

0

Oto moje rozwiązanie, które moim zdaniem jest nieco mniej hakerskie niż większość innych tutaj. Jest podobny do odpowiedzi Nightowl.

someclasses = [MyClass(), MyClass(), MyClass()]

def method_caller(some_object, some_method='the method'):
    return getattr(some_object, some_method)()

othermethod = partial(method_caller, some_method='othermethod')

with Pool(6) as pool:
    result = pool.map(othermethod, someclasses)

0

Od http://www.rueckstiess.net/research/snippets/show/ca1d7d90 i http://qingkaikong.blogspot.com/2016/12/python-parallel-method-in-class.html

Możemy utworzyć funkcję zewnętrzną i zapełnić ją obiektem klasy self:

from joblib import Parallel, delayed
def unwrap_self(arg, **kwarg):
    return square_class.square_int(*arg, **kwarg)

class square_class:
    def square_int(self, i):
        return i * i

    def run(self, num):
        results = []
        results = Parallel(n_jobs= -1, backend="threading")\
            (delayed(unwrap_self)(i) for i in zip([self]*len(num), num))
        print(results)

LUB bez Joblib:

from multiprocessing import Pool
import time

def unwrap_self_f(arg, **kwarg):
    return C.f(*arg, **kwarg)

class C:
    def f(self, name):
        print 'hello %s,'%name
        time.sleep(5)
        print 'nice to meet you.'

    def run(self):
        pool = Pool(processes=2)
        names = ('frank', 'justin', 'osi', 'thomas')
        pool.map(unwrap_self_f, zip([self]*len(names), names))

if __name__ == '__main__':
    c = C()
    c.run()

0

Może to nie jest zbyt dobre rozwiązanie, ale w moim przypadku rozwiązuję to w ten sposób.

from multiprocessing import Pool

def foo1(data):
    self = data.get('slf')
    lst = data.get('lst')
    return sum(lst) + self.foo2()

class Foo(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b

    def foo2(self):
        return self.a**self.b   

    def foo(self):
        p = Pool(5)
        lst = [1, 2, 3]
        result = p.map(foo1, (dict(slf=self, lst=lst),))
        return result

if __name__ == '__main__':
    print(Foo(2, 4).foo())

Musiałem przejść selfdo mojej funkcji, ponieważ muszę uzyskać dostęp do atrybutów i funkcji mojej klasy za pośrednictwem tej funkcji. To działa dla mnie. Zawsze mile widziane są poprawki i sugestie.


0

Oto szablon, który napisałem dla korzystania z wieloprocesorowej puli w python3, w szczególności python3.7.7 został użyty do uruchomienia testów. Moje najszybsze biegi uzyskałem przy użyciu imap_unordered. Po prostu podłącz swój scenariusz i wypróbuj go. Możesz użyć timeitlub po prostu time.time()dowiedzieć się, co jest dla Ciebie najlepsze.

import multiprocessing
import time

NUMBER_OF_PROCESSES = multiprocessing.cpu_count()
MP_FUNCTION = 'starmap'  # 'imap_unordered' or 'starmap' or 'apply_async'

def process_chunk(a_chunk):
    print(f"processig mp chunk {a_chunk}")
    return a_chunk


map_jobs = [1, 2, 3, 4]

result_sum = 0

s = time.time()
if MP_FUNCTION == 'imap_unordered':
    pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
    for i in pool.imap_unordered(process_chunk, map_jobs):
        result_sum += i
elif MP_FUNCTION == 'starmap':
    pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
    try:
        map_jobs = [(i, ) for i in map_jobs]
        result_sum = pool.starmap(process_chunk, map_jobs)
        result_sum = sum(result_sum)
    finally:
        pool.close()
        pool.join()
elif MP_FUNCTION == 'apply_async':
    with multiprocessing.Pool(processes=NUMBER_OF_PROCESSES) as pool:
        result_sum = [pool.apply_async(process_chunk, [i, ]).get() for i in map_jobs]
    result_sum = sum(result_sum)
print(f"result_sum is {result_sum}, took {time.time() - s}s")

W powyższym scenariuszu imap_unorderedfaktycznie wydaje się działać najgorzej dla mnie. Wypróbuj swoją obudowę i porównaj ją na maszynie, na której planujesz ją uruchomić. Przeczytaj także o pulach procesów . Twoje zdrowie!

Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.