Home > Software design >  Deduplication/merging of mutable data in Python
Deduplication/merging of mutable data in Python

Time:10-25

High-level view of the problem

I have X sources that contain info about assets (hostname, IPs, MACs, OS, etc.) in our environment. The sources contain anywhere from 1500 to 150k entries (at least the ones I use now). My script is supposed to query each of them, gather that data, deduplicate it by merging info about the same assets from different sources, and return a unified list of all entries. My current implementation does work, but it's slow for bigger datasets. I'm curious if there is a better way to accomplish what I'm trying to do.

Universal problem
Deduplication of data by merging similar entries with the caveat that merging two assets might change whether the resulting asset will be similar to the third asset that was similar to the first two before merging.
Example:
~ similarity, + merging
(before) A ~ B ~ C
(after) (A + B) ~ C or (A + B) !~ C

I tried looking for people having the same issue, I only found What is an elegant way to remove duplicate mutable objects in a list in Python?, but it didn't include merging of data which is crucial in my case.

The classes used

Simplified for ease of reading and understanding with unneeded parts removed - general functionality is intact.

class Entry:
    """
    One asset record, possibly combined from several sources.

    Attributes:
        source: identifiers of the sources the record came from.
        mac, ip, hostname: lists of addresses / names known for the asset.
        os: operating system of the asset, ``OS.UNKNOWN`` by default.
        details: extra data about the asset; ignored by ``==`` and by
            ``is_similar``.
    """

    def __init__(
        self,
        source: List[str],
        mac: List[str] = None,
        ip: List[str] = None,
        hostname: List[str] = None,
        os: OS = OS.UNKNOWN,
        details: dict = None,
    ):
        # SO: Sorting and sanitization removed for simplicity
        # The optional containers default to None instead of [] / {}: a
        # mutable default is created once, at definition time, and would be
        # shared by every Entry constructed without that argument.
        self.source = source
        self.mac = [] if mac is None else mac
        self.ip = [] if ip is None else ip
        self.hostname = [] if hostname is None else hostname
        self.os = os
        self.details = {} if details is None else details

    def __eq__(self, other):
        """
        Compare source, os, hostname, mac and ip (``details`` is ignored).

        The attributes are lists, so the order of their items matters.
        Because ``__eq__`` is defined without ``__hash__``, instances are
        unhashable.
        """
        if isinstance(other, Entry):
            return (self.source == other.source and
                    self.os == other.os and
                    self.hostname == other.hostname and
                    self.mac == other.mac and
                    self.ip == other.ip)
        return NotImplemented

    def is_similar(self, other) -> bool:
        """
        Tell whether ``other`` looks like the same asset as this entry.

        Two entries with different known operating systems never match.
        Otherwise they match when they share a hostname; or, if at least one
        of them has no hostname, when they share a MAC; or, if additionally
        at least one of them has no MAC, when they share an IP.
        Anything that is not an ``Entry`` is never similar.
        """
        def same_entry(l1: list, l2: list) -> bool:
            return not set(l1).isdisjoint(l2)

        if not isinstance(other, Entry):
            return False

        os_compatible = (self.os == OS.UNKNOWN or
                         other.os == OS.UNKNOWN or
                         self.os == other.os)
        if not os_compatible:
            return False

        empty_hostnames = not self.hostname or not other.hostname
        empty_macs = not self.mac or not other.mac

        return (same_entry(self.hostname, other.hostname) or
                (empty_hostnames and same_entry(self.mac, other.mac)) or
                (empty_hostnames and empty_macs and same_entry(self.ip, other.ip)))

    def merge(self, other: 'Entry'):
        """
        Fold the data of ``other`` into this entry, in place.

        List attributes become the union of both entries' values, a known
        os of this entry is kept (otherwise the os of ``other`` is taken),
        and the details dicts are combined. ``other`` is not reassigned.
        """
        self.source = _merge_lists(self.source, other.source)
        self.hostname = _merge_lists(self.hostname, other.hostname)
        self.mac = _merge_lists(self.mac, other.mac)
        self.ip = _merge_lists(self.ip, other.ip)
        self.os = self.os if self.os != OS.UNKNOWN else other.os
        self.details = _merge_dicts(self.details, other.details)

    def representation(self) -> str:
        """Return a one-line, human-readable description of the entry."""
        # Might be useful if anyone wishes to run the code
        return f'<Entry from {self.source}: hostname={self.hostname}, MAC={self.mac}, IP={self.ip}, OS={self.os.value}, details={self.details}>'

