Home > Enterprise >  Optimization of A* implementation in python
Optimization of A* implementation in python

Time:05-20

To solve problem 83 of project euler I tried to use the A* algorithm. The algorithm works fine for the given problem and I get the correct result. But when I visualized the algorithm I realized that it seems as if the algorithm checked way to many possible nodes. Is it because I didn't implement the algorithm properly or am I missing something else? I tried using two different heuristic functions which you can see in the code below, but the output didn't change much.

If there are any tips to make the code more efficient I would also be thankful for that as I am quite a beginner.

import heapq
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
from matplotlib import animation
import numpy as np

class PriorityQueue:
    def __init__(self):
        self.elements = []

    def empty(self):
        return not self.elements

    def put(self, item, priority):
        heapq.heappush(self.elements, (priority, item))

    def get(self):
        return heapq.heappop(self.elements)[1]

class A_star:
    def __init__(self, data, start, end):
        self.data = data
        self.start = start
        self.end = end
        self.a = len(self.data)
        self.b = len(self.data[0])

    def h_matrix(self):
        elements = sorted([self.data[i][j] for j in range(self.b) for i in range(self.a)])
        n = self.a   self.b - 1
        minimum = elements[:n]
        h = []
        for i in range(self.a):
            h_i = []
            for j in range(self.b):
                h_i.append(sum(minimum[:(n-i-j-1)]))
            h.append(h_i)
        return h

    def astar(self):
        h = self.h_matrix()
        open_list = PriorityQueue()
        open_list.put(self.start, 0)
        came_from = {}
        cost_so_far = {}
        came_from[self.start] = None
        cost_so_far[self.start] = self.data[0][0]
        checked = []

        while not open_list.empty():
            current = open_list.get()
            checked.append(current)

            if current == self.end:
                break

            neighbors = [(current[0] x, current[1] y) for x, y in {(-1,0), (0,-1), (1,0), (0,1)}
                if 0 <= current[0] x < self.a and 0 <= current[1] y < self.b]
            for next in neighbors:
                new_cost = cost_so_far[current]   self.data[next[0]][next[1]]
                if next not in cost_so_far or new_cost < cost_so_far[next]:
                    cost_so_far[next] = new_cost
                    priority = new_cost   h[next[0]][next[1]]
                    open_list.put(next, priority)
                    came_from[next] = current

        return came_from, checked, cost_so_far[self.end]

    def reconstruct_path(self):
        paths = self.astar()[0]
        best_path = [self.end]
        while best_path[0] is not None:
            new = paths[best_path[0]]
            best_path.insert(0, new)
        return best_path[1:]

    def minimum(self):
        return self.astar()[2]


if __name__ == "__main__":
    liste = [[131, 673, 234, 103, 18], [201, 96, 342, 965, 150], [630, 803, 746, 422, 111], [537, 699, 497, 121, 956], [805, 732, 524, 37, 331]]
    path = A_star(liste, (0,0), (4,4))
    print(path.astar())
    #print(path.reconstruct_path())
    path.plot_path(speed=200)

Here you can see my visualization for the 80x80 matrix given in the problem. Blue are all the points in checked and red is the optimal path. From my understanding it shouldn't be the case that every point in the matrix is in checked i.e. blue. https://i.stack.imgur.com/LKkdh.png

My initial guess would be that my heuristic function is not good enough. If I choose h=0, which would mean Dijkstra Algorithm the length of my checked list is 6400. Contrary if I use my custom h the length is 6455. But how can I improve the heuristic function for an arbitrary matrix? Thanks for any help.

CodePudding user response:

Let me start at the end of your post:

Marking cells as checked

if I use my custom h the length is 6455.

You should not have a size of checked that exceeds the number of cells. So let me first suggest an improvement for that: instead of using a list, use set, and skip anything popped from the priority queue that is already in the set. The relevant code will then look like this:

        checked = set()

        while not open_list.empty():
            current = open_list.get()
            if current in checked:  # don't repeat the same search
                continue
            checked.add(current)

And if in the end you need the list version of checked, just return it that way:

        return came_from, list(checked), cost_so_far[self.end]

Now to the main question:

Improving heuristic function

From my understanding it shouldn't be the case that every point in the matrix is in checked i.e. blue. My initial guess would be that my heuristic function is not good enough.

That is the right explanation. Combine that with the fact that the given matrix has paths which have a total cost which come quite close, so there is a competition field that involves much of the matrix.

how can I improve the heuristic function for an arbitrary matrix?

One idea is to consider the following. A path must include at least one element from each "forward" diagonal (/). So if we work with the minimum value on each such diagonal and create running sums (backwards -- starting from the target), we'll have a workable value for h.

Here is the code for that idea:

    def h_matrix(self):
        min_diagonals = [float("inf")] * (self.a   self.b - 1)
        # For each diagonal get the minimum cost on that diagonal
        for i, row in enumerate(self.data):
            for j, cost in enumerate(row):
                min_diagonals[i j] = min(min_diagonals[i j], cost)
        # Create a running sum in backward direction
        for i in range(len(min_diagonals) - 2, -1, -1):
            min_diagonals[i]  = min_diagonals[i   1] 
        min_diagonals.append(0)  # Add an entry for allowing the next logic to work
        # These sums are a lower bound for the remaining cost towards the target 
        h = [
            [min_diagonals[i   j   1] for j in range(self.b)] 
            for i in range(self.a)
        ]
        return h

With these improvements, we get these counts:

len(cost_so_far) == 6374
len(checked) == 6339

This represents still a large portion of the matrix, but at least a few cells were left out.

  • Related