Jak programować równolegle w Pythonie?


141

W przypadku C ++ możemy użyć OpenMP do programowania równoległego; jednak OpenMP nie będzie działać dla Pythona. Co powinienem zrobić, jeśli chcę równolegle z niektórymi częściami mojego programu w języku Python?

Strukturę kodu można uznać za:

solve1(A)
solve2(B)

Gdzie solve1i solve2są dwie niezależne funkcje. Jak uruchomić ten rodzaj kodu równolegle zamiast po kolei, aby skrócić czas wykonywania? Mam nadzieję, że ktoś może mi pomóc. Z góry bardzo dziękuję. Kod to:

def solve(Q, G, n):
    i = 0
    tol = 10 ** -4

    while i < 1000:
        inneropt, partition, x = setinner(Q, G, n)
        outeropt = setouter(Q, G, n)

        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break

        node1 = partition[0]
        node2 = partition[1]

        G = updateGraph(G, node1, node2)

        if i == 999:
            print "Maximum iteration reaches"
    print inneropt

Gdzie setinner i setouter to dwie niezależne funkcje. Właśnie tam chcę równolegle ...


31
Przyjrzyj się wieloprocesorowości . Uwaga: Wątki Pythona nie nadają się do zadań związanych z procesorem, tylko do zadań związanych z we / wy.
9000

4
@ 9000 +100 internetów do wymieniania zadań zależnych od procesora i we / wy.
Hyperboreus,

@ 9000 Właściwie wątki w ogóle nie nadają się do zadań związanych z procesorem, o ile wiem! Procesy to właściwy sposób wykonywania rzeczywistych zadań związanych z procesorem.
Omar Al-Ithawi

6
@OmarIthawi: dlaczego, wątki działają dobrze, jeśli masz wiele rdzeni procesora (jak zwykle teraz). Następnie proces może uruchomić kilka wątków, ładując wszystkie te rdzenie równolegle i niejawnie udostępniając wspólne dane między nimi (to znaczy bez jawnego obszaru pamięci współużytkowanej lub przesyłania komunikatów między procesami).
9000

1
@ user2134774: Cóż, tak, mój drugi komentarz nie ma sensu. Prawdopodobnie jedyne rozszerzenia C, które wypuszczają GIL, mogą na tym skorzystać; np. części NumPy i Pandy to robią. W innych przypadkach jest źle (ale nie mogę go teraz edytować).
9000

Odpowiedzi:


162

Możesz użyć modułu wieloprocesorowego . W tym przypadku mogę użyć puli przetwarzania:

from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)

Spowoduje to powstanie procesów, które mogą wykonywać za Ciebie ogólną pracę. Ponieważ nie przeszliśmy processes, spowoduje to powstanie jednego procesu dla każdego rdzenia procesora na twojej maszynie. Każdy rdzeń procesora może jednocześnie wykonywać jeden proces.

Jeśli chcesz zmapować listę do pojedynczej funkcji, wykonaj następujące czynności:

args = [A, B]
results = pool.map(solve1, args)

Nie używaj wątków, ponieważ GIL blokuje wszelkie operacje na obiektach Pythona.


1
nie pool.mapakceptuje również słowniki jak args? Czy tylko proste listy?
Bndr

Myślę, że tylko listy. Ale możesz po prostu przekazać dict.items (), który będzie listą krotek kluczowych wartości
Matt Williamson,

Niestety kończy się to błędem typu `` unhashable type: 'list' '
Bndr

oprócz mojego ostatniego komentarza: `dict.items ()` work. Błąd pojawia się, ponieważ musiałem zmienić obsługę zmiennej wgląd w funkcję procesu. Niestety komunikat o błędzie nie był zbyt pomocny ... Więc: dziękuję za wskazówkę. :-)
Bndr

2
Co to jest limit czasu?
gamma

26

Dzięki Rayowi można to zrobić bardzo elegancko .

Aby zrównoleglać swój przykład, musisz zdefiniować swoje funkcje za pomocą @ray.remotedekoratora, a następnie wywołać je za pomocą .remote.

import ray

ray.init()

# Define the functions.

@ray.remote
def solve1(a):
    return 1

@ray.remote
def solve2(b):
    return 2

# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)

# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])

W porównaniu z modułem wieloprocesowym ma to wiele zalet .

  1. Ten sam kod będzie działał na maszynie wielordzeniowej, a także na klastrze maszyn.
  2. Procesy efektywnie współużytkują dane za pośrednictwem pamięci współużytkowanej i serializacji bez kopii .
  3. Komunikaty o błędach są ładnie propagowane.
  4. Te wywołania funkcji można składać razem, np.

    @ray.remote
    def f(x):
        return x + 1
    
    x_id = f.remote(1)
    y_id = f.remote(x_id)
    z_id = f.remote(y_id)
    ray.get(z_id)  # returns 4
  5. Oprócz zdalnego wywoływania funkcji, klasy można tworzyć zdalnie jako aktorzy .

Zauważ, że Ray to framework, który pomagałem rozwijać.


Ciągle otrzymuję komunikat o błędzie "Nie można znaleźć wersji spełniającej wymagania ray (z wersji
:)