def _merge_lists(l1: list, l2: list):
    """
    Return the items of both lists with duplicates removed.

    Items keep the order in which they are first seen (all of ``l1``, then
    the new items of ``l2``). A plain set union would return them in an
    arbitrary order that can differ between runs for strings, which makes
    order-sensitive list comparisons of merged data unreliable.
    Items must be hashable.
    """
    return list(dict.fromkeys([*l1, *l2]))


def _merge_dicts(d1: dict, d2: dict):
    """
    Merge two dicts without overwriting any data.

    Returns a new (shallow) dict; neither ``d1`` nor ``d2`` is modified.
    Every item of ``d1`` is kept. An item of ``d2`` whose key is already
    taken by a different value is stored under the key with ``'_'``
    appended, repeated until a free key is found. An item that is already
    present with the same value is not duplicated.

    Keys that collide must be strings, because the suffix is appended with
    ``+``.
    """
    # If either is empty, the result is just a copy of the other one
    if not d1:
        return dict(d2) if d2 else {}
    if not d2:
        return dict(d1)

    result = dict(d1)
    for k, v in d2.items():
        target = k
        # Walk k, k_, k__, ... until the slot is free or already holds v.
        while target in result and result[target] != v:
            target = target + '_'
        result.setdefault(target, v)

    return result
class OS(Enum):
    """Operating systems an asset can be tagged with.

    Each member's value is the display string of the system; ``UNKNOWN``
    is the member used when the operating system is not known.
    """

    UNKNOWN = 'Unknown'
    WINDOWS = 'Windows'
    LINUX = 'Linux'
    MACOS = 'MacOS'

Algorithms

Each algorithm takes a list of lists of entries from different sources, e.g.: entries = [[entries from source A], [entries from source B], ..., [entries from source Z]]

Main deduplication function

It's the main function used in each algorithm. It takes list of entries from 2 different sources and combines that into list containing assets with information merged if needed.


This is probably the part I need the most help with. It's the only approach I could think of. Because of that, I focused on how to run this function multiple times faster, but making the function itself faster would be the best way to reduce the runtime.


def deduplicate(en1: List['Entry'], en2: List['Entry']) -> List['Entry']:
    """
    Deduplicates entries from provided lists by merging similar entries.
    Entries in the lists are supposed to be already deduplicated.

    The longer list is walked entry by entry. Every entry of the shorter
    list that is similar to the current entry is merged into it, and the
    search restarts after each merge because merging can make further
    entries similar. Entries of the shorter list that were never merged are
    appended at the end.

    Neither input list is modified, but the entries of the longer list are
    updated in place by ``merge``. If one list is empty, the other list is
    returned as is.
    """
    # If either is empty, return the other one
    if not en1:
        return en2
    if not en2:
        return en1

    # Iterate over longer and check for similar in shorter
    longer, shorter = (en2, en1) if len(en2) > len(en1) else (en1, en2)

    # Work on a copy so that the caller's list keeps all of its entries.
    remaining = list(shorter)
    result = []

    for e in longer:
        # walrus operator in Python 3.8 or newer
        while (idx := next((i for i, y in enumerate(remaining) if y.is_similar(e)), None)) is not None:
            # Remove by position: list.remove() compares with ==, which could
            # drop a different entry that merely compares equal.
            e.merge(remaining.pop(idx))
        result.append(e)
    result.extend(remaining)

    return result

The reason why normal deduplication (e.g. using sets) isn't applicable here is that merging one entry with another can make entries similar that were not similar before, e.g.:

In [2]: e1 = Entry(['SRC_A'], [], ['1.1.1.1'], [], OS.UNKNOWN)
In [3]: e2 = Entry(['SRC_A'], ['aa:bb:cc:dd:ee:ff'], ['1.1.1.1'], [], OS.UNKNOWN)
In [4]: e3 = Entry(['SRC_A'], ['aa:bb:cc:dd:ee:ff'], [], [], OS.UNKNOWN)
In [5]: e1.is_similar(e2)
Out[5]: True
In [6]: e1.is_similar(e3) # at first it's not similar
Out[6]: False
In [7]: e1.merge(e2)
In [8]: e1.is_similar(e3) # but after merging it is
Out[8]: True

1st approach - sequential

My first idea was the simplest one, just simple recursion.

def dedup_multiple(lists: List[List[Entry]]) -> List[Entry]:
    """
    Deduplication helper allowing for providing more than 2 sources.

    The lists are folded from the last one to the first one, i.e.
    ``deduplicate(lists[0], deduplicate(lists[1], ...))``. This is done
    iteratively, so no recursion depth or per-level slice copy is involved.
    Returns an empty list when no lists are given.
    """
    if not lists:
        return []

    merged = lists[-1]
    for entries in reversed(lists[:-1]):
        merged = deduplicate(entries, merged)
    return merged

