Przetwarzanie wieloprocesowe: użyj tqdm, aby wyświetlić pasek postępu


103

Aby mój kod był bardziej „pythonowy” i szybszy, używam funkcji „multiprocessing” i funkcji map do wysyłania a) funkcji i b) zakresu iteracji.

Wszczepione rozwiązanie (tj. Wywołanie tqdm bezpośrednio z zakresu tqdm.tqdm (zakres (0, 30)) nie działa z przetwarzaniem wieloprocesowym (zgodnie z poniższym kodem).

Pasek postępu jest wyświetlany od 0 do 100% (kiedy Python czyta kod?), Ale nie wskazuje faktycznego postępu funkcji mapy.

Jak wyświetlić pasek postępu, który wskazuje, na którym etapie znajduje się funkcja „mapa”?

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   p = Pool(2)
   r = p.map(_foo, tqdm.tqdm(range(0, 30)))
   p.close()
   p.join()

Każda pomoc lub sugestie są mile widziane ...


Czy możesz opublikować fragment kodu paska postępu?
Alex

2
Dla osób poszukujących rozwiązania z .starmap(): Oto łatka do Pooldodawania .istarmap(), która również będzie działać tqdm.
Darkonaut

Odpowiedzi:


136

Użyj imap zamiast map, co zwraca iterator przetwarzanych wartości.

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   with Pool(2) as p:
      r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))

14
Załączająca instrukcja list () czeka na zakończenie iteratora. total = jest również wymagane, ponieważ tqdm nie wie, jak długo będzie trwała iteracja,
hkyi

16
Czy jest podobne rozwiązanie dla starmap()?
tarashypka

2
for i in tqdm.tqdm(...): pass może być prostsze niżlist(tqdm.tqdm)
savfod

1
To działa, ale czy ktoś inny miał stale drukować pasek postępu w nowej linii dla każdej iteracji?
Dennis Subachev

3
Zachowanie jest powiązane, gdy jest specyficzne chunk_sizedla p.imap. Czy można tqdmaktualizować każdą iterację zamiast każdego fragmentu?
huangbiubiu

56

Znalezione rozwiązanie: bądź ostrożny! Ze względu na przetwarzanie wieloprocesowe czas oszacowania (iteracja na pętlę, czas całkowity itp.) Może być niestabilny, ale pasek postępu działa idealnie.

Uwaga: Menedżer kontekstu dla puli jest dostępny tylko od wersji Python 3.3

from multiprocessing import Pool
import time
from tqdm import *

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for i, _ in enumerate(p.imap_unordered(_foo, range(0, max_))):
                pbar.update()

2
pbar.close()nie jest wymagane, zostanie zamknięte automatycznie po zakończeniuwith
Sagar Kar

5
Czy tqdmpotrzebne jest tutaj drugie / wewnętrzne wezwanie?
shadowtalker,

7
A co z wynikiem _foo (mój_numer), który jest zwracany jako „r”, o którym mowa?
Likak

4
Czy jest podobne rozwiązanie dla starmap()?
tarashypka

2
@shadowtalker - wydaje się działać bez;). W każdym razie - imap_unorderedjest tutaj kluczowy, daje najlepszą wydajność i najlepsze oceny paska postępu.
Tomasz Gandor

21

Możesz użyć p_tqdmzamiast tego.

https://github.com/swansonk14/p_tqdm

from p_tqdm import p_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = p_map(_foo, list(range(0, 30)))

1
Działa to bardzo dobrze i było to bardzo łatwe pip install. To zastępuje tqdm dla większości moich potrzeb
crypdick

Merci Victor;)
Gabriel Romon

p_tqdmjest ograniczone do multiprocessing.Pool, niedostępne dla wątków
pateheo

21

Przepraszamy za spóźnienie, ale jeśli potrzebujesz tylko współbieżnej mapy, najnowsza wersja ( tqdm>=4.42.0) ma teraz wbudowane:

from tqdm.contrib.concurrent import process_map  # or thread_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = process_map(_foo, range(0, 30), max_workers=2)

Źródła: https://tqdm.github.io/docs/contrib.concurrent/ i https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py


1
Dzięki za to. Działa łatwo, znacznie lepiej niż jakiekolwiek inne rozwiązanie, które wypróbowałem.
user3340499

Fajnie (+1), ale rzuca HBox(children=(FloatProgress(value=0.0, max=30.0), HTML(value='')))Jupyter
Ébe Isaac