2
Zwykle ten rodzaj błędu oznacza, że ​​musisz dokonać aktualizacji pip. Proponuję spróbować pip install --upgrade pip. Jeśli sudow ogóle musisz używać , możliwe, że wersja pip, której używasz do instalacji, raynie jest tą samą, która jest aktualizowana. Możesz to sprawdzić pip --version. Ponadto system Windows nie jest obecnie obsługiwany, więc jeśli używasz systemu Windows, prawdopodobnie jest to problem.
Robert Nishihara,

1
Tylko uwaga, dotyczy to głównie dystrybucji współbieżnych zadań na wielu komputerach.
Matt Williamson,

2
W rzeczywistości jest zoptymalizowany zarówno pod kątem przypadku pojedynczego komputera, jak i ustawienia klastra. Wiele decyzji projektowych (np. Pamięć współdzielona, ​​serializacja bez kopii) jest ukierunkowanych na dobrą obsługę pojedynczych maszyn.
Robert Nishihara

2
Byłoby wspaniale, gdyby doktorzy zwrócili na to uwagę. Czytając dokumentację, wyczułem, że tak naprawdę nie był przeznaczony dla przypadku jednej maszyny.
Sledge


4

Rozwiązaniem, jak powiedzieli inni, jest użycie wielu procesów. To, która struktura jest bardziej odpowiednia, zależy jednak od wielu czynników. Oprócz tych, o których już wspomniałem, jest też charm4py i mpi4py (jestem twórcą charm4py).

Istnieje bardziej wydajny sposób implementacji powyższego przykładu niż użycie abstrakcji puli procesów roboczych. Pętla główna wysyła te same parametry (w tym pełny wykres G) do pracowników w każdej z 1000 iteracji. Ponieważ co najmniej jeden pracownik będzie rezydował w innym procesie, wiąże się to z kopiowaniem i wysyłaniem argumentów do innych procesów. Może to być bardzo kosztowne w zależności od wielkości obiektów. Zamiast tego sensowne jest, aby pracownicy zapisywali stan i po prostu wysyłali zaktualizowane informacje.

Na przykład w charm4py można to zrobić w następujący sposób:

class Worker(Chare):

    def __init__(self, Q, G, n):
        self.G = G
        ...

    def setinner(self, node1, node2):
        self.updateGraph(node1, node2)
        ...


def solve(Q, G, n):
    # create 2 workers, each on a different process, passing the initial state
    worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
    worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
    while i < 1000:
        result_a = worker_a.setinner(node1, node2, ret=True)  # execute setinner on worker A
        result_b = worker_b.setouter(node1, node2, ret=True)  # execute setouter on worker B

        inneropt, partition, x = result_a.get()  # wait for result from worker A
        outeropt = result_b.get()  # wait for result from worker B
        ...

Zauważ, że w tym przykładzie naprawdę potrzebujemy tylko jednego pracownika. Główna pętla mogłaby wykonać jedną z funkcji, a drugą kazać pracownikowi. Ale mój kod pomaga zilustrować kilka rzeczy:

  1. Pracownik A działa w procesie 0 (tak samo jak główna pętla). Choć result_a.get()jest zablokowany czekając na wynik, pracownikowi robi obliczeń w tym samym procesie.
  2. Argumenty są automatycznie przekazywane przez odwołanie do pracownika A, ponieważ znajduje się on w tym samym procesie (nie ma potrzeby kopiowania).

2

W niektórych przypadkach możliwe jest automatyczne zrównoleglenie pętli za pomocą Numba , chociaż działa to tylko z niewielkim podzbiorem Pythona:

from numba import njit, prange

@njit(parallel=True)
def prange_test(A):
    s = 0
    # Without "parallel=True" in the jit-decorator
    # the prange statement is equivalent to range
    for i in prange(A.shape[0]):
        s += A[i]
    return s

Niestety wydaje się, że Numba działa tylko z tablicami Numpy, ale nie z innymi obiektami Pythona. Teoretycznie może być również możliwe skompilowanie Pythona do C ++, a następnie automatyczne zrównoleglenie go za pomocą kompilatora Intel C ++ , chociaż jeszcze tego nie próbowałem.


2

joblibBiblioteki można używać do wykonywania obliczeń równoległych i przetwarzania wieloprocesowego.

from joblib import Parallel, delayed

Możesz po prostu utworzyć funkcję, fooktórą chcesz uruchamiać równolegle i na podstawie następującego fragmentu kodu implementować przetwarzanie równoległe:

output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)

Gdzie num_coresmożna uzyskać z multiprocessingbiblioteki w następujący sposób:

import multiprocessing

num_cores = multiprocessing.cpu_count()

Jeśli masz funkcję z więcej niż jednym argumentem wejściowym i chcesz po prostu wykonać iterację po jednym z argumentów za pomocą listy, możesz użyć partialfunkcji z functoolsbiblioteki w następujący sposób:

from joblib import Parallel, delayed
import multiprocessing
from functools import partial
def foo(arg1, arg2, arg3, arg4):
    '''
    body of the function
    '''
    return output
input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)

Możesz znaleźć pełne wyjaśnienie wieloprocesorowości w Pythonie i R z kilkoma przykładami tutaj .

Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.