2nd approach - multiprocessing using Pool

That's the approach I'm using at the moment. So far it's the fastest one and fairly simple.

def async_dedup(lists: List[List[Entry]]) -> List[Entry]:
    """
    Asynchronous deduplication helper allowing for providing more than 2 sources.

    The lists are deduplicated pairwise in a process pool, round after
    round, until a single list is left. A round with an odd number of lists
    is padded with an empty list. Returns an empty list when no lists are
    given.
    """
    if not lists:
        return []

    # Work on a copy so that the padding below does not grow the caller's list.
    lists = list(lists)
    with mp.Pool() as pool:
        while len(lists) > 1:
            if len(lists) % 2 == 1:
                lists.append([])
            data = [(lists[i], lists[i + 1]) for i in range(0, len(lists), 2)]
            lists = pool.map(_internal_deduplication, data)
        return lists[0]

def _internal_deduplication(en):
    """Call ``deduplicate`` with the items of ``en`` as positional arguments.

    Lets a pool map over argument tuples with a single-argument callable.
    """
    merged = deduplicate(*en)
    return merged

But I realized really fast that if one task takes much longer than the rest (for example because it is deduplicating the biggest source), everything else waits instead of working.

3rd approach - multiprocessing using Queue and Process

As I was trying to speed up 2nd approach I came across How to use python multiprocessing pool in continuous loop and Filling a queue and managing multiprocessing in python, and I came up with the following solution.

