Jak mogę odzyskać wartość zwracaną funkcji przekazanej do procesu wieloprocesowego?


190

W poniższym przykładowym kodzie chciałbym odzyskać zwracaną wartość funkcji worker. Jak mogę to zrobić? Gdzie jest przechowywana ta wartość?

Przykładowy kod:

import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs

Wynik:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

Nie mogę znaleźć odpowiedniego atrybutu w obiektach przechowywanych w jobs.

Odpowiedzi:


189

Do komunikacji użyj zmiennej współdzielonej . Na przykład tak:

import multiprocessing

def worker(procnum, return_dict):
    '''worker function'''
    print str(procnum) + ' represent!'
    return_dict[procnum] = procnum


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print return_dict.values()

46
Zalecałbym użycie tutaj multiprocessing.Queue, a nie Managertutaj. Użycie a Managerwymaga odrodzenia całkowicie nowego procesu, który jest przesadą, kiedy Queueto zrobi.
dano

1
@dano: Zastanawiam się, czy jeśli użyjemy obiektu Queue (), nie będziemy mogli ustalić kolejności, gdy każdy proces zwróci wartość. Chodzi mi o to, że jeśli potrzebujemy kolejności w celu wykonania następnej pracy. Skąd możemy
wiedzieć

4
@Catbuilts Możesz zwrócić krotkę z każdego procesu, gdzie jedna wartość jest rzeczywistą wartością zwracaną, na której Ci zależy, a druga jest unikalnym identyfikatorem z procesu. Zastanawiam się także, dlaczego musisz wiedzieć, który proces zwraca jaką wartość. Jeśli to właśnie musisz wiedzieć o procesie, czy też musisz korelować między listą danych wejściowych a listą danych wyjściowych? W takim przypadku polecam użyć multiprocessing.Pool.mapdo przetworzenia listy elementów pracy.
dano

5
zastrzeżenia dla funkcji z jednym argumentem : należy użyć args=(my_function_argument, ). Zwróć uwagę na ,przecinek tutaj! W przeciwnym razie Python będzie narzekał na „brak argumentów pozycyjnych”. Zajęło mi 10 minut, żeby się domyślić. Sprawdź także użycie ręczne (w sekcji „klasa procesu”).
yuqli

2
@vartec jedną wadą korzystania ze słownika multipriocessing.Manager () jest to, że pikle (serializuje) zwracany obiekt, więc ma wąskie gardło określone przez bibliotekę piklowania o maksymalnym rozmiarze 2GiB, aby obiekt mógł zwrócić. Czy istnieje inny sposób na uniknięcie serializacji zwracanego obiektu?
hirschme

67

Myślę, że podejście sugerowane przez @sega_sai jest lepsze. Ale naprawdę potrzebuje przykładu kodu, więc oto:

import multiprocessing
from os import getpid

def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes = 3)
    print(pool.map(worker, range(5)))

Które wydrukują zwracane wartości:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

Jeśli znasz map(wbudowany w Python 2), nie powinno to być zbyt trudne. W przeciwnym razie spójrz na link sega_Sai .

Zwróć uwagę, jak mało kodu jest potrzebne. (Zwróć również uwagę na to, jak procesy są ponownie wykorzystywane).


1
Wszelkie pomysły, dlaczego mój getpid()zwrot ma tę samą wartość? Używam Python3
zelusp,

Nie jestem pewien, jak Pool rozdziela zadania między pracowników. Może wszyscy mogą skończyć u tego samego pracownika, jeśli są naprawdę szybcy? Czy to się dzieje konsekwentnie? Również jeśli dodasz opóźnienie?
Mark

Pomyślałem również, że jest to kwestia związana z prędkością, ale kiedy karmię pool.mapzakres 1 000 000 za pomocą więcej niż 10 procesów, widzę co najwyżej dwie różne stawki.
zelusp

1
Więc nie jestem pewien. Myślę, że byłoby interesujące otworzyć osobne pytanie w tym zakresie.
Mark

Jeśli rzeczy, które chcesz wysłać inną funkcję do każdego procesu, użyj pool.apply_async: docs.python.org/3/library/…
Kyle

24

W tym przykładzie pokazano, jak korzystać z listy procesów wieloprocesowych. Instancje potoków, aby zwrócić ciągi z dowolnej liczby procesów:

import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print result
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print result_list

if __name__ == '__main__':
    main()

Wynik:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']

Rozwiązanie to wykorzystuje mniej zasobów niż multiprocessing.Queue który używa

  • rura
  • co najmniej jeden zamek
  • bufor
  • wątek

