Say I have a Numpy array of N = 10 random floats:
import numpy as np
np.random.seed(99)
N = 10
arr = np.random.uniform(0., 10., size=(N,))
print(arr)
out[1]: [6.72278559 4.88078399 8.25495174 0.31446388 8.08049963
5.6561742 2.97622499 0.46695721 9.90627399 0.06825733]
I want to find all unique pairs of numbers that differ from each other by no more than a tolerance tol = 1 (i.e. absolute difference <= 1). Specifically, I want all unique pairs of indexes. The indexes within each close pair should be sorted, and all close pairs should be sorted by the first index. I managed to write the following working code:
def all_close_pairs(arr, tol=1.):
    res = set()
    for i, x1 in enumerate(arr):
        for j, x2 in enumerate(arr):
            if i == j:
                continue
            if np.isclose(x1, x2, rtol=0., atol=tol):
                res.add(tuple(sorted([i, j])))
    res = np.array(list(res))
    return res[res[:, 0].argsort()]
print(all_close_pairs(arr, tol=1.))
out[2]: [[1 5]
[2 4]
[3 7]
[3 9]
[7 9]]
However, in reality I have an array of N = 1000 numbers, and my code becomes extremely slow due to the nested for loops. I believe there are much more efficient ways to do this with Numpy vectorization. Does anyone know the fastest way to do this in Numpy?
CodePudding user response:
This is a solution with pure numpy operations. It seems pretty fast on my machine, but I don't know what kind of speed we're looking for.
def all_close_pairs(arr, tol=1.):
    N = arr.shape[0]
    # Build all (i, j) index pairs with meshgrid
    pair_coords = np.array(np.meshgrid(np.arange(N), np.arange(N))).T
    # Keep only pairs with i < j, so each pair appears once, with sorted indexes
    pair_coords = pair_coords[pair_coords[:, :, 0] < pair_coords[:, :, 1]]
    # Compare the values at each index pair for closeness
    is_close = np.isclose(arr[pair_coords[:, 0]], arr[pair_coords[:, 1]], rtol=0, atol=tol)
    return pair_coords[is_close, :]
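For reference, calling it on the array from the question reproduces the expected output; the pairs come out already sorted by first index because the i < j mask is applied in row-major order:
print(all_close_pairs(arr, tol=1.))
# [[1 5]
#  [2 4]
#  [3 7]
#  [3 9]
#  [7 9]]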
CodePudding user response:
One efficient solution is to first sort the input values using index = np.argsort(arr). Then you can generate the sorted array with arr[index] and iterate over the close values on a fast contiguous array, in quasi-linear time as long as the number of pairs is small. If the number of pairs is big, the complexity is quadratic simply because a quadratic number of pairs is generated. The resulting complexity is O(n log n + m), where n is the size of the input array and m is the number of pairs produced.
To find the values close to each other, one efficient way is to iterate over them using Numba. While this might be possible in pure Numpy, it would likely not be efficient due to the variable number of values to compare for each element. Here is an implementation:
import numpy as np
import numba as nb

@nb.njit('int32[:,::1](float64[::1], float64)')
def findCloseValues(arr, tol):
    res = []
    for i in range(arr.size):
        val = arr[i]
        # Iterate over the close numbers (each pair is visited only once since j > i)
        for j in range(i + 1, arr.size):
            # np.isclose is not supported by Numba, so test the difference manually;
            # the input is sorted, so we can stop at the first value that is too far
            if max(val, arr[j]) - min(val, arr[j]) > tol:
                break
            res.append((i, j))
    if len(res) == 0:  # No pairs: we need to help Numpy to know the shape
        return np.empty((0, 2), dtype=np.int32)
    return np.array(res, dtype=np.int32)
Finally, the indices need to be updated to reference positions in the unsorted array rather than the sorted one. You can do that using index[result].
Here is the resulting code:
index = arr.argsort()
result = findCloseValues(arr[index], 1.0)
print(index[result])
Here is the result (the order is not the same as in the question but you could sort it if needed):
array([[9, 3],
[9, 7],
[3, 7],
[1, 5],
[4, 2]])
Improving the complexity of the algorithm
If you need a faster algorithm, then you can use another output format: for each input value, provide the min/max range of sorted values close to it. To find this range, you can use a binary search (see np.searchsorted) on the sorted array. The resulting algorithm runs in O(n log n). However, you cannot get the indices in the unsorted array this way, since the matching positions would be non-contiguous there.
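As an illustration of that idea, here is a minimal sketch (the name close_ranges is mine, not part of the answer above):
import numpy as np

def close_ranges(arr, tol=1.0):
    # Sort once; the values close to any element form a contiguous window in sorted order
    order = np.argsort(arr)
    s = arr[order]
    # For each sorted value s[k], the close values are exactly s[lo[k]:hi[k]]
    lo = np.searchsorted(s, s - tol, side='left')
    hi = np.searchsorted(s, s + tol, side='right')
    return order, lo, hi  # ranges refer to positions in the sorted array
Each element k then has hi[k] - lo[k] - 1 close partners (its window includes itself), all found in O(n log n) without materializing the pairs.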
Benchmark
Here are performance results on a random input with 1000 items and a tolerance of 1.0, on my machine:
Reference implementation: ~17000 ms (x 1)
Angelicos' implementation: 1773 ms (x ~10)
Rivers' implementation: 122 ms (x 139)
Rchome's implementation: 20 ms (x 850)
Chris' implementation: 4.57 ms (x 3720)
This implementation: 0.67 ms (x 25373)
CodePudding user response:
A bit late, but here is an all-numpy solution:
import numpy as np
def close_enough( arr, tol = 1 ):
    result = np.where( np.triu( np.isclose( arr[ :, None ], arr[ None, : ], rtol = 0.0, atol = tol ), 1 ) )
    return np.swapaxes( result, 0, 1 )
Expanded, to explain what is happening:
def close_enough( arr, tol = 1 ):
    bool_arr = np.isclose( arr[ :, None ], arr[ None, : ], rtol = 0.0, atol = tol )
    # isclose generates a square array after comparing all elements with all elements.
    bool_arr = np.triu( bool_arr, 1 )
    # Keep the upper-right triangle, offset by 1 column, i.e. zero the main diagonal
    # and all elements below and to the left.
    result = np.where( bool_arr )       # Row and column indices of the True entries
    return np.swapaxes( result, 0, 1 )  # Return the pairs as rows rather than columns
With N = 1000 and arr an array of floats:
%timeit close_enough( arr, tol = 1 )
14.1 ms ± 28.6 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit all_close_pairs( arr, tol = 1 )
54.3 ms ± 268 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
(close_enough( arr, tol = 1) == all_close_pairs( arr, tol = 1 )).all()
# True
CodePudding user response:
The problem is that your code has O(n*n) (quadratic) complexity. To lower the complexity, you can try sorting the items first:
def all_close_pairs(arr, tol=1.):
    res = set()
    # Sort the values, remembering their original indexes
    arr = sorted(enumerate(arr), key=lambda x: x[1])
    for (idx1, (i, x1)) in enumerate(arr):
        # Walk backwards over smaller values until one is no longer close
        for idx2 in range(idx1 - 1, -1, -1):
            j, x2 = arr[idx2]
            if not np.isclose(x1, x2, rtol=0., atol=tol):
                break
            indices = sorted([i, j])
            res.add(tuple(indices))
    return np.array(sorted(res))
However, this only helps when the range of your values is much larger than the tolerance. You could improve it further with a two-pointers strategy, but the overall worst-case complexity would remain the same; see the sketch below.
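For completeness, here is a sketch of that two-pointers idea over the sorted values (my own illustration, using the same <= tol definition of closeness as the question):
import numpy as np

def all_close_pairs_two_pointers(arr, tol=1.):
    order = np.argsort(arr)
    s = arr[order]
    res = []
    left = 0
    for right in range(len(s)):
        # Advance left until s[right] - s[left] <= tol
        while s[right] - s[left] > tol:
            left += 1
        # Every element in [left, right) is close to s[right]
        for k in range(left, right):
            i, j = order[k], order[right]
            res.append((min(i, j), max(i, j)))
    if not res:
        return np.empty((0, 2), dtype=int)
    return np.array(sorted(res))
The pointer only moves forward, so the scan itself is linear; the total cost is O(n log n) for the sort plus O(m) to emit the m pairs, which is still quadratic when most values are close to each other.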
CodePudding user response:
You could first create combinations with itertools.combinations:
from itertools import combinations

def all_close_pairs(arr, tolerance):
    pairs = list(combinations(arr, 2))
    indexes = list(combinations(range(len(arr)), 2))
    all_close_pairs_indexes = [indexes[i] for i, pair in enumerate(pairs) if abs(pair[0] - pair[1]) <= tolerance]
    return all_close_pairs_indexes
Now, for N=1000, you will have to compare only N*(N-1)/2 = 499500 pairs instead of 1 million.
How it works:
We first create the pairs of values via itertools.combinations.
Then, we create the list of their index pairs.
We use a list comprehension instead of a for loop, for speed.
In this comprehension we iterate over all pairs, using enumerate so we can get the index of each pair, compute the absolute difference of the two numbers in the pair, and check whether it is less than or equal to tolerance.
If it is, we look up the indexes of the pair's numbers in the list of index pairs and add them to the final list.
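For example, on the array from the question this returns the same pairs as the original code, just as a list of tuples rather than a NumPy array:
print(all_close_pairs(arr, tolerance=1.))
# [(1, 5), (2, 4), (3, 7), (3, 9), (7, 9)]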