Home > Blockchain >  efficient convenient sliding/rolling operation on a 2D numpy matrix?
efficient convenient sliding/rolling operation on a 2D numpy matrix?

Time:03-13

I am answering this for anyone who may need it but also if there is a more efficient (but not ugly) way to do this then please answer!

basically I just want to perform a sliding operation on a matrix. so for dataset

data_in = np.reshape(np.repeat(np.arange(10).T, 4), [10, 4])**2

array([[ 0,  0,  0,  0],
       [ 1,  1,  1,  1],
       [ 4,  4,  4,  4],
       [ 9,  9,  9,  9],
       [16, 16, 16, 16],
       [25, 25, 25, 25],
       [36, 36, 36, 36],
       [49, 49, 49, 49],
       [64, 64, 64, 64],
       [81, 81, 81, 81]])

standard deviation for window of 5 (centered) would output

nan
nan
5.89915248150105
8.648699324175862
11.436782764396638
14.24078649513432
17.052858997833766
19.86957473123167
nan
nan

or with np.nanstd

1.699673171197595
3.5
5.89915248150105
8.648699324175862
11.436782764396638
14.24078649513432
17.052858997833766
19.86957473123167
16.80029761641144
13.072447700751718

CodePudding user response:

EDIT 1: based on the suggestion from @hpaulj, a found a faster method using np.lib.stride_tricks.sliding_window_view

def total_rolling_sliding_window_view(data_in, win, operation_function, shift_from_center = 0):
  assert win%2 == 1, 'window must be odd'
  mid = win//2
  pad = np.zeros([win, data_in.shape[1]])*np.nan

  L_pad = pad[:mid-shift_from_center]
  R_pad = pad[:mid shift_from_center]

  data_in = np.vstack([L_pad, data_in, R_pad])
  
  data_in = np.lib.stride_tricks.sliding_window_view(data_in, (win, data_in.shape[1]))
  data_in = np.reshape(data_in, [-1, win*data_in.shape[1]])
  data_out = operation_function(data_in, axis = 1)
  return np.asarray(data_out)
  
def total_rolling_operation(data_in, win, operation_function, shift_from_center = 0):
  assert win%2 == 1, 'window must be odd'
  mid = win//2
  pad = np.zeros([win, data_in.shape[1]])*np.nan

  L_pad = pad[:mid-shift_from_center]
  R_pad = pad[:mid shift_from_center]

  data_in = np.vstack([L_pad, data_in, R_pad])
  
  data_out = []
  for k in range(data_in.shape[0]-win 1):
    x = data_in[k:(k win)] 
    data_out.append(operation_function(x))
  return np.asarray(data_out)

ensure they output the same values

data_in = np.reshape(np.arange(10*4), (10, 4))**2
data_out_og = total_rolling_operation(data_in, 5, np.std, shift_from_center = 0)
data_out_sliding_window_view = total_rolling_sliding_window_view(data_in, 5, np.std, shift_from_center = 0)
print(np.asarray([data_out_og, data_out_sliding_window_view]).T)

output:

[[         nan          nan]
 [         nan          nan]
 [113.49471353 113.49471353]
 [158.48359537 158.48359537]
 [203.98296498 203.98296498]
 [249.71393634 249.71393634]
 [295.56902747 295.56902747]
 [341.49824304 341.49824304]
 [         nan          nan]
 [         nan          nan]]

time it

w, h = 4, 10000
data_in = np.reshape(np.arange(h*w), (h, w))**2
%timeit -n 10 data_out_og = total_rolling_operation(data_in, 5, np.std, shift_from_center = 0)
%timeit -n 10 data_out_sliding_window_view = total_rolling_sliding_window_view(data_in, 5, np.std, shift_from_center = 0)

output:

10 loops, best of 5: 256 ms per loop
10 loops, best of 5: 1.6 ms per loop

Original post:

def total_rolling_operation(data_in, win, operation_function, shift_from_center = 0):
  assert win%2 == 1, 'window must be odd'
  mid = win//2
  pad = np.zeros([win, data_in.shape[1]])*np.nan

  L_pad = pad[:mid-shift_from_center]
  R_pad = pad[:mid shift_from_center]

  data_in = np.vstack([L_pad, data_in, R_pad])
  
  is_nan_inds = []
  data_out = []
  for k in range(data_in.shape[0]-win 1):
    x = data_in[k:(k win)] 
    data_out.append(operation_function(x))
    is_nan_inds.append(np.any(np.isnan(x)))
  return np.asarray(data_out), np.asarray(is_nan_inds)
data_in = np.reshape(np.repeat(np.arange(10).T, 4), [10, 4])**2

data_out, is_nan_inds = total_rolling_operation(data_in, 5, np.std, shift_from_center = 0)
_ = [print(i1, i2) for i1, i2 in zip(data_out, is_nan_inds)]

output

nan True
nan True
5.89915248150105 False
8.648699324175862 False
11.436782764396638 False
14.24078649513432 False
17.052858997833766 False
19.86957473123167 False
nan True
nan True

or you could define your own function

def custom_nanstd(x):
  return np.sqrt(np.nanmean((x - np.nanmean(x))**2))
data_out, is_nan_inds = total_rolling_operation(data_in, 5, custom_nanstd, shift_from_center = 0)
_ = [print(i1, i2) for i1, i2 in zip(data_out, is_nan_inds)]

output

1.699673171197595 True
3.5 True
5.89915248150105 False
8.648699324175862 False
11.436782764396638 False
14.24078649513432 False
17.052858997833766 False
19.86957473123167 False
16.80029761641144 True
13.072447700751718 True

and if you dont want the requirement for the window to be odd (which ensures data will be centered around a point) you can reconfigure to always start from left or right side.

CodePudding user response:

If you rewrite the variance over N samples of x as sum(x**2) - sum(x)**2 / N, you can use a convolution with a boxcar kernel to do the computation. For example:

import numpy as np
from scipy.signal import convolve

x = np.array([
    [ 0,  0,  0,  0],
    [ 1,  1,  1,  1],
    [ 4,  4,  4,  4],
    [ 9,  9,  9,  9],
    [16, 16, 16, 16],
    [25, 25, 25, 25],
    [36, 36, 36, 36],
    [49, 49, 49, 49],
    [64, 64, 64, 64],
    [81, 81, 81, 81]])
N = 5
kernel = np.ones((N, 1), 1)
sigma = np.sqrt(convolve(x**2, kernel, mode='valid') - convolve(x, kernel, mode='valid')**2 / N)

In this case, we get a sigma of

array([[13.19090596, 13.19090596, 13.19090596, 13.19090596],
       [19.33907961, 19.33907961, 19.33907961, 19.33907961],
       [25.57342371, 25.57342371, 25.57342371, 25.57342371],
       [31.84336666, 31.84336666, 31.84336666, 31.84336666],
       [38.13135193, 38.13135193, 38.13135193, 38.13135193],
       [44.42971978, 44.42971978, 44.42971978, 44.42971978]])

While I'm pretty sure that scipy no longer provides this, the boxcar convolution can be optimized to run in O(n) time.

  • Related