lub multiprocessing.SimpleQueue, który używa

  • rura
  • co najmniej jeden zamek

Bardzo pouczające jest spojrzenie na źródło każdego z tych typów.


Jaki byłby najlepszy sposób, aby nie uczynić z rur globalnej zmiennej?
Nickpick,

Wszystkie globalne dane i kod umieściłem w głównej funkcji, która działa tak samo. Czy to jest odpowiedź na Twoje pytanie?
David Cullen,

czy potok musi zawsze zostać odczytany przed dodaniem (przesłaniem) nowej wartości?
Nickpick,

+1, dobra odpowiedź. Ale ponieważ rozwiązanie jest bardziej wydajne, kompromis polega na tym, że tworzysz jeden Pipena proces vs. jeden Queuedla wszystkich procesów. Nie wiem, czy to we wszystkich przypadkach będzie bardziej wydajne.
sudo

2
Ta odpowiedź powoduje zakleszczenie, jeśli zwracany obiekt jest duży. Zamiast zrobić proc.join () najpierw spróbowałbym recv () zwrócić wartość, a następnie łączyć.
L. Pes

21

Z jakiegoś powodu nie mogłem znaleźć ogólnego przykładu, jak to zrobić w Queuedowolnym miejscu (nawet przykłady dokumentów Pythona nie spawnują wielu procesów), więc oto, co udało mi się wykonać po 10 próbach:

def add_helper(queue, arg1, arg2): # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets

Queueto blokująca, bezpieczna dla wątków kolejka, której można użyć do przechowywania wartości zwracanych z procesów potomnych. Więc musisz przekazać kolejkę do każdego procesu. Coś mniej oczywiste jest to, że trzeba get()z kolejki przed wami joinprzez ProcessES lub innego kolejka zapełnia się i blokuje wszystko.

Aktualizacja dla osób zorientowanych obiektowo (testowane w Pythonie 3.4):

from multiprocessing import Process, Queue

class Multiprocessor():

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)

18

Dla każdego, kto szuka sposobu na uzyskanie wartości za Processpomocą Queue:

import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    print queue.get()  # Prints {"foo": True}
    p.join()

1
kiedy umieszczam coś w kolejce w procesie roboczym, moje połączenie nigdy nie jest osiągane. Masz pomysł, jak to się stanie?
Laurens Koppenol

@LaurensKoppenol, czy masz na myśli to, że Twój główny kod wisi na p.join () na stałe i nigdy nie kontynuuje? Czy twój proces ma nieskończoną pętlę?
Matthew Moisen

4
Tak, wisi tam nieskończenie. Moi wszyscy robotnicy kończą się (pętla w ramach funkcji pracownika kończy się, następnie drukowana jest instrukcja print dla wszystkich pracowników). Łączenie nic nie robi. Jeśli Queuejoin()
usunę

@LaurensKoppenol Być może nie dzwonisz queue.put(ret)przed rozmową telefoniczną p.start()? W takim przypadku wątek roboczy zawiesi się na queue.get()zawsze. Możesz to zreplikować, kopiując mój fragment powyżej podczas komentowania queue.put(ret).
Matthew Moisen

Zredagowałem tę odpowiedź, queue.get()musi to nastąpić przed p.join(). To działa teraz dla mnie.
jfunk


10

Możesz użyć exitwbudowanego, aby ustawić kod zakończenia procesu. Można go uzyskać z exitcodeatrybutu procesu:

import multiprocessing

def worker(procnum):
    print str(procnum) + ' represent!'
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print result

Wynik:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]

4
Ostrzegam, że takie podejście może być mylące. Procesy powinny na ogół kończyć się z kodem wyjścia 0, jeśli zostały zakończone bez błędu. Jeśli masz coś monitorującego systemowe kody wyjścia procesu, możesz zobaczyć te zgłoszone jako błędy.
żelazne koło

1
Idealne, jeśli chcesz zgłosić wyjątek w procesie nadrzędnym w przypadku błędu.
crizCraig


3

Pomyślałem, że uproszczę najprostsze przykłady skopiowane z góry, pracując dla mnie na Py3.6. Najprostszy to multiprocessing.Pool:

import multiprocessing
import time

def worker(x):
    time.sleep(1)
    return x

pool = multiprocessing.Pool()
print(pool.map(worker, range(10)))

Możesz ustawić liczbę procesów w puli, używając np Pool(processes=5). Domyślnie jest to jednak liczba procesorów, więc pozostaw puste dla zadań związanych z procesorem. (Zadania związane z operacjami we / wy często i tak odpowiadają wątkom, ponieważ wątki w większości czekają, więc mogą współużytkować rdzeń procesora). PoolStosuje również optymalizację porcji .