def async_dedup2(lists: List[List[Entry]]) -> List[Entry]:
    """
    Deduplicate several sources with worker processes fed from a queue.

    The first ``tasks_number`` lists are handed to one worker process each;
    the remaining lists are put on a shared queue from which the workers
    pull. Each worker puts its final list back on the queue when it finds
    the queue empty.

    Several workers can finish at about the same time, so more than one
    result may be left on the queue. All of them are collected and
    deduplicated here instead of returning only the first one.

    Returns an empty list when no lists are given.

    Raises:
        RuntimeError: if the queue is empty after all workers have exited.
    """
    if not lists:
        return []

    # os.cpu_count() returns None when the count cannot be determined.
    tasks_number = min(os.cpu_count() or 1, len(lists) // 2)
    args = lists[:tasks_number]

    with mp.Manager() as manager:
        queue = manager.Queue()

        for entries in lists[tasks_number:]:
            queue.put(entries)

        processes = []
        for arg in args:
            proc = mp.Process(target=test, args=(queue, arg, ))
            proc.start()
            processes.append(proc)

        for proc in processes:
            proc.join()

        # Every worker has exited, so nothing can be added to the queue
        # any more and draining it until empty is safe.
        leftovers = []
        while not queue.empty():
            leftovers.append(queue.get())

    if not leftovers:
        raise RuntimeError('no result was produced by the worker processes')

    result = leftovers[0]
    for entries in leftovers[1:]:
        result = deduplicate(result, entries)
    return result


def test(queue: mp.Queue, arg: List[Entry]):
    """
    Worker: merge lists taken from ``queue`` into ``arg`` until it is empty.

    Once no more lists can be taken, the accumulated list is put back on
    the queue.
    """
    while True:
        try:
            # Non-blocking on purpose: after an empty() check, a blocking
            # get() would wait forever if another worker took the last item
            # in between.
            arg2: List[Entry] = queue.get_nowait()
        except Empty:
            break
        arg = deduplicate(arg, arg2)

    queue.put(arg)

I thought it would be the best solution, as there would never be a moment when a worker is idle while data is still waiting to be processed, but in testing it was almost always slightly slower than the 2nd approach.

Runtime comparison

Source A    1510
Source B    1509
Source C    5000
Source D    4460
Source E    5000
Source F    2084

Deduplicating.....
SYNC   - Execution time: 188.6127771000 - Count: 13540
ASYNC  - Execution time: 68.249583 - Count: 13532
ASYNC2 - Execution time: 69.416046 - Count: 13532
Source A    1510
Source B    1509
Source C    11821
Source D    13871
Source E    5001
Source F    2333

Deduplicating.....
ASYNC  - Execution time: 424.405793 - Count: 26229
ASYNC2 - Execution time: 522.697551 - Count: 26405

CodePudding user response:

Summary: we define two sketch functions f and g from entries to sets of “sketches” such that two entries e and e′ are similar if and only if f(e) ∩ g(e′) ≠ ∅. Then we can identify merges efficiently (see the algorithm at the end).

I’m actually going to define four sketch functions, f_os, f_addr, g_os, and g_addr, from which we construct

  • f(e) = {(x, y) | x ∈ f_os(e), y ∈ f_addr(e)}
  • g(e) = {(x, y) | x ∈ g_os(e), y ∈ g_addr(e)}.

f_os and g_os are the simpler two of the four. f_os(e) includes

  • (1, e.os), if e.os is known
  • (2,), if e.os is known
  • (3,), if e.os is unknown.

g_os(e) includes

  • (1, e.os), if e.os is known
  • (2,), if e.os is unknown
  • (3,).

f_addr and g_addr are more complicated because there are prioritized attributes, and they can have multiple values. Nevertheless, the same trick can be made to work. f_addr(e) includes

  • (1, h) for each h in e.hostname
  • (2, m) for each m in e.mac, if e.hostname is nonempty
  • (3, m) for each m in e.mac, if e.hostname is empty
  • (4, i) for each i in e.ip, if e.hostname and e.mac are nonempty
  • (5, i) for each i in e.ip, if e.hostname is empty and e.mac is nonempty
  • (6, i) for each i in e.ip, if e.hostname is nonempty and e.mac is empty
  • (7, i) for each i in e.ip, if e.hostname and e.mac are empty.

g_addr(e) includes

  • (1, h) for each h in e.hostname
  • (2, m) for each m in e.mac, if e.hostname is empty
  • (3, m) for each m in e.mac
  • (4, i) for each i in e.ip, if e.hostname is empty and e.mac is empty
  • (5, i) for each i in e.ip, if e.mac is empty
  • (6, i) for each i in e.ip, if e.hostname is empty
  • (7, i) for each i in e.ip.

The rest of the algorithm is as follows.

  • Initialize a defaultdict(list) mapping a sketch to a list of entry identifiers.

  • For each entry, for each of the entry’s f-sketches, add the entry’s identifier to the appropriate list in the defaultdict.

  • Initialize a set of edges.

  • For each entry, for each of the entry’s g-sketches, look up the g-sketch in the defaultdict and add an edge from the entry’s identifier to each of the other identifiers in the list.

Now that we have a set of edges, we run into the problem that @btilly noted. My first instinct as a computer scientist is to find connected components, but of course, merging two entries may cause some incident edges to disappear. Instead you can use the edges as candidates for merging, and repeat until the algorithm above returns no edges.

import collections
import itertools

# Minimal asset record: an OS marker plus lists of hostnames, MACs and IPs.
Entry = collections.namedtuple("Entry", "os hostname mac ip")

# OS markers; UNKNOWN is the one the sketch functions treat specially.
UNKNOWN, WINDOWS, LINUX = "UNKNOWN", "WINDOWS", "LINUX"


def f_os(e):
    """Yield the f-side OS sketches of ``e``.

    A known os gives ``(1, os)`` followed by ``(2,)``; an unknown os gives
    only ``(3,)``.
    """
    if e.os == UNKNOWN:
        yield (3,)
    else:
        yield (1, e.os)
        yield (2,)


def g_os(e):
    """Yield the g-side OS sketches of ``e``.

    A known os gives ``(1, os)``, an unknown one gives ``(2,)``; ``(3,)``
    is always yielded last.
    """
    if e.os == UNKNOWN:
        yield (2,)
    else:
        yield (1, e.os)
    yield (3,)


def f_addr(e):
    """Yield the f-side address sketches of ``e``.

    Hostnames are tagged 1. MACs are tagged 2 when the entry has a hostname
    and 3 otherwise. IPs get exactly one tag out of 4-7, selected by which
    of hostname / mac are present.
    """
    has_host = bool(e.hostname)
    has_mac = bool(e.mac)

    yield from ((1, h) for h in e.hostname)

    mac_tag = 2 if has_host else 3
    yield from ((mac_tag, m) for m in e.mac)

    # (has hostname, has mac) -> tag used for the entry's IPs
    ip_tag = {
        (True, True): 4,
        (False, True): 5,
        (True, False): 6,
        (False, False): 7,
    }[(has_host, has_mac)]
    yield from ((ip_tag, i) for i in e.ip)


def g_addr(e):
    """Yield the g-side address sketches of ``e``.

    Hostnames are tagged 1. MACs are always tagged 3, and also 2 (first)
    when the entry has no hostname. IPs are always tagged 7 and, before
    that, 4 / 5 / 6 depending on which of hostname / mac are missing.
    """
    no_host = not e.hostname
    no_mac = not e.mac

    for h in e.hostname:
        yield (1, h)

    mac_tags = (2, 3) if no_host else (3,)
    for tag in mac_tags:
        for m in e.mac:
            yield (tag, m)

    ip_rules = (
        (4, no_host and no_mac),
        (5, no_mac),
        (6, no_host),
        (7, True),
    )
    for tag, applies in ip_rules:
        if applies:
            for i in e.ip:
                yield (tag, i)


def f(e):
    """Return the set of f-sketches of ``e``: every (OS sketch, address sketch) pair."""
    os_sketches = f_os(e)
    addr_sketches = f_addr(e)
    return set(itertools.product(os_sketches, addr_sketches))


def g(e):
    """Return the set of g-sketches of ``e``: every (OS sketch, address sketch) pair."""
    os_sketches = g_os(e)
    addr_sketches = g_addr(e)
    return set(itertools.product(os_sketches, addr_sketches))


def is_similar(e, e_prime):
    """Return True when an f-sketch of ``e`` is also a g-sketch of ``e_prime``."""
    shared = f(e) & g(e_prime)
    return len(shared) > 0


# Begin testing code for is_similar


def original_is_similar(e, e_prime):
    """Reference similarity check used to validate ``is_similar``.

    Entries with two different known operating systems never match.
    Otherwise the first attribute of hostname, mac that both entries have
    decides by shared value; if neither is present on both, a shared ip
    decides.
    """
    both_known = UNKNOWN not in (e.os, e_prime.os)
    if both_known and e.os != e_prime.os:
        return False

    for attr in ("hostname", "mac"):
        mine, theirs = getattr(e, attr), getattr(e_prime, attr)
        if mine and theirs:
            return not set(mine).isdisjoint(theirs)

    return not set(e.ip).isdisjoint(e_prime.ip)


import random


def random_os():
    """Pick one of the three OS markers at random."""
    candidates = (UNKNOWN, WINDOWS, LINUX)
    return random.choice(candidates)


def random_names(prefix):
    """Return 0 to 2 random names, each ``prefix`` followed by one digit."""
    count = random.randrange(3)
    names = []
    for _ in range(count):
        names.append(f"{prefix}{random.randrange(10)}")
    return names


def random_entry():
    """Build an Entry with a random os and random hostname / mac / ip lists."""
    os_marker = random_os()
    hostnames = random_names("H")
    macs = random_names("M")
    ips = random_names("I")
    return Entry(os_marker, hostnames, macs, ips)


def test_is_similar():
    """Compare ``is_similar`` with ``original_is_similar`` on random pairs.

    Stops at the first disagreement and prints both entries together with
    the two results.
    """
    print("Testing is_similar()")
    for _ in range(100000):
        left = random_entry()
        right = random_entry()
        got = is_similar(left, right)
        expected = original_is_similar(left, right)
        if got == expected:
            continue
        print(left)
        print(right)
        print("got", got)
        print("expected", expected)
        break


# Run the is_similar() comparison only when the file is executed as a script.
if __name__ == "__main__":
    test_is_similar()


# End testing code


def find_edges(entries):
    """Return the set of index pairs ``(i, j)``, ``i < j``, of candidate merges.

    Every entry is indexed under each of its f-sketches; then every entry's
    g-sketches are looked up in that index, and each hit with a larger
    index yields an edge.
    """
    entries = list(entries)

    # sketch -> indices of the entries that have it among their f-sketches
    index_by_sketch = collections.defaultdict(list)
    for idx, entry in enumerate(entries):
        for sketch in f(entry):
            index_by_sketch[sketch].append(idx)

    edges = set()
    for i, entry in enumerate(entries):
        for sketch in g(entry):
            edges.update((i, j) for j in index_by_sketch[sketch] if i < j)
    return edges


# Begin testing code for find_edges


def test_find_edges():
    """Check ``find_edges`` against a brute-force scan of all pairs.

    Prints the number of expected edges and asserts both results are equal.
    """
    print("Testing find_edges()")
    entries = [random_entry() for _ in range(1000)]
    got = find_edges(entries)

    expected = set()
    for i, e in enumerate(entries):
        for j in range(i + 1, len(entries)):
            if is_similar(e, entries[j]):
                expected.add((i, j))

    print(len(expected))
    assert got == expected


# Run the find_edges() check only when the file is executed as a script.
if __name__ == "__main__":
    test_find_edges()
    # Extra call on 10000 random entries; the result is discarded
    # (presumably a rough run on a larger input - confirm intent).
    find_edges([random_entry() for i in range(10000)])

# End testing code for find_edges
  • Related