I have part of metlab function, that multiply two matrices by 8,8 block. Table1 is 8x8 shape, table2 is 320x240 shape. I want to transmofr below code to python.
fun = @(x)x.data .*table1;
I_spatial = blockproc(table2,[8 8],fun);
I want to use method to multiply matricies like np.dot, but sizes of input arrays is not the same in row-column connection, so I cannot do it easily. Can somebody could help me, how can I port that fragment to Python? Also I have second part of this function
fun=@(x)idct2(x.data);
I_spatial = blockproc(I_spatial,[8 8],fun) 128;
How can I write that part in Python?
CodePudding user response:
there aren't any premade versions I am aware of, but this implementation will be quite fast, as data copying is minimal, granted that it'll always pad the output to be of the proper size.
def blockproc(A,m,n,fun):
results_rows = []
for y in range(0,A.shape[0],m):
results_cols = []
for x in range(0,A.shape[1],n):
results_cols.append(fun(A[y:y m,x:x n]))
results_rows.append(results_cols)
patch_rows = results_rows[0][0].shape[0]
patch_cols = results_rows[0][0].shape[1]
final_array_cols = results_rows[0][0].shape[1] * len(results_rows[0])
final_array_rows = results_rows[0][0].shape[0] * len(results_rows)
final_array = np.zeros([final_array_rows,final_array_cols],dtype=results_rows[0][0].dtype)
for y in range(len(results_rows)):
for x in range(len(results_rows[y])):
data = results_rows[y][x]
final_array[y*patch_rows:y*patch_rows data.shape[0],x*patch_cols:x*patch_cols data.shape[1]] = data
return final_array
testing it:
a = np.ones([320,240])
b = np.zeros([8,8])
def func_mul(x):
return x@b
result = blockproc(a,8,8,func_mul)
print('dims:',result.shape)
import time
t1 = time.time()
for i in range(1000):
blockproc(a, 8, 8, func_mul)
pass
t2 = time.time()
print('time:',(t2-t1)/1000)
dims:(320, 240)
time:0.006634121179580689
CodePudding user response:
Like @Ahmed AEK mentioned, there is no built-in solution for this. I have come up with a solution that leverages numpy
's extremely optimized vsplit
and hsplit
functions and even allows you to perform the function inplace:
import scipy
import numpy as np
from typing import *
from scipy.fftpack import idct
npd = NewType('npd', np.ndarray)
id = lambda x: x # default function is nothing
def blockproc(A: npd, blockdims: Tuple[int, int], func: Callable[npd, Any]=id, inplace: bool = False)-> npd:
blocks: List[npd] = []
if A.shape[0]%blockdims[0] != 0 or A.shape[1]%blockdims[1] != 0:
print(f"Invalid block dimensions - {A.shape} must be divided evenly by {tuple(blockdims)}")
vr, hr = A.shape[0]//blockdims[0], A.shape[1]//blockdims[1]
B = A if inplace else A.copy()
verts: List[npd] = np.vsplit(B,vr)
try:
for i in range(len(verts)):
for j,v in enumerate(np.hsplit(verts[i], hr)):
B[i*blockdims[0]:(i 1)*blockdims[0], j*blockdims[1]: (j 1)*blockdims[1]] = func(h)
except Exception as e: print("Invalid block function"); exit(e)
return B
if __name__ == "__main__":
# Assume table1 and table2 are defined above ...
# First code sample
fun = lambda x: x@table1
I_spatial = blockproc(table2 ,[8,8], fun)
# Second code sample
fun = lambda x: idct(x)
I_spatial = blockproc(I_spatial,[8 8],fun) 128
Look at the two code samples you provided - nearly identical! If you're curious, click here for more info about idct
EDIT:
Per @Ahmed AEK's comments (see below), it appears enumerate
is slowing down the code significantly. I've now removed the outer enumerate
in an effort to decrease runtime.
CodePudding user response:
Using Ahmed's function and example:
In [284]: a = np.ones([320, 240])
...: b = np.zeros([8, 8])
...:
...:
...: def func_mul(x):
...: return x @ b
In [285]: result = blockproc(a, 8, 8, func_mul)
In [286]: result.shape
Out[286]: (320, 240)
In a comment I suggested reshaping/transposing the a
to a (n,m,8,8) array:
In [287]: a1 = a.reshape(40, 8, 30, 8).transpose(0, 2, 1, 3)
In [288]: a1.shape
Out[288]: (40, 30, 8, 8)
In [289]: res = a1 @ b # matmul does 'batch' on lead dimensions
In [290]: res.shape
Out[290]: (40, 30, 8, 8)
In [291]: res1 = res.transpose(0, 2, 1, 3).reshape(a.shape)
Compare times:
In [292]: timeit result = blockproc(a, 8, 8, func_mul)
10.2 ms ± 171 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
In [293]: def foo(a, b):
...: a1 = a.reshape(40, 8, 30, 8).transpose(0, 2, 1, 3)
...: res = a1 @ b
...: res1 = res.transpose(0, 2, 1, 3).reshape(a.shape)
...: return res1
In [294]: timeit foo(a,b)
918 µs ± 2.19 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
Changing the arrays so the result values are significant (not all 0) to verify the equality of these methods:
In [295]: a = np.arange(320 * 240).reshape(320, 240)
In [296]: b = np.arange(64).reshape(8, 8)
In [297]: result = blockproc(a, 8, 8, func_mul)
In [298]: res1 = foo(a, b)
In [299]: np.allclose(result, res1)
Out[299]: True
My approach is much faster because it does not iterate on the lead (40,30) dimensions. But it depends on the func
being something like matmul
that can work with this mix of dimension. In other words, a function that makes full use of numpy
broadcasting
.
edit
And @Victor's version:
In [308]: def victor(A, blockdims, func):
...: vr, hr = A.shape[0] // blockdims[0], A.shape[1] // blockdims[1]
...: B = A.copy()
...: verts = np.vsplit(B, vr)
...: for i in range(len(verts)):
...: for j, v in enumerate(np.hsplit(verts[i], hr)):
...: B[
...: i * blockdims[0] : (i 1) * blockdims[0],
...: j * blockdims[1] : (j 1) * blockdims[1],
...: ] = func(v)
...: return B
...:
In [309]: res2 = victor(a, (8, 8), func_mul)
In [310]: res2.shape
Out[310]: (320, 240)
In [311]: np.allclose(result, res2)
Out[311]: True
In [312]: timeit res2 = victor(a, (8, 8), func_mul)
13.7 ms ± 5.51 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)