Jest bardziej powszechna wersja tego pytania dotyczące parallelization na pandy mają zastosowanie funkcji - tak to jest orzeźwiający pytanie :)
Po pierwsze , chciałbym wspomnieć o szybszym, ponieważ poprosiłeś o rozwiązanie „spakowane”, i pojawia się ono na większości pytań SO dotyczących równoległości pand.
Ale .. nadal chciałbym udostępnić dla niego mój osobisty kod źródłowy, ponieważ po kilku latach pracy z DataFrame nigdy nie znalazłem rozwiązania w 100% zrównoleglonego (głównie dla funkcji Apply) i zawsze musiałem wracać po mój „ ręczny ”.
Dzięki tobie sprawiłem, że bardziej ogólna jest obsługa dowolnej (teoretycznie) metody DataFrame według jej nazwy (abyś nie musiał zachowywać wersji dla isin, aplikować itp.).
Przetestowałem to na funkcjach „isin”, „Apply” i „isna”, używając zarówno Pythona 2.7, jak i 3.6. Ma mniej niż 20 linii i postępowałem zgodnie z konwencją nazewnictwa pand, taką jak „podzbiór” i „njobs”.
Dodałem również porównanie czasu z równoważnym kodem dask dla „isin” i wydaje się ~ X2 razy wolniejsze niż ta treść.
Zawiera 2 funkcje:
df_multi_core - to ten, do którego dzwonisz. Akceptuje:
- Twój obiekt df
- Nazwa funkcji, którą chcesz wywołać
- Podzbiór kolumn, na których można wykonać funkcję (pomaga skrócić czas / pamięć)
- Liczba zadań do uruchomienia równoległego (-1 lub pominięcie dla wszystkich rdzeni)
- Wszelkie inne kwargsy, które akceptuje funkcja df (np. „Oś”)
_df_split - jest to wewnętrzna funkcja pomocnicza, która musi być globalnie umieszczona w uruchomionym module (Pool.map jest „zależna od położenia”), w przeciwnym razie zlokalizowałbym ją wewnętrznie.
Oto kod z mojego GIST (dodam Więcej Pandy testy funkcyjne tam):
import pandas as pd
import numpy as np
import multiprocessing
from functools import partial
def _df_split(tup_arg, **kwargs):
split_ind, df_split, df_f_name = tup_arg
return (split_ind, getattr(df_split, df_f_name)(**kwargs))
def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
if njobs == -1:
njobs = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=njobs)
try:
splits = np.array_split(df[subset], njobs)
except ValueError:
splits = np.array_split(df, njobs)
pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
results = pool.map(partial(_df_split, **kwargs), pool_data)
pool.close()
pool.join()
results = sorted(results, key=lambda x:x[0])
results = pd.concat([split[1] for split in results])
return results
Poniżej znajduje się kod testowy dla równoległego isin , porównujący natywną, wielordzeniową wydajność gist i dask. Na maszynie I7 z 8 rdzeniami fizycznymi miałem około X4 razy przyspieszenie. Bardzo chciałbym usłyszeć, co masz na swoich prawdziwych danych!
from time import time
if __name__ == '__main__':
sep = '-' * 50
# isin test
N = 10000000
df = pd.DataFrame({'c1': np.random.randint(low=1, high=N, size=N), 'c2': np.arange(N)})
lookfor = np.random.randint(low=1, high=N, size=1000000)
print('{}\ntesting pandas isin on {}\n{}'.format(sep, df.shape, sep))
t1 = time()
print('result\n{}'.format(df.isin(lookfor).sum()))
t2 = time()
print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))
t3 = time()
res = df_multi_core(df=df, df_f_name='isin', subset=['c1'], njobs=-1, values=lookfor)
print('result\n{}'.format(res.sum()))
t4 = time()
print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))
t5 = time()
ddata = dd.from_pandas(df, npartitions=njobs)
res = ddata.map_partitions(lambda df: df.apply(apply_f, axis=1)).compute(scheduler='processes')
t6 = time()
print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
print('time for dask implementation {}\n{}'.format(round(t6 - t5, 2), sep))
--------------------------------------------------
testing pandas isin on (10000000, 2)
--------------------------------------------------
result
c1 953213
c2 951942
dtype: int64
time for native implementation 3.87
--------------------------------------------------
result
c1 953213
dtype: int64
time for multi core implementation 1.16
--------------------------------------------------
result
c1 953213
c2 951942
dtype: int64
time for dask implementation 2.88