Input
import numpy as np
import itertools
a = np.array([ 1, 6, 7, 8, 10, 11, 13, 14, 15, 19, 20, 23, 24, 26, 28, 29, 33,
34, 41, 42, 43, 44, 45, 46, 47, 52, 54, 58, 60, 61, 65, 70, 75]).astype(np.uint8)
b = np.array([ 2, 3, 4, 10, 12, 14, 16, 20, 22, 26, 28, 29, 30, 31, 34, 36, 37,
38, 39, 40, 41, 46, 48, 49, 50, 52, 53, 55, 56, 57, 59, 60, 63, 66,
67, 68, 69, 70, 71, 74]).astype(np.uint8)
c = np.array([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33,
34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67,
68, 69, 70, 71, 72, 73, 74, 75]).astype(np.uint8)
I would like to get the Cartesian product of the 3 arrays, but I do not want any duplicate elements in one row: [1, 2, 1] would not be valid, and only one of [10, 14, 0] or [14, 10, 0] would be valid, since 10 and 14 are both in a and b.
Python only
def no_numpy():
    combos = {tuple(set(i)): i for i in itertools.product(a, b, c)}
    combos = [val for key, val in combos.items() if len(key) == 3]

%timeit no_numpy()  # 32.5 ms ± 508 µs per loop
Numpy
# Solution from (https://stackoverflow.com/a/11146645/18158000)
def cartesian_product(*arrays):
    broadcastable = np.ix_(*arrays)
    broadcasted = np.broadcast_arrays(*broadcastable)
    rows, cols = np.prod(broadcasted[0].shape), len(broadcasted)
    dtype = np.result_type(*arrays)
    out = np.empty(rows * cols, dtype=dtype)
    start, end = 0, rows
    for a in broadcasted:
        out[start:end] = a.reshape(-1)
        start, end = end, end + rows
    return out.reshape(cols, rows).T

def numpy():
    combos = {tuple(set(i)): i for i in cartesian_product(*[a, b, c])}
    combos = [val for key, val in combos.items() if len(key) == 3]

%timeit numpy()  # 96.2 ms ± 136 µs per loop
My guess is that, in the numpy version, converting the np.array values to CPython sets is why it is much slower; but when comparing strictly the generation of the initial products, cartesian_product is much faster than itertools.product.
Can the numpy version be modified in any way to outperform the pure python solution, or is there another solution that outperforms both?
CodePudding user response:
Why current implementations are slow
While the first solution is faster than the second one, it is quite inefficient since it creates a lot of temporary CPython objects (at least 6 per item of itertools.product). Creating a lot of objects is expensive because they are dynamically allocated and reference-counted by CPython. The Numpy function cartesian_product is pretty fast, but the iteration over the resulting array is very slow because it creates a lot of Numpy views and operates on numpy.uint8 instead of CPython int. Numpy types and functions introduce a huge overhead for very small arrays.
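As a rough illustration of that per-element overhead (this micro-benchmark is not from the original post, and the exact numbers will vary by machine), iterating over a NumPy array has to materialize a numpy.uint8 scalar object per element, while iterating over the equivalent Python list reuses existing CPython ints:

import timeit
import numpy as np

arr = np.random.randint(0, 256, size=100_000, dtype=np.uint8)
lst = arr.tolist()  # same values, but as plain CPython ints

# The first loop is typically several times slower than the second one,
# purely because of the numpy.uint8 scalar objects created during iteration.
print(timeit.timeit(lambda: sum(1 for x in arr), number=10))
print(timeit.timeit(lambda: sum(1 for x in lst), number=10))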
Numpy can be used to speed up this operation, as shown by @AlainT, but this is not trivial to do and Numpy does not shine at solving such problems.
How to improve performance
One solution is to use Numba to do the job yourself more efficiently and let Numba's JIT compiler optimize the loops. You can use 3 nested loops to efficiently generate the values of the Cartesian product and filter the items. A dictionary can be used to track already-seen values. The tuple of 3 items can be packed into one integer so as to reduce the memory footprint and improve performance (the dictionary can then better fit in CPU caches, and the creation of slow tuple objects is avoided).
Here is the resulting code:
import numba as nb

# Signature of the function (parameter types)
# Note: `::1` means the array is contiguous
@nb.njit('(uint8[::1], uint8[::1], uint8[::1])')
def with_numba(a, b, c):
    seen = dict()

    for va in a:
        for vb in b:
            for vc in c:
                # If the 3 values are different
                if va != vb and vc != vb and vc != va:
                    # Sort the 3 values using a fast sorting network
                    v1, v2, v3 = va, vb, vc
                    if v1 > v2: v1, v2 = v2, v1
                    if v2 > v3: v2, v3 = v3, v2
                    if v1 > v2: v1, v2 = v2, v1

                    # Compact the 3 values into one 32-bit integer
                    packedKey = (np.uint32(v1) << 16) | (np.uint32(v2) << 8) | np.uint32(v3)

                    # Is the sorted tuple (v1,v2,v3) already seen?
                    if packedKey not in seen:
                        # Add the value and remember the ordered tuple (va,vb,vc)
                        packedValue = (np.uint32(va) << 16) | (np.uint32(vb) << 8) | np.uint32(vc)
                        seen[packedKey] = packedValue

    res = np.empty((len(seen), 3), dtype=np.uint8)

    for i, packed in enumerate(seen.values()):
        res[i, 0] = np.uint8(packed >> 16)
        res[i, 1] = np.uint8(packed >> 8)
        res[i, 2] = np.uint8(packed)

    return res

with_numba(a, b, c)
Benchmark
Here are results on my i5-9600KF processor:
numpy: 122.1 ms (x 1.0)
no_numpy: 49.6 ms (x 2.5)
AlainT's solution: 49.0 ms (x 2.5)
mathfux's solution: 34.2 ms (x 3.5)
with_numba: 4.9 ms (x24.9)
The provided solution is about 25 times faster than the slowest implementation and about 7 times faster than the fastest implementation provided so far.
The current Numba code is bound by the speed of the Numba dictionary operations. The code can be optimized using more low-level tricks. One solution is to replace the dictionary with a packed boolean array (1 item = 1 bit) of size 256**3/8 to track the values already seen (by checking the packedKey-th bit). The packed values can be directly appended to res if the fetched bit is not set. This requires res to be preallocated to the maximum size, or an exponentially growing array to be implemented (like list in Python or std::vector in C++). Another optimization is to sort the list and use a tiling strategy so as to improve cache locality. Such optimizations are far from easy to implement, but I expect them to drastically speed up the execution.
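For illustration, here is a minimal, unbenchmarked sketch of that bit-array idea (my own addition, not part of the original answer; it assumes the same a, b, c arrays, imports, and packing scheme as above):

@nb.njit('(uint8[::1], uint8[::1], uint8[::1])')
def with_numba_bitset(a, b, c):
    # One bit per possible packed key: 256**3 keys -> 2 MiB of flags
    seen = np.zeros(256**3 // 8, dtype=np.uint8)
    # Preallocate the output to the maximum possible size
    res = np.empty((a.size * b.size * c.size, 3), dtype=np.uint8)
    n = 0
    for va in a:
        for vb in b:
            for vc in c:
                if va != vb and vc != vb and vc != va:
                    # Sorting network, as in with_numba
                    v1, v2, v3 = va, vb, vc
                    if v1 > v2: v1, v2 = v2, v1
                    if v2 > v3: v2, v3 = v3, v2
                    if v1 > v2: v1, v2 = v2, v1
                    key = (np.uint32(v1) << 16) | (np.uint32(v2) << 8) | np.uint32(v3)
                    byte, bit = key >> 3, key & np.uint32(7)
                    # If the key-th bit is not set, mark it and store the row directly
                    if ((seen[byte] >> bit) & 1) == 0:
                        seen[byte] |= np.uint8(1) << bit
                        res[n, 0] = va
                        res[n, 1] = vb
                        res[n, 2] = vc
                        n += 1
    return res[:n].copy()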
CodePudding user response:
It is going to be quite hard to get numpy to go as fast as the filtered python iterator because numpy processes whole structures that will inevitably be larger than the result of filtering sets.
Here is the best I could come up with to process the product of arrays in such a way that the result is filtered on unique combinations of distinct values:
def npProductSets(a, b, *others):
    if len(a.shape) < 2: a = a[:, None]
    if len(b.shape) < 2: b = b[:, None]
    left  = np.repeat(a, b.shape[0], axis=0)
    right = np.tile(b, (a.shape[0], 1))
    distinct = ~np.any(right == left, axis=1)
    prod = np.concatenate((left[distinct], right[distinct]), axis=1)
    prod.sort(axis=1)
    prod = np.unique(prod, axis=0)
    if others:
        return npProductSets(prod, *others)
    return prod
This npProductSets function filters the expanded arrays as it goes and does it using numpy methods. It still runs slower than the Python generators though (0.078 sec vs 0.054 sec). Numpy is not the ideal tool for combinatorics and set manipulation.
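For reference, a possible way to call it with the arrays from the question (not part of the original answer):

result = npProductSets(a, b, c)   # 1-D inputs are reshaped to columns internally
print(result.shape)               # (number of kept combinations, 3)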
Note that npProductSets returns 50014 items instead of your 58363 because tuple(set(i)) will not filter all permutations of the numbers. The conversion of a set to a tuple does not guarantee the order of elements, so duplicate combinations are included in your output because of permuted items.
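If you want the pure-Python version to deduplicate permutations consistently, one option (my own sketch, not from the original answers) is to key the dictionary on the sorted tuple and filter rows with repeated values up front:

def no_numpy_sorted():
    # Sorting the key makes (10, 14, 0) and (14, 10, 0) collapse to one entry,
    # while the len(set(i)) == 3 filter drops rows with repeated values.
    combos = {tuple(sorted(i)): i for i in itertools.product(a, b, c) if len(set(i)) == 3}
    return list(combos.values())

This should match the 50014-row count mentioned above, up to ordering.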
CodePudding user response:
You could do it like so:
# create the full Cartesian product and keep items in sorted form
arr = np.stack(np.meshgrid(a, b, c), axis=-1).reshape(-1, 3)
arr_sort = np.sort(arr, axis=1)
# apply condition 1: no duplicates between sorted items
u, idx_c1 = np.unique(arr_sort, return_index=True, axis=0)
arr_filter, arr_sort_filter = arr[idx_c1], arr_sort[idx_c1]
# apply condition 2: no items with repeated values between sorted items
idx_c2 = (arr_sort_filter[:,0] != arr_sort_filter[:,1]) & \
(arr_sort_filter[:,1] != arr_sort_filter[:,2])
arr_filter[idx_c2]
>>>
array([[ 1, 2, 0],
[ 1, 3, 0],
[ 1, 4, 0],
...,
[75, 71, 74],
[75, 74, 72],
[75, 74, 73]], dtype=uint8)
It takes 57 ms on my computer vs 77 ms for no_numpy() and returns 50014 items.
You could later profile this algorithm in order to see what could be optimised. I did it manually, but it would be a good idea to use proper profiling tools :)
- arr ~ 0.2 ms
- arr_sort ~ 1.4 ms
- u, idx_c1 ~ 52 ms
- remaining part ~ 2.5 ms
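In the absence of a dedicated profiler, one simple way to reproduce such a breakdown is to wrap each stage in time.perf_counter calls (an illustrative sketch, not from the original answer):

import time

t0 = time.perf_counter()
arr = np.stack(np.meshgrid(a, b, c), axis=-1).reshape(-1, 3)
t1 = time.perf_counter()
arr_sort = np.sort(arr, axis=1)
t2 = time.perf_counter()
u, idx_c1 = np.unique(arr_sort, return_index=True, axis=0)
t3 = time.perf_counter()
print(f"arr: {t1 - t0:.4f}s  arr_sort: {t2 - t1:.4f}s  unique: {t3 - t2:.4f}s")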
So it's easy to see what consumes all the time here. It could be improved significantly using dimensionality reduction. One of the approaches is to replace
u, idx_c1 = np.unique(arr_sort, return_index=True, axis=0)
with
M = max(np.max(a), np.max(b), np.max(c))
idx = np.ravel_multi_index(arr_sort.T, (M + 1, M + 1, M + 1))
np.unique(idx, return_index=True)
It runs in only 4.5 ms now and only 9 ms in total! I guess you could speed this algorithm up roughly 3 more times if you optimised these parts:
- use numba for faster comparisons in idx_c2
- use numba to speed up np.ravel_multi_index (a manual implementation works faster even in plain numpy; see the sketch after this list)
- use a numba or numpy version of np.bincount instead of np.unique
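As a sketch of the manual flattening mentioned in the second bullet (my own illustration, assuming the arr_sort array defined above), the multi-index can be collapsed with plain arithmetic:

M = int(max(a.max(), b.max(), c.max()))
base = np.int64(M + 1)
cols = arr_sort.astype(np.int64)
# Equivalent to np.ravel_multi_index(arr_sort.T, (M + 1,) * 3), but computed as
# one fused expression instead of the generic multi-dimensional machinery.
idx = (cols[:, 0] * base + cols[:, 1]) * base + cols[:, 2]
u, idx_c1 = np.unique(idx, return_index=True)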