Odpowiedzi:
W przypadku krótkiego, szybkiego rozwiązania, które robi wszystko w jednej pętli, bez zależności, poniższy kod działa świetnie.
mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3
cumsum, moving_aves = [0], []
for i, x in enumerate(mylist, 1):
cumsum.append(cumsum[i-1] + x)
if i>=N:
moving_ave = (cumsum[i] - cumsum[i-N])/N
#can do stuff with moving_ave here
moving_aves.append(moving_ave)
UPD: Alleo i jasaarim zaproponowały bardziej wydajne rozwiązania .
Możesz użyć np.convolve
do tego:
np.convolve(x, np.ones((N,))/N, mode='valid')
Średnia bieżąca to przypadek działania matematycznego splotu . Dla średniej biegnącej przesuwasz okno wzdłuż danych wejściowych i obliczasz średnią zawartość okna. W przypadku dyskretnych sygnałów 1D splot jest tym samym, z tą różnicą, że zamiast średniej oblicza się dowolną kombinację liniową, tj. Pomnożyć każdy element przez odpowiedni współczynnik i zsumować wyniki. Współczynniki te, po jednym dla każdej pozycji w oknie, są czasami nazywane jądrem splotowym . Średnia arytmetyczna wartości N jest (x_1 + x_2 + ... + x_N) / N
więc odpowiednia jądro (1/N, 1/N, ..., 1/N)
i właśnie to otrzymujemy np.ones((N,))/N
.
mode
Argument np.convolve
określa sposób obsługiwać brzegi. Wybrałem valid
tutaj tryb, ponieważ myślę, że tak większość ludzi oczekuje, że bieganie będzie działać, ale możesz mieć inne priorytety. Oto wykres ilustrujący różnicę między trybami:
import numpy as np
import matplotlib.pyplot as plt
modes = ['full', 'same', 'valid']
for m in modes:
plt.plot(np.convolve(np.ones((200,)), np.ones((50,))/50, mode=m));
plt.axis([-10, 251, -.1, 1.1]);
plt.legend(modes, loc='lower center');
plt.show()
numpy.cumsum
ma większą złożoność.
Konwolucja jest znacznie lepsza niż proste podejście, ale (jak sądzę) używa FFT, a zatem dość powolna. Jednak specjalnie do obliczania biegu oznacza, że następujące podejście działa dobrze
def running_mean(x, N):
cumsum = numpy.cumsum(numpy.insert(x, 0, 0))
return (cumsum[N:] - cumsum[:-N]) / float(N)
Kod do sprawdzenia
In[3]: x = numpy.random.random(100000)
In[4]: N = 1000
In[5]: %timeit result1 = numpy.convolve(x, numpy.ones((N,))/N, mode='valid')
10 loops, best of 3: 41.4 ms per loop
In[6]: %timeit result2 = running_mean(x, N)
1000 loops, best of 3: 1.04 ms per loop
Zauważ, że numpy.allclose(result1, result2)
to True
dwie metody są równoważne. Im większy N, tym większa różnica w czasie.
# demonstrate loss of precision with only 100,000 points
np.random.seed(42)
x = np.random.randn(100000)+1e6
y1 = running_mean_convolve(x, 10)
y2 = running_mean_cumsum(x, 10)
assert np.allclose(y1, y2, rtol=1e-12, atol=0)
np.longdouble
ale błąd zmiennoprzecinkowy nadal będzie znaczący dla stosunkowo dużej liczby punktów (około> 1e5, ale zależy od twoich danych)numpy.convolve
O (mn); jego dokumenty wspominają, że scipy.signal.fftconvolve
używa FFT.
running_mean([1,2,3], 2)
daje array([1, 2])
. Zastąpienie x
przez [float(value) for value in x]
robi lewę.
x
zawiera zmiennoprzecinkowe. Przykład: running_mean(np.arange(int(1e7))[::-1] + 0.2, 1)[-1] - 0.2
zwraca, 0.003125
gdy się tego oczekuje 0.0
. Więcej informacji: en.wikipedia.org/wiki/Loss_of_signiance
Aktualizacja: Poniższy przykład pokazuje starą pandas.rolling_mean
funkcję, która została usunięta w najnowszych wersjach pand. Byłby to współczesny odpowiednik poniższego wywołania funkcji
In [8]: pd.Series(x).rolling(window=N).mean().iloc[N-1:].values
Out[8]:
array([ 0.49815397, 0.49844183, 0.49840518, ..., 0.49488191,
0.49456679, 0.49427121])
pandy są bardziej odpowiednie do tego niż NumPy lub SciPy. Jego funkcja rolling_mean wykonuje tę pracę wygodnie. Zwraca również tablicę NumPy, gdy dane wejściowe są tablicą.
rolling_mean
Wydajność jest trudna do pokonania dzięki dowolnej niestandardowej implementacji czystego języka Python. Oto przykładowa wydajność w stosunku do dwóch proponowanych rozwiązań:
In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: def running_mean(x, N):
...: cumsum = np.cumsum(np.insert(x, 0, 0))
...: return (cumsum[N:] - cumsum[:-N]) / N
...:
In [4]: x = np.random.random(100000)
In [5]: N = 1000
In [6]: %timeit np.convolve(x, np.ones((N,))/N, mode='valid')
10 loops, best of 3: 172 ms per loop
In [7]: %timeit running_mean(x, N)
100 loops, best of 3: 6.72 ms per loop
In [8]: %timeit pd.rolling_mean(x, N)[N-1:]
100 loops, best of 3: 4.74 ms per loop
In [9]: np.allclose(pd.rolling_mean(x, N)[N-1:], running_mean(x, N))
Out[9]: True
Istnieją również dobre opcje radzenia sobie z wartościami krawędzi.
df.rolling(windowsize).mean()
teraz działa zamiast tego (bardzo szybko mogę dodać). dla 6000 serii wierszy %timeit test1.rolling(20).mean()
zwróciło 1000 pętli, najlepiej 3: 1,16 ms na pętlę
df.rolling()
działa wystarczająco dobrze, problem polega na tym, że nawet ta forma nie będzie obsługiwać ndarrays w przyszłości. Aby go użyć, musimy najpierw załadować nasze dane do ramki danych Pandas. Chciałbym zobaczyć tę funkcję dodaną do jednego numpy
lub scipy.signal
.
%timeit bottleneck.move_mean(x, N)
jest 3 do 15 razy szybszy niż metody sumy i pand na moim komputerze. Zobacz ich punkt odniesienia w README repozytorium .
Możesz obliczyć średnią bieżącą za pomocą:
import numpy as np
def runningMean(x, N):
y = np.zeros((len(x),))
for ctr in range(len(x)):
y[ctr] = np.sum(x[ctr:(ctr+N)])
return y/N
Ale jest wolny.
Na szczęście numpy zawiera funkcję splotu, której możemy użyć do przyspieszenia. Średnia biegowa jest równoważna zwojowi x
z wektorem, który jest N
długi, a wszystkie elementy są równe 1/N
. Implementacja numpy splotu obejmuje początkowy stan przejściowy, więc musisz usunąć pierwsze punkty N-1:
def runningMeanFast(x, N):
return np.convolve(x, np.ones((N,))/N)[(N-1):]
Na mojej maszynie szybka wersja jest 20-30 razy szybsza, w zależności od długości wektora wejściowego i wielkości okna uśredniania.
Zauważ, że tryb zwojowy zawiera 'same'
tryb, który wydaje się, że powinien rozwiązać początkowy problem przejściowy, ale dzieli go na początek i koniec.
mode='valid'
w convolve
których nie wymaga żadnego post-processing.
mode='valid'
usuwa stan przejściowy z obu końców, prawda? Jeśli len(x)=10
i N=4
, dla średniej bieżącej, chciałbym 10 wyników, ale valid
zwraca 7.
modes = ('full', 'same', 'valid'); [plot(convolve(ones((200,)), ones((50,))/50, mode=m)) for m in modes]; axis([-10, 251, -.1, 1.1]); legend(modes, loc='lower center')
(z importem pyplot i numpy).
runningMean
Mam efekt uboczny uśredniania zerami, gdy wychodzisz z tablicy x[ctr:(ctr+N)]
dla prawej strony tablicy.
runningMeanFast
mają również ten problem z efektem granicznym.
lub moduł dla Pythona, który oblicza
w moich testach na Tradewave.net TA-lib zawsze wygrywa:
import talib as ta
import numpy as np
import pandas as pd
import scipy
from scipy import signal
import time as t
PAIR = info.primary_pair
PERIOD = 30
def initialize():
storage.reset()
storage.elapsed = storage.get('elapsed', [0,0,0,0,0,0])
def cumsum_sma(array, period):
ret = np.cumsum(array, dtype=float)
ret[period:] = ret[period:] - ret[:-period]
return ret[period - 1:] / period
def pandas_sma(array, period):
return pd.rolling_mean(array, period)
def api_sma(array, period):
# this method is native to Tradewave and does NOT return an array
return (data[PAIR].ma(PERIOD))
def talib_sma(array, period):
return ta.MA(array, period)
def convolve_sma(array, period):
return np.convolve(array, np.ones((period,))/period, mode='valid')
def fftconvolve_sma(array, period):
return scipy.signal.fftconvolve(
array, np.ones((period,))/period, mode='valid')
def tick():
close = data[PAIR].warmup_period('close')
t1 = t.time()
sma_api = api_sma(close, PERIOD)
t2 = t.time()
sma_cumsum = cumsum_sma(close, PERIOD)
t3 = t.time()
sma_pandas = pandas_sma(close, PERIOD)
t4 = t.time()
sma_talib = talib_sma(close, PERIOD)
t5 = t.time()
sma_convolve = convolve_sma(close, PERIOD)
t6 = t.time()
sma_fftconvolve = fftconvolve_sma(close, PERIOD)
t7 = t.time()
storage.elapsed[-1] = storage.elapsed[-1] + t2-t1
storage.elapsed[-2] = storage.elapsed[-2] + t3-t2
storage.elapsed[-3] = storage.elapsed[-3] + t4-t3
storage.elapsed[-4] = storage.elapsed[-4] + t5-t4
storage.elapsed[-5] = storage.elapsed[-5] + t6-t5
storage.elapsed[-6] = storage.elapsed[-6] + t7-t6
plot('sma_api', sma_api)
plot('sma_cumsum', sma_cumsum[-5])
plot('sma_pandas', sma_pandas[-10])
plot('sma_talib', sma_talib[-15])
plot('sma_convolve', sma_convolve[-20])
plot('sma_fftconvolve', sma_fftconvolve[-25])
def stop():
log('ticks....: %s' % info.max_ticks)
log('api......: %.5f' % storage.elapsed[-1])
log('cumsum...: %.5f' % storage.elapsed[-2])
log('pandas...: %.5f' % storage.elapsed[-3])
log('talib....: %.5f' % storage.elapsed[-4])
log('convolve.: %.5f' % storage.elapsed[-5])
log('fft......: %.5f' % storage.elapsed[-6])
wyniki:
[2015-01-31 23:00:00] ticks....: 744
[2015-01-31 23:00:00] api......: 0.16445
[2015-01-31 23:00:00] cumsum...: 0.03189
[2015-01-31 23:00:00] pandas...: 0.03677
[2015-01-31 23:00:00] talib....: 0.00700 # <<< Winner!
[2015-01-31 23:00:00] convolve.: 0.04871
[2015-01-31 23:00:00] fft......: 0.22306
NameError: name 'info' is not defined
. Dostaję ten błąd, proszę pana.
Aby uzyskać gotowe do użycia rozwiązanie, zobacz https://scipy-cookbook.readthedocs.io/items/SignalSmooth.html . Zapewnia średnią roboczą z flat
typem okna. Zauważ, że jest to nieco bardziej wyrafinowane niż prosta metoda zwojowa „zrób to sam”, ponieważ próbuje poradzić sobie z problemami na początku i na końcu danych, odzwierciedlając je (co może, ale nie musi, działać w twoim przypadku. ..).
Na początek możesz spróbować:
a = np.random.random(100)
plt.plot(a)
b = smooth(a, window='flat')
plt.plot(b)
numpy.convolve
, że różnica polega tylko na zmianie sekwencji.
w
czy rozmiar okna i s
dane?
Możesz użyć scipy.ndimage.filters.uniform_filter1d :
import numpy as np
from scipy.ndimage.filters import uniform_filter1d
N = 1000
x = np.random.random(100000)
y = uniform_filter1d(x, size=N)
uniform_filter1d
:
'reflect'
jest domyślna, ale w moim przypadku raczej tego chciałem'nearest'
Jest również dość szybki (prawie 50 razy szybszy niż np.convolve
i 2-5 razy szybszy niż podane powyżej podejście sumowania ):
%timeit y1 = np.convolve(x, np.ones((N,))/N, mode='same')
100 loops, best of 3: 9.28 ms per loop
%timeit y2 = uniform_filter1d(x, size=N)
10000 loops, best of 3: 191 µs per loop
oto 3 funkcje, które pozwalają porównać błąd / prędkość różnych implementacji:
from __future__ import division
import numpy as np
import scipy.ndimage.filters as ndif
def running_mean_convolve(x, N):
return np.convolve(x, np.ones(N) / float(N), 'valid')
def running_mean_cumsum(x, N):
cumsum = np.cumsum(np.insert(x, 0, 0))
return (cumsum[N:] - cumsum[:-N]) / float(N)
def running_mean_uniform_filter1d(x, N):
return ndif.uniform_filter1d(x, N, mode='constant', origin=-(N//2))[:-(N-1)]
uniform_filter1d
, np.convolve
z prostokąta, a np.cumsum
następnie np.subtract
. moje wyniki: (1.) splot jest najwolniejszy. (2.) suma / odejmowanie jest około 20-30x szybsze. (3.) uniform_filter1d jest około 2-3 razy szybszy niż suma / odejmowanie. zwycięzcą jest zdecydowanie uniform_filter1d.
uniform_filter1d
jest szybsze niż cumsum
rozwiązanie (o około 2-5x). i uniform_filter1d
nie otrzymuje tak dużego błędu zmiennoprzecinkowego jakcumsum
rozwiązanie.
Wiem, że to stare pytanie, ale oto rozwiązanie, które nie korzysta z żadnych dodatkowych struktur danych ani bibliotek. Jest liniowy pod względem liczby elementów listy wejściowej i nie mogę wymyślić żadnego innego sposobu, aby uczynić go bardziej wydajnym (w rzeczywistości, jeśli ktoś zna lepszy sposób przydzielenia wyniku, daj mi znać).
UWAGA: byłoby to znacznie szybsze przy użyciu tablicy numpy zamiast listy, ale chciałem wyeliminować wszystkie zależności. Możliwe byłoby również poprawienie wydajności przez wykonanie wielowątkowe
Funkcja zakłada, że lista wejściowa jest jednowymiarowa, więc bądź ostrożny.
### Running mean/Moving average
def running_mean(l, N):
sum = 0
result = list( 0 for x in l)
for i in range( 0, N ):
sum = sum + l[i]
result[i] = sum / (i+1)
for i in range( N, len(l) ):
sum = sum - l[i-N] + l[i]
result[i] = sum / N
return result
Przykład
Załóżmy, że mamy listę, data = [ 1, 2, 3, 4, 5, 6 ]
na której chcemy obliczyć średnią kroczącą z okresem 3, i że chcemy również listy wyjściowej, która ma taki sam rozmiar jak wejściowy (tak jest najczęściej).
Pierwszy element ma indeks 0, więc średnią kroczącą należy obliczyć na elementach o indeksach -2, -1 i 0. Oczywiście nie mamy danych [-2] i danych [-1] (chyba że chcesz użyć specjalnego warunki brzegowe), więc zakładamy, że te elementy są równe 0. Jest to równoważne wypełnianiu listy przez zero, z wyjątkiem tego, że tak naprawdę nie wypełniamy jej, wystarczy śledzić indeksy, które wymagają wypełnienia (od 0 do N-1).
Tak więc dla pierwszych N elementów po prostu sumujemy elementy w akumulatorze.
result[0] = (0 + 0 + 1) / 3 = 0.333 == (sum + 1) / 3
result[1] = (0 + 1 + 2) / 3 = 1 == (sum + 2) / 3
result[2] = (1 + 2 + 3) / 3 = 2 == (sum + 3) / 3
Od elementów N + 1 do przodu prosta akumulacja nie działa. oczekujemy, result[3] = (2 + 3 + 4)/3 = 3
ale różni się to od (sum + 4)/3 = 3.333
.
Sposób na obliczenie poprawnej wartości polega na odjęciu data[0] = 1
od tego sum+4
, co daje sum + 4 - 1 = 9
.
Dzieje się tak, ponieważ obecnie sum = data[0] + data[1] + data[2]
, ale jest to również prawdą dla każdego, i >= N
ponieważ przed odjęciem sum
jest data[i-N] + ... + data[i-2] + data[i-1]
.
Myślę, że można to elegancko rozwiązać za pomocą wąskiego gardła
Zobacz podstawową próbkę poniżej:
import numpy as np
import bottleneck as bn
a = np.random.randint(4, 1000, size=100)
mm = bn.move_mean(a, window=5, min_count=1)
„mm” jest ruchomym środkiem dla „a”.
„okno” to maksymalna liczba pozycji do rozważenia dla średniej ruchomej.
„min_count” to minimalna liczba pozycji do rozważenia dla średniej ruchomej (np. dla kilku pierwszych elementów lub jeśli tablica ma wartości nan).
Zaletą jest to, że wąskie gardło pomaga radzić sobie z wartościami nan, a także jest bardzo wydajne.
Nie sprawdziłem jeszcze, jak szybko to jest, ale możesz spróbować:
from collections import deque
cache = deque() # keep track of seen values
n = 10 # window size
A = xrange(100) # some dummy iterable
cum_sum = 0 # initialize cumulative sum
for t, val in enumerate(A, 1):
cache.append(val)
cum_sum += val
if t < n:
avg = cum_sum / float(t)
else: # if window is saturated,
cum_sum -= cache.popleft() # subtract oldest value
avg = cum_sum / float(n)
Ta odpowiedź zawiera rozwiązania wykorzystujące standardową bibliotekę Python dla trzech różnych scenariuszy.
itertools.accumulate
Jest to wydajne pod względem pamięci rozwiązanie w języku Python 3.2+, które oblicza średnią roboczą z iterowalnych wartości poprzez wykorzystanie dźwigni itertools.accumulate
.
>>> from itertools import accumulate
>>> values = range(100)
Zauważ, że values
może to być dowolny iterowalny, w tym generatory lub dowolny inny obiekt, który generuje wartości w locie.
Po pierwsze, leniwie konstruuj skumulowaną sumę wartości.
>>> cumu_sum = accumulate(value_stream)
Następnie enumerate
skumulowana suma (od 1) i zbuduj generator, który da ułamek skumulowanych wartości i aktualny wskaźnik wyliczenia.
>>> rolling_avg = (accu/i for i, accu in enumerate(cumu_sum, 1))
Możesz wydać, means = list(rolling_avg)
jeśli potrzebujesz wszystkich wartości w pamięci na raz lub zadzwonić next
przyrostowo.
(Oczywiście możesz także iterować rolling_avg
w for
pętli, która wywoła next
niejawnie.)
>>> next(rolling_avg) # 0/1
>>> 0.0
>>> next(rolling_avg) # (0 + 1)/2
>>> 0.5
>>> next(rolling_avg) # (0 + 1 + 2)/3
>>> 1.0
To rozwiązanie można zapisać jako funkcję w następujący sposób.
from itertools import accumulate
def rolling_avg(iterable):
cumu_sum = accumulate(iterable)
yield from (accu/i for i, accu in enumerate(cumu_sum, 1))
Ta coroutine zużywa wartości, które jej wysyłasz, i utrzymuje bieżącą średnią z wartości obserwowanych do tej pory.
Przydaje się, gdy nie masz iterowalnych wartości, ale potrzebujesz wartości uśrednianych jeden po drugim w różnych momentach życia programu.
def rolling_avg_coro():
i = 0
total = 0.0
avg = None
while True:
next_value = yield avg
i += 1
total += next_value
avg = total/i
Korpus działa w ten sposób:
>>> averager = rolling_avg_coro() # instantiate coroutine
>>> next(averager) # get coroutine going (this is called priming)
>>>
>>> averager.send(5) # 5/1
>>> 5.0
>>> averager.send(3) # (5 + 3)/2
>>> 4.0
>>> print('doing something else...')
doing something else...
>>> averager.send(13) # (5 + 3 + 13)/3
>>> 7.0
N
Ta funkcja generatora przyjmuje iterowalny rozmiar okna N
i daje średnią z bieżących wartości w oknie. Wykorzystuje strukturę deque
, która jest strukturą danych podobną do listy, ale zoptymalizowaną pod kątem szybkich modyfikacji ( pop
, append
) w obu punktach końcowych .
from collections import deque
from itertools import islice
def sliding_avg(iterable, N):
it = iter(iterable)
window = deque(islice(it, N))
num_vals = len(window)
if num_vals < N:
msg = 'window size {} exceeds total number of values {}'
raise ValueError(msg.format(N, num_vals))
N = float(N) # force floating point division if using Python 2
s = sum(window)
while True:
yield s/N
try:
nxt = next(it)
except StopIteration:
break
s = s - window.popleft() + nxt
window.append(nxt)
Oto funkcja w akcji:
>>> values = range(100)
>>> N = 5
>>> window_avg = sliding_avg(values, N)
>>>
>>> next(window_avg) # (0 + 1 + 2 + 3 + 4)/5
>>> 2.0
>>> next(window_avg) # (1 + 2 + 3 + 4 + 5)/5
>>> 3.0
>>> next(window_avg) # (2 + 3 + 4 + 5 + 6)/5
>>> 4.0
Trochę późno na imprezę, ale stworzyłem swoją własną małą funkcję, która NIE zawija się na końcach lub paskach zerami, które są następnie używane do znalezienia średniej. Kolejną zaletą jest to, że ponownie próbkuje sygnał w liniowo rozmieszczonych punktach. Dostosuj kod do woli, aby uzyskać inne funkcje.
Metoda polega na prostym pomnożeniu macierzy ze znormalizowanym jądrem Gaussa.
def running_mean(y_in, x_in, N_out=101, sigma=1):
'''
Returns running mean as a Bell-curve weighted average at evenly spaced
points. Does NOT wrap signal around, or pad with zeros.
Arguments:
y_in -- y values, the values to be smoothed and re-sampled
x_in -- x values for array
Keyword arguments:
N_out -- NoOf elements in resampled array.
sigma -- 'Width' of Bell-curve in units of param x .
'''
N_in = size(y_in)
# Gaussian kernel
x_out = np.linspace(np.min(x_in), np.max(x_in), N_out)
x_in_mesh, x_out_mesh = np.meshgrid(x_in, x_out)
gauss_kernel = np.exp(-np.square(x_in_mesh - x_out_mesh) / (2 * sigma**2))
# Normalize kernel, such that the sum is one along axis 1
normalization = np.tile(np.reshape(sum(gauss_kernel, axis=1), (N_out, 1)), (1, N_in))
gauss_kernel_normalized = gauss_kernel / normalization
# Perform running average as a linear operation
y_out = gauss_kernel_normalized @ y_in
return y_out, x_out
Proste użycie sygnału sinusoidalnego z dodanym normalnym szumem rozproszonym:
sum
, stosując np.sum
zamiast 2 The @
operator (nie wiem, co to znaczy) wyrzuca błąd. Mogę przyjrzeć się temu później, ale brakuje mi teraz czasu
Zamiast numpy lub scipy polecam pandy, aby zrobić to szybciej:
df['data'].rolling(3).mean()
To bierze średnią ruchomą (MA) z 3 okresów kolumny „dane”. Można również obliczyć wersje przesunięte, na przykład tę, która wyklucza bieżącą komórkę (przesuniętą do tyłu), można łatwo obliczyć jako:
df['data'].shift(periods=1).rolling(3).mean()
pandas.rolling_mean
podczas gdy moje pandas.DataFrame.rolling
. Możesz również łatwo obliczyć ruch min(), max(), sum()
itp., A także za mean()
pomocą tej metody.
pandas.rolling_min, pandas.rolling_max
itp. Są one podobne, ale różne.
Istnieje komentarz mab zakopany w jednej z odpowiedzi powyżej, która ma tę metodę. bottleneck
ma move_mean
prostą średnią ruchomą:
import numpy as np
import bottleneck as bn
a = np.arange(10) + np.random.random(10)
mva = bn.move_mean(a, window=2, min_count=1)
min_count
jest przydatnym parametrem, który zasadniczo podnosi średnią ruchomą do tego momentu w tablicy. Jeśli nie ustawisz min_count
, będzie równy window
, a wszystko do window
punktów będzie nan
.
Inne podejście do znalezienia średniej ruchomej bez użycia numpy, panda
import itertools
sample = [2, 6, 10, 8, 11, 10]
list(itertools.starmap(lambda a,b: b/a,
enumerate(itertools.accumulate(sample), 1)))
wydrukuje [2.0, 4.0, 6.0, 6.5, 7.4, 7.833333333333333]
To pytanie jest teraz nawet starsze niż kiedy NeXuS napisał o nim w zeszłym miesiącu, ALE podoba mi się, jak jego kod radzi sobie z przypadkowymi przypadkami. Ponieważ jednak jest to „prosta średnia ruchoma”, jej wyniki są opóźnione w stosunku do danych, których dotyczą. Myślałem, że do czynienia z przypadkami brzegowych w bardziej satysfakcjonujący sposób niż trybach NumPy jest valid
, same
i full
może być osiągnięte poprzez zastosowanie podobnego podejścia do convolution()
metody opartej.
Mój wkład wykorzystuje centralną średnią bieżącą do dostosowania wyników do ich danych. Gdy dostępnych jest zbyt mało punktów, aby można było użyć pełnowymiarowego okna, średnie działające są obliczane z kolejno mniejszych okien na krawędziach tablicy. [Właściwie z kolejno większych okien, ale to szczegół implementacji.]
import numpy as np
def running_mean(l, N):
# Also works for the(strictly invalid) cases when N is even.
if (N//2)*2 == N:
N = N - 1
front = np.zeros(N//2)
back = np.zeros(N//2)
for i in range(1, (N//2)*2, 2):
front[i//2] = np.convolve(l[:i], np.ones((i,))/i, mode = 'valid')
for i in range(1, (N//2)*2, 2):
back[i//2] = np.convolve(l[-i:], np.ones((i,))/i, mode = 'valid')
return np.concatenate([front, np.convolve(l, np.ones((N,))/N, mode = 'valid'), back[::-1]])
Jest stosunkowo powolny, ponieważ korzysta z niego convolve()
i prawdopodobnie mógłby zostać wyhodowany przez prawdziwego Pythonistę, jednak uważam, że ten pomysł jest słuszny.
Istnieje wiele odpowiedzi powyżej na temat obliczania średniej biegania. Moja odpowiedź dodaje dwie dodatkowe funkcje:
Ta druga funkcja jest szczególnie przydatna do określania, które wartości odbiegają od ogólnego trendu o określoną wartość.
Używam numpy.cumsum, ponieważ jest to metoda najbardziej wydajna czasowo ( patrz odpowiedź Alleo powyżej ).
N=10 # number of points to test on each side of point of interest, best if even
padded_x = np.insert(np.insert( np.insert(x, len(x), np.empty(int(N/2))*np.nan), 0, np.empty(int(N/2))*np.nan ),0,0)
n_nan = np.cumsum(np.isnan(padded_x))
cumsum = np.nancumsum(padded_x)
window_sum = cumsum[N+1:] - cumsum[:-(N+1)] - x # subtract value of interest from sum of all values within window
window_n_nan = n_nan[N+1:] - n_nan[:-(N+1)] - np.isnan(x)
window_n_values = (N - window_n_nan)
movavg = (window_sum) / (window_n_values)
Ten kod działa tylko dla Ns. Można go wyregulować dla liczb nieparzystych, zmieniając np. Wstaw padded_x i n_nan.
Przykładowy wynik (surowy w kolorze czarnym, movavg w kolorze niebieskim):
Ten kod można łatwo dostosować do usunięcia wszystkich ruchomych średnich wartości obliczonych z mniejszej niż wartość graniczna = 3 wartości innych niż nan.
window_n_values = (N - window_n_nan).astype(float) # dtype must be float to set some values to nan
cutoff = 3
window_n_values[window_n_values<cutoff] = np.nan
movavg = (window_sum) / (window_n_values)
Używaj tylko biblioteki standardowej Python (efektywna pamięć)
Podaj inną wersję korzystania tylko ze standardowej biblioteki deque
. To dla mnie dość zaskakujące, że większość odpowiedzi używa pandas
lub numpy
.
def moving_average(iterable, n=3):
d = deque(maxlen=n)
for i in iterable:
d.append(i)
if len(d) == n:
yield sum(d)/n
r = moving_average([40, 30, 50, 46, 39, 44])
assert list(r) == [40.0, 42.0, 45.0, 43.0]
Właściwie znalazłem inną implementację w dokumentach Pythona
def moving_average(iterable, n=3):
# moving_average([40, 30, 50, 46, 39, 44]) --> 40.0 42.0 45.0 43.0
# http://en.wikipedia.org/wiki/Moving_average
it = iter(iterable)
d = deque(itertools.islice(it, n-1))
d.appendleft(0)
s = sum(d)
for elem in it:
s += elem - d.popleft()
d.append(elem)
yield s / n
Wydaje mi się jednak, że wdrożenie jest nieco bardziej skomplikowane niż powinno. Ale z jakiegoś powodu musi to być standardowa dokumentacja Pythona. Czy ktoś mógłby komentować implementację mojej i standardowej dokumentacji?
O(n*d)
obliczenia ( d
czyli rozmiar okna, n
rozmiar iterowalny) i robiąO(n)
Chociaż są tutaj rozwiązania tego pytania, proszę spojrzeć na moje rozwiązanie. To jest bardzo proste i działa dobrze.
import numpy as np
dataset = np.asarray([1, 2, 3, 4, 5, 6, 7])
ma = list()
window = 3
for t in range(0, len(dataset)):
if t+window <= len(dataset):
indices = range(t, t+window)
ma.append(np.average(np.take(dataset, indices)))
else:
ma = np.asarray(ma)
Po przeczytaniu innych odpowiedzi nie sądzę, by o to pytano, ale dotarłem tutaj z potrzebą utrzymania średniej bieżącej listy wartości, które rosły.
Więc jeśli chcesz zachować listę wartości, które skądś nabywasz (witryna, urządzenie pomiarowe itp.) I średnią z ostatnich n
zaktualizowanych wartości, możesz użyć poniższego kodu, który minimalizuje wysiłek związany z dodawaniem nowych elementy:
class Running_Average(object):
def __init__(self, buffer_size=10):
"""
Create a new Running_Average object.
This object allows the efficient calculation of the average of the last
`buffer_size` numbers added to it.
Examples
--------
>>> a = Running_Average(2)
>>> a.add(1)
>>> a.get()
1.0
>>> a.add(1) # there are two 1 in buffer
>>> a.get()
1.0
>>> a.add(2) # there's a 1 and a 2 in the buffer
>>> a.get()
1.5
>>> a.add(2)
>>> a.get() # now there's only two 2 in the buffer
2.0
"""
self._buffer_size = int(buffer_size) # make sure it's an int
self.reset()
def add(self, new):
"""
Add a new number to the buffer, or replaces the oldest one there.
"""
new = float(new) # make sure it's a float
n = len(self._buffer)
if n < self.buffer_size: # still have to had numbers to the buffer.
self._buffer.append(new)
if self._average != self._average: # ~ if isNaN().
self._average = new # no previous numbers, so it's new.
else:
self._average *= n # so it's only the sum of numbers.
self._average += new # add new number.
self._average /= (n+1) # divide by new number of numbers.
else: # buffer full, replace oldest value.
old = self._buffer[self._index] # the previous oldest number.
self._buffer[self._index] = new # replace with new one.
self._index += 1 # update the index and make sure it's...
self._index %= self.buffer_size # ... smaller than buffer_size.
self._average -= old/self.buffer_size # remove old one...
self._average += new/self.buffer_size # ...and add new one...
# ... weighted by the number of elements.
def __call__(self):
"""
Return the moving average value, for the lazy ones who don't want
to write .get .
"""
return self._average
def get(self):
"""
Return the moving average value.
"""
return self()
def reset(self):
"""
Reset the moving average.
If for some reason you don't want to just create a new one.
"""
self._buffer = [] # could use np.empty(self.buffer_size)...
self._index = 0 # and use this to keep track of how many numbers.
self._average = float('nan') # could use np.NaN .
def get_buffer_size(self):
"""
Return current buffer_size.
"""
return self._buffer_size
def set_buffer_size(self, buffer_size):
"""
>>> a = Running_Average(10)
>>> for i in range(15):
... a.add(i)
...
>>> a()
9.5
>>> a._buffer # should not access this!!
[10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]
Decreasing buffer size:
>>> a.buffer_size = 6
>>> a._buffer # should not access this!!
[9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
>>> a.buffer_size = 2
>>> a._buffer
[13.0, 14.0]
Increasing buffer size:
>>> a.buffer_size = 5
Warning: no older data available!
>>> a._buffer
[13.0, 14.0]
Keeping buffer size:
>>> a = Running_Average(10)
>>> for i in range(15):
... a.add(i)
...
>>> a()
9.5
>>> a._buffer # should not access this!!
[10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]
>>> a.buffer_size = 10 # reorders buffer!
>>> a._buffer
[5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
"""
buffer_size = int(buffer_size)
# order the buffer so index is zero again:
new_buffer = self._buffer[self._index:]
new_buffer.extend(self._buffer[:self._index])
self._index = 0
if self._buffer_size < buffer_size:
print('Warning: no older data available!') # should use Warnings!
else:
diff = self._buffer_size - buffer_size
print(diff)
new_buffer = new_buffer[diff:]
self._buffer_size = buffer_size
self._buffer = new_buffer
buffer_size = property(get_buffer_size, set_buffer_size)
Możesz to przetestować na przykład:
def graph_test(N=200):
import matplotlib.pyplot as plt
values = list(range(N))
values_average_calculator = Running_Average(N/2)
values_averages = []
for value in values:
values_average_calculator.add(value)
values_averages.append(values_average_calculator())
fig, ax = plt.subplots(1, 1)
ax.plot(values, label='values')
ax.plot(values_averages, label='averages')
ax.grid()
ax.set_xlim(0, N)
ax.set_ylim(0, N)
fig.show()
Co daje:
Inne rozwiązanie wykorzystujące standardową bibliotekę i deque:
from collections import deque
import itertools
def moving_average(iterable, n=3):
# http://en.wikipedia.org/wiki/Moving_average
it = iter(iterable)
# create an iterable object from input argument
d = deque(itertools.islice(it, n-1))
# create deque object by slicing iterable
d.appendleft(0)
s = sum(d)
for elem in it:
s += elem - d.popleft()
d.append(elem)
yield s / n
# example on how to use it
for i in moving_average([40, 30, 50, 46, 39, 44]):
print(i)
# 40.0
# 42.0
# 45.0
# 43.0
W celach edukacyjnych dodam jeszcze dwa rozwiązania Numpy (które są wolniejsze niż rozwiązanie sumsum):
import numpy as np
from numpy.lib.stride_tricks import as_strided
def ra_strides(arr, window):
''' Running average using as_strided'''
n = arr.shape[0] - window + 1
arr_strided = as_strided(arr, shape=[n, window], strides=2*arr.strides)
return arr_strided.mean(axis=1)
def ra_add(arr, window):
''' Running average using add.reduceat'''
n = arr.shape[0] - window + 1
indices = np.array([0, window]*n) + np.repeat(np.arange(n), 2)
arr = np.append(arr, 0)
return np.add.reduceat(arr, indices )[::2]/window
Zastosowane funkcje: as_strided , add.reduceat
Wszystkie wyżej wymienione rozwiązania są złe, ponieważ ich brakuje
numpy.cumsum
lubO(len(x) * w)
implementacjom jako zwojom.Dany
import numpy
m = 10000
x = numpy.random.rand(m)
w = 1000
Zauważ, że x_[:w].sum()
równa się x[:w-1].sum()
. Tak więc dla pierwszej średniej numpy.cumsum(...)
dodaje x[w] / w
(przez x_[w+1] / w
) i odejmuje 0
(od x_[0] / w
). To skutkujex[0:w].mean()
Za pomocą sumsum zaktualizujesz drugą średnią, dodatkowo dodając x[w+1] / w
i odejmując x[0] / w
, w wyniku czego x[1:w+1].mean()
.
Trwa x[-w:].mean()
to do momentu osiągnięcia.
x_ = numpy.insert(x, 0, 0)
sliding_average = x_[:w].sum() / w + numpy.cumsum(x_[w:] - x_[:-w]) / w
To rozwiązanie jest wektoryzowane O(m)
, czytelne i stabilne numerycznie.
Co powiesz na filtr średniej ruchomej ? Jest to również jednowarstwowa i ma tę zaletę, że możesz łatwo manipulować typem okna, jeśli potrzebujesz czegoś innego niż prostokąt, tj. N-prosta prosta ruchoma średnia tablicy a:
lfilter(np.ones(N)/N, [1], a)[N:]
A po zastosowaniu trójkątnego okna:
lfilter(np.ones(N)*scipy.signal.triang(N)/N, [1], a)[N:]
Uwaga: zwykle odrzucam pierwsze N próbek jako fałszywe, dlatego [N:]
na końcu, ale nie jest to konieczne i jest to tylko kwestia osobistego wyboru.
Jeśli zdecydujesz się rzucić własną, zamiast korzystać z istniejącej biblioteki, pamiętaj o błędzie zmiennoprzecinkowym i spróbuj zminimalizować jego skutki:
class SumAccumulator:
def __init__(self):
self.values = [0]
self.count = 0
def add( self, val ):
self.values.append( val )
self.count = self.count + 1
i = self.count
while i & 0x01:
i = i >> 1
v0 = self.values.pop()
v1 = self.values.pop()
self.values.append( v0 + v1 )
def get_total(self):
return sum( reversed(self.values) )
def get_size( self ):
return self.count
Jeśli wszystkie twoje wartości są mniej więcej tego samego rzędu wielkości, pomoże to zachować precyzję, zawsze dodając wartości o mniej więcej podobnych wielkościach.