(Należy zauważyć, że metody roboczej nie można zagnieździć w metodzie. Początkowo zdefiniowałem metodę roboczą w metodzie, która wywołuje wywołanie pool.map, aby zachować ją jako całość, ale potem procesy nie mogły jej zaimportować, i zgłosiło błąd „AttributeError” : Nie można marynować lokalnego obiektu outer_method..inner_method ". Więcej tutaj . Może być wewnątrz klasy.)

(Doceń 'represent!'raczej oryginalne pytanie zadane podczas drukowania time.sleep(), ale bez niego pomyślałem, że jakiś kod działałby równolegle, gdy nie był).


Py3 ProcessPoolExecutorma również dwie linie ( .mapzwraca generator, więc potrzebujesz list()):

from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
    print(list(executor.map(worker, range(10))))

Z prostymi Processes:

import multiprocessing
import time

def worker(x, queue):
    time.sleep(1)
    queue.put(x)

queue = multiprocessing.SimpleQueue()
tasks = range(10)

for task in tasks:
    multiprocessing.Process(target=worker, args=(task, queue,)).start()

for _ in tasks:
    print(queue.get())

Użyj, SimpleQueuejeśli wszystko czego potrzebujesz to puti get. Pierwsza pętla rozpoczyna wszystkie procesy, zanim druga wykona queue.getwywołania blokujące . Nie wydaje mi się, żeby dzwonić p.join().


2

Proste rozwiązanie:

import multiprocessing

output=[]
data = range(0,10)

def f(x):
    return x**2

def handler():
    p = multiprocessing.Pool(64)
    r=p.map(f, data)
    return r

if __name__ == '__main__':
    output.append(handler())

print(output[0])

Wynik:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

2

Jeśli używasz języka Python 3, możesz użyć go concurrent.futures.ProcessPoolExecutorjako wygodnej abstrakcji:

from concurrent.futures import ProcessPoolExecutor

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum


if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        print(list(executor.map(worker, range(5))))

Wynik:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]

0

Zmodyfikowałem nieco odpowiedź vartec, ponieważ potrzebowałem uzyskać kody błędów z funkcji. (Dzięki vertec !!! to niesamowita sztuczka)

Można to również zrobić za pomocą, manager.listale myślę, że lepiej jest mieć go w dykcie i przechowywać w nim listę. W ten sposób zachowujemy funkcję i wyniki, ponieważ nie jesteśmy pewni kolejności, w jakiej lista zostanie wypełniona.

from multiprocessing import Process
import time
import datetime
import multiprocessing


def func1(fn, m_list):
    print 'func1: starting'
    time.sleep(1)
    m_list[fn] = "this is the first function"
    print 'func1: finishing'
    # return "func1"  # no need for return since Multiprocess doesnt return it =(

def func2(fn, m_list):
    print 'func2: starting'
    time.sleep(3)
    m_list[fn] = "this is function 2"
    print 'func2: finishing'
    # return "func2"

def func3(fn, m_list):
    print 'func3: starting'
    time.sleep(9)
    # if fail wont join the rest because it never populate the dict
    # or do a try/except to get something in return.
    raise ValueError("failed here")
    # if we want to get the error in the manager dict we can catch the error
    try:
        raise ValueError("failed here")
        m_list[fn] = "this is third"
    except:
        m_list[fn] = "this is third and it fail horrible"
        # print 'func3: finishing'
        # return "func3"


def runInParallel(*fns):  # * is to accept any input in list
    start_time = datetime.datetime.now()
    proc = []
    manager = multiprocessing.Manager()
    m_list = manager.dict()
    for fn in fns:
        # print fn
        # print dir(fn)
        p = Process(target=fn, name=fn.func_name, args=(fn, m_list))
        p.start()
        proc.append(p)
    for p in proc:
        p.join()  # 5 is the time out

    print datetime.datetime.now() - start_time
    return m_list, proc

if __name__ == '__main__':
    manager, proc = runInParallel(func1, func2, func3)
    # print dir(proc[0])
    # print proc[0]._name
    # print proc[0].name
    # print proc[0].exitcode

    # here you can check what did fail
    for i in proc:
        print i.name, i.exitcode  # name was set up in the Process line 53

    # here will only show the function that worked and where able to populate the 
    # manager dict
    for i, j in manager.items():
        print dir(i)  # things you can do to the function
        print i, j
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.