To, czego szukasz, to wzorzec Producent / Konsument
Podstawowy przykład gwintowania
Oto podstawowy przykład użycia modułu wątkowego (zamiast wieloprocesorowego)
import threading
import Queue
import sys
def do_work(in_queue, out_queue):
while True:
item = in_queue.get()
result = item
out_queue.put(result)
in_queue.task_done()
if __name__ == "__main__":
work = Queue.Queue()
results = Queue.Queue()
total = 20
for i in xrange(4):
t = threading.Thread(target=do_work, args=(work, results))
t.daemon = True
t.start()
for i in xrange(total):
work.put(i)
work.join()
for i in xrange(total):
print results.get()
sys.exit()
Nie udostępniasz obiektu pliku wątkom. Pracowałbyś dla nich, dostarczając do kolejki wiersze danych. Następnie każdy wątek pobierałby wiersz, przetwarzał go, a następnie zwracał do kolejki.
W module wieloprocesorowym wbudowano kilka bardziej zaawansowanych funkcji udostępniania danych, takich jak listy i specjalny rodzaj kolejki . Istnieją pewne kompromisy związane z używaniem przetwarzania wieloprocesowego w porównaniu z wątkami i zależy to od tego, czy Twoja praca jest związana z procesorem czy we / wy.
Przykład podstawowego przetwarzania wieloprocesowego
Oto naprawdę podstawowy przykład puli wieloprocesowej
from multiprocessing import Pool
def process_line(line):
return "FOO: %s" % line
if __name__ == "__main__":
pool = Pool(4)
with open('file.txt') as source_file:
results = pool.map(process_line, source_file, 4)
print results
Pula to wygodny obiekt zarządzający własnymi procesami. Ponieważ otwarty plik może iterować po swoich wierszach, możesz przekazać go do programu pool.map()
, który zapętli go i dostarczy wiersze do funkcji roboczej. Map blokuje i zwraca cały wynik po zakończeniu. Należy pamiętać, że jest to zbyt uproszczony przykład i że pool.map()
przed wykonaniem pracy odczyta cały plik do pamięci na raz. Jeśli spodziewasz się dużych plików, pamiętaj o tym. Istnieją bardziej zaawansowane sposoby projektowania konfiguracji producenta / konsumenta.
Ręczna „pula” z limitem i ponownym sortowaniem linii
To jest ręczny przykład mapy Pool.map , ale zamiast zużywać całą iterację za jednym razem, możesz ustawić rozmiar kolejki, aby podawać ją tylko kawałek po kawałku tak szybko, jak to możliwe. Dodałem również numery linii, abyś mógł je śledzić i odnosić się do nich, jeśli chcesz, później.
from multiprocessing import Process, Manager
import time
import itertools
def do_work(in_queue, out_list):
while True:
item = in_queue.get()
line_no, line = item
if line == None:
return
time.sleep(.5)
result = (line_no, line)
out_list.append(result)
if __name__ == "__main__":
num_workers = 4
manager = Manager()
results = manager.list()
work = manager.Queue(num_workers)
pool = []
for i in xrange(num_workers):
p = Process(target=do_work, args=(work, results))
p.start()
pool.append(p)
with open("source.txt") as f:
iters = itertools.chain(f, (None,)*num_workers)
for num_and_line in enumerate(iters):
work.put(num_and_line)
for p in pool:
p.join()
print sorted(results)