Widzę problem z dyskusją o włamaniu do tqdm_notebook, jednak nie mogę znaleźć rozwiązania do rozwiązania dla tqdm.contrib.concurrent.
Ébe Isaac

8

na podstawie odpowiedzi Xavi Martíneza napisałem funkcję imap_unordered_bar. Można go używać w taki sam sposób, jak imap_unorderedz tą różnicą, że wyświetlany jest pasek przetwarzania.

from multiprocessing import Pool
import time
from tqdm import *

def imap_unordered_bar(func, args, n_processes = 2):
    p = Pool(n_processes)
    res_list = []
    with tqdm(total = len(args)) as pbar:
        for i, res in tqdm(enumerate(p.imap_unordered(func, args))):
            pbar.update()
            res_list.append(res)
    pbar.close()
    p.close()
    p.join()
    return res_list

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square 

if __name__ == '__main__':
    result = imap_unordered_bar(_foo, range(5))

3
Spowoduje to ponowne narysowanie paska na każdym kroku w nowej linii. Jak zaktualizować tę samą linię?
misantroop

Rozwiązanie w moim przypadku (Windows / Powershell): Colorama.
misantroop

'pbar.close () nie jest wymagane, zostanie zamknięte automatycznie po zakończeniu z' jak komentarz Sagara w odpowiedzi @ scipy
Tejas Shetty

1

Oto moja opinia na temat sytuacji, w których musisz uzyskać wyniki z równoległych funkcji wykonujących. Ta funkcja robi kilka rzeczy (jest jeszcze jeden mój post, który wyjaśnia to dokładniej), ale kluczową kwestią jest to, że istnieje kolejka oczekujących zadań i kolejka zadań zakończonych. Gdy pracownicy wykonują każde zadanie w kolejce oczekującej, dodają wyniki do kolejki zadań zakończonych. Możesz zawinąć czek do kolejki ukończonych zadań za pomocą paska postępu tqdm. Nie umieszczam tutaj implementacji funkcji do_work (), nie ma to znaczenia, ponieważ przesłanie tutaj ma na celu monitorowanie kolejki wykonanych zadań i aktualizowanie paska postępu za każdym razem, gdy jest wynik.

def par_proc(job_list, num_cpus=None, verbose=False):

# Get the number of cores
if not num_cpus:
    num_cpus = psutil.cpu_count(logical=False)

print('* Parallel processing')
print('* Running on {} cores'.format(num_cpus))

# Set-up the queues for sending and receiving data to/from the workers
tasks_pending = mp.Queue()
tasks_completed = mp.Queue()

# Gather processes and results here
processes = []
results = []

# Count tasks
num_tasks = 0

# Add the tasks to the queue
for job in job_list:
    for task in job['tasks']:
        expanded_job = {}
        num_tasks = num_tasks + 1
        expanded_job.update({'func': pickle.dumps(job['func'])})
        expanded_job.update({'task': task})
        tasks_pending.put(expanded_job)

# Set the number of workers here
num_workers = min(num_cpus, num_tasks)

# We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
# work left to be done.
for c in range(num_workers):
    tasks_pending.put(SENTINEL)

print('* Number of tasks: {}'.format(num_tasks))

# Set-up and start the workers
for c in range(num_workers):
    p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose))
    p.name = 'worker' + str(c)
    processes.append(p)
    p.start()

# Gather the results
completed_tasks_counter = 0

with tqdm(total=num_tasks) as bar:
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1
        bar.update(completed_tasks_counter)

for p in processes:
    p.join()

return results

0
import multiprocessing as mp
import tqdm


some_iterable = ...

def some_func():
    # your logic
    ...


if __name__ == '__main__':
    with mp.Pool(mp.cpu_count()-2) as p:
        list(tqdm.tqdm(p.imap(some_func, iterable), total=len(iterable)))

-2

Takie podejście jest proste i działa.

from multiprocessing.pool import ThreadPool
import time
from tqdm import tqdm

def job():
    time.sleep(1)
    pbar.update()

pool = ThreadPool(5)
with tqdm(total=100) as pbar:
    for i in range(100):
        pool.apply_async(job)
    pool.close()
    pool.join()
Korzystając z naszej strony potwierdzasz, że przeczytałeś(-aś) i rozumiesz nasze zasady używania plików cookie i zasady ochrony prywatności.
Licensed under cc by-sa 3.0 with attribution required.