Home > Net >  Match in Python a LARGE set of (x, y) points to another set with outliers
Match in Python a LARGE set of (x, y) points to another set with outliers

Time:11-21

I have two large sets of (x, y) points and I want to associate in Python each point of one set with "the corresponding point" of the other.

The second set can also contain outliers, i.e. extra noise points, as you can see in this picture, where there are more green dots than red dots:

enter image description here

The association between the two sets of points is not a simple translation, as you can see in this image:

enter image description here

In these two links you can find the red dots and green dots (list of image coordinates with origin in top-left):

https://drive.google.com/file/d/1fptkxEDYbIJ93r_OXJSstDHMfk67DDYo/view?usp=share_link https://drive.google.com/file/d/1Z_ghWIzUZv8sxfawOBoGG3fJz4h_z7Qv/view?usp=share_link

My problem is similar to these two:

Match set of x,y points to another set that is scaled, rotated, translated, and with missing elements

How to align two sets of points (translation rotation) when those sets contain noise?

but I have a large set of points, so the solutions proposed here don't work for my case. My points have a certain structure in rows so it's difficult to compute a Roto-Scale-Translation function because the rows of points get confused with each other.

CodePudding user response:

I found a method which can recover which points correspond to which other points fairly accurately, using two phases. The first phase corrects for affine transformation, and the second phase corrects for nonlinear distortion.

Note: I chose to match red points to green points, rather than the other way around.

Assumptions

The method makes three assumptions:

  1. It knows three or more green points and the matching red points.
  2. The differences between the two are mostly linear.
  3. The nonlinear portion of the difference is locally similar - i.e. if one point has the match offset by (-10, 10), the neighboring point will have a similar offset. This is controlled by max_search_dist.

Code

Start by loading both datasets:

import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import numpy as np
from sklearn.neighbors import NearestNeighbors
from scipy.spatial import KDTree
from collections import Counter

with open('red_points.json', 'rb') as f:
    red_points = json.load(f)
red_points = pd.DataFrame(red_points, columns=list('xy'))
with open('green_points.json', 'rb') as f:
    green_points = json.load(f)
green_points = pd.DataFrame(green_points, columns=list('xy'))

I found it useful to have a function to visualize both datasets:

def plot_two(green, red):
    if isinstance(red, np.ndarray):
        red = pd.DataFrame(red, columns=list('xy'))
    if isinstance(green, np.ndarray):
        green = pd.DataFrame(green, columns=list('xy'))
    both = pd.concat([green.assign(hue='green'), red.assign(hue='red')])
    ax = both.plot.scatter('x', 'y', c='hue', alpha=0.5, s=0.5)
    ax.ticklabel_format(useOffset=False)

Next, pick three points in green, and provide their XY coordinates. Find the corresponding points in red, and provide their XY coordinates.

green_sample = np.array([
    [5221, 12460],
    [2479, 2497],
    [6709, 6303],
])
red_sample = np.array([
    [5274, 12597],
    [2375, 2563],
    [6766, 6406],
])

Next, use those points to find an affine matrix. This affine matrix will cover rotation, translation, scaling and skewing. Since it has six unknowns, you need at least six constraints, or the equation is underdetermined. This is why we needed at least three points earlier.

def add_implicit_ones(matrix):
    b = np.ones((matrix.shape[0], 1))
    return np.concatenate((matrix,b), axis=1)

def transform_points_affine(points, matrix):
    return add_implicit_ones(points) @ matrix

def fit_affine_matrix(red_sample, green_sample):
    red_sample = add_implicit_ones(red_sample)
    X, _, _, _ = np.linalg.lstsq(red_sample, green_sample, rcond=None)
    return X

X = fit_affine_matrix(red_sample, green_sample)
red_points_transformed = transform_points_affine(red_points.values, X)

Now we get to the nonlinear matching step. This is run after red's values are transformed to match green's values. Here's the algorithm:

  1. Start at a red point which has no nonlinear component. One near one of the green_sample points is a good choice for this - the affine step will prioritize getting those points right. Search around this point in a radius for the corresponding green point. Record the difference between the red point and the corresponding green point as "drift."
  2. Look at all of the red neighbors of that red point. Add those to a list to process.
  3. At one of those red neighbors, take the average drift of all neighboring red points. Add that drift to the red point, and search in a radius for a green point.
  4. The difference between the red point and corresponding green point is the drift for this red point.
  5. Add all this point's red neighbors to the list to process, and go back to step 3.
