I am answering this for anyone who may need it but also if there is a more efficient (but not ugly) way to do this then please answer!
basically I just want to perform a sliding operation on a matrix. so for dataset
data_in = np.reshape(np.repeat(np.arange(10).T, 4), [10, 4])**2
array([[ 0, 0, 0, 0],
[ 1, 1, 1, 1],
[ 4, 4, 4, 4],
[ 9, 9, 9, 9],
[16, 16, 16, 16],
[25, 25, 25, 25],
[36, 36, 36, 36],
[49, 49, 49, 49],
[64, 64, 64, 64],
[81, 81, 81, 81]])
standard deviation for window of 5 (centered) would output
nan
nan
5.89915248150105
8.648699324175862
11.436782764396638
14.24078649513432
17.052858997833766
19.86957473123167
nan
nan
or with np.nanstd
1.699673171197595
3.5
5.89915248150105
8.648699324175862
11.436782764396638
14.24078649513432
17.052858997833766
19.86957473123167
16.80029761641144
13.072447700751718
CodePudding user response:
EDIT 1: based on the suggestion from @hpaulj, a found a faster method using np.lib.stride_tricks.sliding_window_view
def total_rolling_sliding_window_view(data_in, win, operation_function, shift_from_center = 0):
assert win%2 == 1, 'window must be odd'
mid = win//2
pad = np.zeros([win, data_in.shape[1]])*np.nan
L_pad = pad[:mid-shift_from_center]
R_pad = pad[:mid shift_from_center]
data_in = np.vstack([L_pad, data_in, R_pad])
data_in = np.lib.stride_tricks.sliding_window_view(data_in, (win, data_in.shape[1]))
data_in = np.reshape(data_in, [-1, win*data_in.shape[1]])
data_out = operation_function(data_in, axis = 1)
return np.asarray(data_out)
def total_rolling_operation(data_in, win, operation_function, shift_from_center = 0):
assert win%2 == 1, 'window must be odd'
mid = win//2
pad = np.zeros([win, data_in.shape[1]])*np.nan
L_pad = pad[:mid-shift_from_center]
R_pad = pad[:mid shift_from_center]
data_in = np.vstack([L_pad, data_in, R_pad])
data_out = []
for k in range(data_in.shape[0]-win 1):
x = data_in[k:(k win)]
data_out.append(operation_function(x))
return np.asarray(data_out)
ensure they output the same values
data_in = np.reshape(np.arange(10*4), (10, 4))**2
data_out_og = total_rolling_operation(data_in, 5, np.std, shift_from_center = 0)
data_out_sliding_window_view = total_rolling_sliding_window_view(data_in, 5, np.std, shift_from_center = 0)
print(np.asarray([data_out_og, data_out_sliding_window_view]).T)
output:
[[ nan nan]
[ nan nan]
[113.49471353 113.49471353]
[158.48359537 158.48359537]
[203.98296498 203.98296498]
[249.71393634 249.71393634]
[295.56902747 295.56902747]
[341.49824304 341.49824304]
[ nan nan]
[ nan nan]]
time it
w, h = 4, 10000
data_in = np.reshape(np.arange(h*w), (h, w))**2
%timeit -n 10 data_out_og = total_rolling_operation(data_in, 5, np.std, shift_from_center = 0)
%timeit -n 10 data_out_sliding_window_view = total_rolling_sliding_window_view(data_in, 5, np.std, shift_from_center = 0)
output:
10 loops, best of 5: 256 ms per loop
10 loops, best of 5: 1.6 ms per loop
Original post:
def total_rolling_operation(data_in, win, operation_function, shift_from_center = 0):
assert win%2 == 1, 'window must be odd'
mid = win//2
pad = np.zeros([win, data_in.shape[1]])*np.nan
L_pad = pad[:mid-shift_from_center]
R_pad = pad[:mid shift_from_center]
data_in = np.vstack([L_pad, data_in, R_pad])
is_nan_inds = []
data_out = []
for k in range(data_in.shape[0]-win 1):
x = data_in[k:(k win)]
data_out.append(operation_function(x))
is_nan_inds.append(np.any(np.isnan(x)))
return np.asarray(data_out), np.asarray(is_nan_inds)
data_in = np.reshape(np.repeat(np.arange(10).T, 4), [10, 4])**2
data_out, is_nan_inds = total_rolling_operation(data_in, 5, np.std, shift_from_center = 0)
_ = [print(i1, i2) for i1, i2 in zip(data_out, is_nan_inds)]
output
nan True
nan True
5.89915248150105 False
8.648699324175862 False
11.436782764396638 False
14.24078649513432 False
17.052858997833766 False
19.86957473123167 False
nan True
nan True
or you could define your own function
def custom_nanstd(x):
return np.sqrt(np.nanmean((x - np.nanmean(x))**2))
data_out, is_nan_inds = total_rolling_operation(data_in, 5, custom_nanstd, shift_from_center = 0)
_ = [print(i1, i2) for i1, i2 in zip(data_out, is_nan_inds)]
output
1.699673171197595 True
3.5 True
5.89915248150105 False
8.648699324175862 False
11.436782764396638 False
14.24078649513432 False
17.052858997833766 False
19.86957473123167 False
16.80029761641144 True
13.072447700751718 True
and if you dont want the requirement for the window to be odd (which ensures data will be centered around a point) you can reconfigure to always start from left or right side.
CodePudding user response:
If you rewrite the variance over N
samples of x
as sum(x**2) - sum(x)**2 / N
, you can use a convolution with a boxcar kernel to do the computation. For example:
import numpy as np
from scipy.signal import convolve
x = np.array([
[ 0, 0, 0, 0],
[ 1, 1, 1, 1],
[ 4, 4, 4, 4],
[ 9, 9, 9, 9],
[16, 16, 16, 16],
[25, 25, 25, 25],
[36, 36, 36, 36],
[49, 49, 49, 49],
[64, 64, 64, 64],
[81, 81, 81, 81]])
N = 5
kernel = np.ones((N, 1), 1)
sigma = np.sqrt(convolve(x**2, kernel, mode='valid') - convolve(x, kernel, mode='valid')**2 / N)
In this case, we get a sigma
of
array([[13.19090596, 13.19090596, 13.19090596, 13.19090596],
[19.33907961, 19.33907961, 19.33907961, 19.33907961],
[25.57342371, 25.57342371, 25.57342371, 25.57342371],
[31.84336666, 31.84336666, 31.84336666, 31.84336666],
[38.13135193, 38.13135193, 38.13135193, 38.13135193],
[44.42971978, 44.42971978, 44.42971978, 44.42971978]])
While I'm pretty sure that scipy no longer provides this, the boxcar convolution can be optimized to run in O(n) time.