Przyjrzałem się wielu odpowiedziom na temat przepełnienia stosu i Internetu, próbując skonfigurować sposób wykonywania przetwarzania wieloprocesowego przy użyciu kolejek do przekazywania dużych ramek danych pand. Wydawało mi się, że każda odpowiedź była powtórzeniem tego samego rodzaju rozwiązań bez uwzględnienia mnogości skrajnych przypadków, na które z pewnością można się natknąć podczas wykonywania takich obliczeń. Problem w tym, że w tym samym czasie dzieje się wiele rzeczy. Liczba zadań, liczba pracowników, czas trwania każdego zadania i możliwe wyjątki podczas wykonywania zadania. Wszystko to utrudnia synchronizację, a większość odpowiedzi nie dotyczy tego, jak możesz się do tego zabrać. To jest moje spojrzenie po kilkugodzinnych zabawach, mam nadzieję, że będzie to wystarczająco ogólne, aby większość ludzi uznało to za przydatne.
Kilka przemyśleń przed przykładami kodowania. Ponieważ queue.Empty
lub queue.qsize()
jakakolwiek inna podobna metoda jest zawodna w kontroli przepływu, każdy podobny kod
while True:
try:
task = pending_queue.get_nowait()
except queue.Empty:
break
jest fałszywy. To zabije pracownika, nawet jeśli kilka milisekund później w kolejce pojawi się kolejne zadanie. Pracownik nie wyzdrowieje i po chwili WSZYSCY pracownicy znikną, ponieważ przypadkowo znajdą kolejkę chwilowo pustą. W rezultacie główna funkcja wieloprocesorowa (ta z funkcją join () w procesach) powróci bez ukończenia wszystkich zadań. Miły. Powodzenia w debugowaniu, jeśli masz tysiące zadań, a kilku brakuje.
Drugą kwestią jest użycie wartości wartowniczych. Wiele osób zasugerowało dodanie wartości wartowniczej do kolejki, aby oznaczyć koniec kolejki. Ale aby oznaczyć to komu dokładnie? Jeśli jest N pracowników, zakładając, że N to liczba dostępnych rdzeni, które można oddać lub wziąć, wtedy pojedyncza wartość wartownika będzie oznaczać koniec kolejki tylko do jednego pracownika. Wszyscy pozostali pracownicy będą siedzieć, czekając na więcej pracy, gdy jej już nie ma. Typowe przykłady, które widziałem, to
while True:
task = pending_queue.get()
if task == SOME_SENTINEL_VALUE:
break
Jeden pracownik otrzyma wartość wartowniczą, podczas gdy reszta będzie czekać w nieskończoność. Żaden post, na który natknąłem się, nie wspomniał, że musisz przesłać wartość wartowniczą do kolejki CO NAJMNIEJ tyle razy, ile masz pracowników, aby WSZYSCY je otrzymali.
Drugą kwestią jest obsługa wyjątków podczas wykonywania zadań. Ponownie należy je złapać i zarządzać. Co więcej, jeśli masz completed_tasks
kolejkę, powinieneś niezależnie obliczyć w sposób deterministyczny, ile elementów jest w kolejce, zanim zdecydujesz, że zadanie zostało wykonane. Ponownie poleganie na rozmiarach kolejek jest skazane na niepowodzenie i zwraca nieoczekiwane wyniki.
W poniższym przykładzie par_proc()
funkcja otrzyma listę zadań zawierającą funkcje, z którymi te zadania powinny być wykonywane, wraz z nazwanymi argumentami i wartościami.
import multiprocessing as mp
import dill as pickle
import queue
import time
import psutil
SENTINEL = None
def do_work(tasks_pending, tasks_completed):
worker_name = mp.current_process().name
while True:
try:
task = tasks_pending.get_nowait()
except queue.Empty:
print(worker_name + ' found an empty queue. Sleeping for a while before checking again...')
time.sleep(0.01)
else:
try:
if task == SENTINEL:
print(worker_name + ' no more work left to be done. Exiting...')
break
print(worker_name + ' received some work... ')
time_start = time.perf_counter()
work_func = pickle.loads(task['func'])
result = work_func(**task['task'])
tasks_completed.put({work_func.__name__: result})
time_end = time.perf_counter() - time_start
print(worker_name + ' done in {} seconds'.format(round(time_end, 5)))
except Exception as e:
print(worker_name + ' task failed. ' + str(e))
tasks_completed.put({work_func.__name__: None})
def par_proc(job_list, num_cpus=None):
if not num_cpus:
num_cpus = psutil.cpu_count(logical=False)
print('* Parallel processing')
print('* Running on {} cores'.format(num_cpus))
tasks_pending = mp.Queue()
tasks_completed = mp.Queue()
processes = []
results = []
num_tasks = 0
for job in job_list:
for task in job['tasks']:
expanded_job = {}
num_tasks = num_tasks + 1
expanded_job.update({'func': pickle.dumps(job['func'])})
expanded_job.update({'task': task})
tasks_pending.put(expanded_job)
num_workers = num_cpus
for c in range(num_workers):
tasks_pending.put(SENTINEL)
print('* Number of tasks: {}'.format(num_tasks))
for c in range(num_workers):
p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed))
p.name = 'worker' + str(c)
processes.append(p)
p.start()
completed_tasks_counter = 0
while completed_tasks_counter < num_tasks:
results.append(tasks_completed.get())
completed_tasks_counter = completed_tasks_counter + 1
for p in processes:
p.join()
return results
A oto test do uruchomienia powyższego kodu
def test_parallel_processing():
def heavy_duty1(arg1, arg2, arg3):
return arg1 + arg2 + arg3
def heavy_duty2(arg1, arg2, arg3):
return arg1 * arg2 * arg3
task_list = [
{'func': heavy_duty1, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
{'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
]
results = par_proc(task_list)
job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])
assert job1 == 15
assert job2 == 21
plus jeszcze jeden z kilkoma wyjątkami
def test_parallel_processing_exceptions():
def heavy_duty1_raises(arg1, arg2, arg3):
raise ValueError('Exception raised')
return arg1 + arg2 + arg3
def heavy_duty2(arg1, arg2, arg3):
return arg1 * arg2 * arg3
task_list = [
{'func': heavy_duty1_raises, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
{'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
]
results = par_proc(task_list)
job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])
assert not job1
assert job2 == 21
Mam nadzieję, że to jest pomocne.