def find_nn_graph(red_points_np):
    nbrs = NearestNeighbors(n_neighbors=8, algorithm='ball_tree').fit(red_points_np)
    _, indicies = nbrs.kneighbors(red_points_np)
    return indicies

def point_search(red_points_np, green_points_np, starting_point, max_search_radius):
    starting_point_idx = (((red_points_np - starting_point)**2).mean(axis=1)).argmin()
    green_tree = KDTree(green_points_np)
    dirty = Counter()
    visited = set()
    indicies = find_nn_graph(red_points_np)
    # Mark starting point as dirty
    dirty[starting_point_idx]  = 1

    match = {}

    drift = np.zeros(red_points_np.shape)
    # NaN = unknown drift
    drift[:] = np.nan
    while len(dirty) > 0:
        point_idx, num_neighbors = dirty.most_common(1)[0]
        neighbors = indicies[point_idx]
        if point_idx != starting_point_idx:
            neighbor_drift_all = drift[neighbors]
            if np.isnan(neighbor_drift_all).all():
                # All neighbors have no drift
                # Unmark as dirty and come back to this one
                del dirty[point_idx]
                continue
            neighbor_drift = np.nanmean(neighbor_drift_all, axis=0)
            assert not np.isnan(neighbor_drift).any(), "No neighbor drift found"
        else:
            neighbor_drift = np.array([0, 0])
        # Find the point in the green set
        red_point = red_points_np[point_idx]
        green_points_idx = green_tree.query_ball_point(red_point   neighbor_drift, r=max_search_radius)

        assert len(green_points_idx) != 0, f"No green point found near {red_point}"
        assert len(green_points_idx) == 1, f"Too many green points found near {red_point}"
        green_point = green_points_np[green_points_idx[0]]
        real_drift = green_point - red_point
        match[point_idx] = green_points_idx[0]

        # Save drift
        drift[point_idx] = real_drift
        # Mark unvisited neighbors as dirty
        if point_idx not in visited:
            neighbors = indicies[point_idx, 1:]
            neighbors = [n for n in neighbors if n not in visited]
            dirty.update(neighbors)
        # Remove this point from dirty
        del dirty[point_idx]
        # Mark this point as visited
        visited.add(point_idx)
    # Check that there are no duplicates
    assert len(set(match.values())) == len(match)
    # Check that every point in red_points_np was matched
    assert len(match) == red_points_np.shape[0]
    return match, drift


# This point is assumed to have a drift of zero
# Pick one of the points which was used for the linear correction
starting_point = green_sample[0]
# Maximum distance that a point can be found from where it is expected
max_search_radius = 10
green_points_np = green_points.values
match, drift = point_search(red_points_transformed, green_points_np, starting_point, max_search_radius)

Next, here's a tool which you can use to audit the quality of the match. This is showing the first thousand matches. Underneath that is a quiver plot, where the arrow points from the red point in the direction of the matching green point. (Note: arrows are not to scale.)

red_idx, green_idx = zip(*match.items())
def show_match_subset(start_idx, length):
    end_idx = start_idx   length
    plot_two(green_points_np[np.array(green_idx)][start_idx:end_idx], red_points_np[np.array(red_idx)][start_idx:end_idx])
    plt.show()
    red_xy = red_points_np[np.array(red_idx)][start_idx:end_idx]
    red_drift_direction = drift[np.array(red_idx)][start_idx:end_idx]
    plt.quiver(red_xy[:, 0], red_xy[:, 1], red_drift_direction[:, 0], red_drift_direction[:, 1])
    
show_subset(0, 1000)

Plots:

plot of red points and matching green direction of match, from red point

Match

Here's a copy of the match I found. It's in JSON format, where the keys represent indexes of points in the red point file, and the values represent indexes of points in the green point file. https://pastebin.com/SBezpstu

  • Related