Z powyższych komentarzy wynika, że jest to planowane od pandasjakiegoś czasu (jest też ciekawie wyglądający rosettaprojekt, który właśnie zauważyłem).
Jednak dopóki wszystkie funkcje równoległe nie zostaną włączone pandas, zauważyłem, że bardzo łatwo jest pisać wydajne i nie kopiujące pamięci równoległe rozszerzenia pandasbezpośrednio przy użyciu cython+ OpenMP i C ++.
Oto krótki przykład pisania równoległej sumy grupowej, której użycie wygląda mniej więcej tak:
import pandas as pd
import para_group_demo
df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print para_group_demo.sum(df.a, df.b)
a wynik to:
sum
key
0 6
1 11
2 4
Uwaga Bez wątpienia funkcjonalność tego prostego przykładu będzie ostatecznie częścią pandas. Jednak niektóre rzeczy będą bardziej naturalne do zrównoleglenia w C ++ przez jakiś czas i ważne jest, aby być świadomym, jak łatwo jest to połączyć pandas.
Aby to zrobić, napisałem proste rozszerzenie pliku o jednym źródle, którego kod następuje.
Zaczyna się od importu i definicji typów
from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map
cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange
import pandas as pd
ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t
C ++ unordered_map służy do sumowania przez pojedynczy wątek, a vectordo sumowania przez wszystkie wątki.
Teraz do funkcji sum. Zaczyna się od wpisanych widoków pamięci dla szybkiego dostępu:
def sum(crit, vals):
cdef int64_t[:] crit_view = crit.values
cdef int64_t[:] vals_view = vals.values
Funkcja kontynuuje, dzieląc pół-równo na wątki (tutaj zakodowane na stałe do 4), a każdy wątek sumuje wpisy w swoim zakresie:
cdef uint64_t num_threads = 4
cdef uint64_t l = len(crit)
cdef uint64_t s = l / num_threads + 1
cdef uint64_t i, j, e
cdef counts_vec_t counts
counts = counts_vec_t(num_threads)
counts.resize(num_threads)
with cython.boundscheck(False):
for i in prange(num_threads, nogil=True):
j = i * s
e = j + s
if e > l:
e = l
while j < e:
counts[i][crit_view[j]] += vals_view[j]
inc(j)
Po zakończeniu wątków funkcja scala wszystkie wyniki (z różnych zakresów) w jeden unordered_map:
cdef counts_t total
cdef counts_it_t it, e_it
for i in range(num_threads):
it = counts[i].begin()
e_it = counts[i].end()
while it != e_it:
total[deref(it).first] += deref(it).second
inc(it)
Pozostało tylko utworzyć DataFramei zwrócić wyniki:
key, sum_ = [], []
it = total.begin()
e_it = total.end()
while it != e_it:
key.append(deref(it).first)
sum_.append(deref(it).second)
inc(it)
df = pd.DataFrame({'key': key, 'sum': sum_})
df.set_index('key', inplace=True)
return df