Home > Enterprise >  Fastest way to find all pairs of close numbers in a Numpy array
Fastest way to find all pairs of close numbers in a Numpy array

Time:11-29

Say I have a Numpy array of N = 10 random float numbers:

import numpy as np
np.random.seed(99)
N = 10
arr = np.random.uniform(0., 10., size=(N,))
print(arr)

out[1]: [6.72278559 4.88078399 8.25495174 0.31446388 8.08049963 
         5.6561742 2.97622499 0.46695721 9.90627399 0.06825733]

I want to find all unique pairs of numbers that are not different from each other more than a tolerance tol = 1. (i.e. absolute difference <= 1). Specifically, I want to get all unique pairs of indexes. The indexes of each close pair should be sorted, and all close pairs should be sorted by the first index. I managed to write the following working code:

def all_close_pairs(arr, tol=1.):
    res = set()
    for i, x1 in enumerate(arr):
        for j, x2 in enumerate(arr):
            if i == j:
                continue
            if np.isclose(x1, x2, rtol=0., atol=tol):
                res.add(tuple(sorted([i, j])))
    res = np.array(list(res))
    return res[res[:,0].argsort()]

print(all_close_pairs(arr, tol=1.))

out[2]: [[1 5]
         [2 4]
         [3 7]
         [3 9]
         [7 9]]

However, in reality I have an array of N = 1000 numbers, and my code becomes extremely slow due to the nested for loops. I believe there are much more efficient ways to do this with Numpy vectorization. Does anyone know the fastest way to do this in Numpy?

CodePudding user response:

This is a solution with pure numpy operations. It seems pretty fast on my machine, but I don't know what kind of speed we're looking for.

def all_close_pairs(arr, tol=1.):
    N = arr.shape[0]
    # get indices in the array to consider using meshgrid
    pair_coords = np.array(np.meshgrid(np.arange(N), np.arange(N))).T
    # filter out pairs so we get indices in increasing order
    pair_coords = pair_coords[pair_coords[:, :, 0] < pair_coords[:, :, 1]]
    # compare indices in your array for closeness
    is_close = np.isclose(arr[pair_coords[:, 0]], arr[pair_coords[:, 1]], rtol=0, atol=tol)
    return pair_coords[is_close, :]

CodePudding user response:

One efficient solution is to first sort the input values using index = np.argsort(). Then, you can generate the sorted array using arr[index], and then iterate over the close values in quasi-linear time if the number of pair is small on a fast contiguous array. If the number of pair is big, then the complexity is quadratic due to the quadratic number of pair generated. THe resulting complexity is: O(n log n m) where n is the size of the input array and m is the number of pair produced.

To find values close to each other, one efficient way is to iterate over the value using Numba. Indeed, while it might be possible in Numpy, it will likely not be efficient due to the variable number of value to be compared. Here is an implementation:

import numba as nb

@nb.njit('int32[:,::1](float64[::1], float64)')
def findCloseValues(arr, tol):
    res = []
    for i in range(arr.size):
        val = arr[i]
        # Iterate over the close numbers (only once)
        for j in range(i 1, arr.size):
            # Sadly neither np.isclose or np.abs are implemented in Numba so far
            if max(val, arr[j]) - min(val, arr[j]) >= tol:
                break
            res.append((i, j))
    if len(res) == 0: # No pairs: we need to help Numpy to know the shape
        return np.empty((0, 2), dtype=np.int32)
    return np.array(res, dtype=np.int32)

Finally, the indices need to be update to reference the indices in the unsorted array and not the sorted one. You can do that using index[result].

Here is the resulting code:

index = arr.argsort()
result = findCloseValues(arr[index], 1.0)
print(index[result])

Here is the result (the order is not the same as in the question but you could sort it if needed):

array([[9, 3],
       [9, 7],
       [3, 7],
       [1, 5],
       [4, 2]])

Improving the complexity of the algorithm

If you need a faster algorithm, then you can use another output format: you can for each input value provide the min/max range of values close to the target input value. To find the range, you can use a binary search (see: np.searchsorted) on the sorted array. The resulting algorithm runs in O(n log n). However, you cannot get the indices in the unsorted array since the range would be non contiguous.

Benchmark

Here are performance results on a random input with 1000 items and a tolerance of 1.0, on my machine:

Reference implementation:   ~17000 ms             (x 1)
Angelicos' implementation:    1773 ms           (x ~10)
Rivers' implementation:        122 ms           (x 139)
Rchome's implementation:        20 ms           (x 850)
Chris' implementation:           4.57 ms       (x 3720)
This implementation:             0.67 ms      (x 25373)

CodePudding user response:

A bit late but an all numpy solution:

import numpy as np

def close_enough( arr, tol = 1 ): 
    result = np.where( np.triu(np.isclose( arr[ :, None ], arr[ None, : ], rtol = 0.0, atol = tol ), 1)) 
    return np.swapaxes( result, 0, 1 ) 

Expanded to explain what is happening

def close_enough( arr, tol = 1 ):
    bool_arr = np.isclose( arr[ :, None ], arr[ None, : ], rtol = 0.0, atol = tol )
    # is_close generates a square array after comparing all elements with all elements.  

    bool_arr = np.triu( bool_arr, 1 ) 
    # Keep the upper right triangle, offset by 1 column. i.e. zero the main diagonal 
    # and all elements below and to the left.

    result = np.where( bool_arr )  # Return the row and column indices for Trues
    return np.swapaxes( result, 0, 1 ) # Return the pairs in rows rather than columns 

With N = 1000, arr = an array of floats

%timeit close_enough( arr, tol = 1 )                                                                              
14.1 ms ± 28.6 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

In [19]: %timeit all_close_pairs( arr, tol = 1 )                                                                           
54.3 ms ± 268 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

(close_enough( arr, tol = 1) == all_close_pairs( arr, tol = 1 )).all()                                            
# True

CodePudding user response:

The problem is that your code has O(n*n) (quadratic) complexity. To lower complexity, you can try to sort items first:

def all_close_pairs(arr, tol=1.):
    res = set()
    arr = sorted(enumerate(arr), key=lambda x: x[1])
    for (idx1, (i, x1)) in enumerate(arr):
        for idx2 in range(idx1-1, -1, -1):
            j, x2 = arr[idx2]
            if not np.isclose(x1, x2, rtol=0., atol=tol):
                break
            indices = sorted([i, j])
            res.add(tuple(indices))
    return np.array(sorted(res))

However, this would only work if range of your values much larger than tolerance.

You could improve this further by using 2 pointers strategy but overall complexity would remain same.

CodePudding user response:

You could first create combinations with itertools.combinations:

def all_close_pairs(arr, tolerance):
    pairs = list(combinations(arr, 2))
    indexes = list(combinations(range(len(arr)), 2))
    all_close_pairs_indexes = [indexes[i] for i,pair in enumerate(pairs) if abs(pair[0] - pair[1]) <=  tolerance]
    return all_close_pairs_indexes

Now, for N=1000, you will have to compare only 499500 pairs instead of 1 million.

How it works:

  • We first create the pairs via itertools.combinations.

  • Then, we create the list of their indexes.

  • We use a list comprehension instead of a for loop, for speed reasons.

  • In this comprehension, we iterate all pairs, using enumerate so we can get the index of the pair, we compute the absolute difference of the numbers in the pair, and if check if it's less or equal than the tolerance.

  • If the absolute difference is less or equal than tolerance, we get the indexes of the pairs's numbers via the list of indexes, and add them to our final list.

  • Related