"""Cave-map graph utilities.

Contains a breadth-first path search, Prim and Kruskal minimum spanning
trees, and a Graphviz helper that parses an edge file and draws it.
"""

import heapq
import logging
import re
import time
from collections import deque

try:
    from graphviz import Digraph
except ImportError:
    # Rendering is best-effort anyway (see _renderEdges); without the package
    # the graph classes and the file parsing below must still be usable.
    Digraph = None

logger = logging.getLogger(__name__)
# Uncomment to see the debug/info output of the functions below.
# logging.basicConfig(level=logging.DEBUG)

# One edge per line: "node A";"node B";<non-negative integer weight>
_EDGE_LINE = re.compile(r'"([^"]+)"\s*;\s*"([^"]+)"\s*;\s*(\d+)')

_START_NODE = "Höhleneingang"
_GOAL_NODE = "Schatzkammer"


def timeMS(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)``, print the elapsed time, return the result.

    The elapsed wall-clock time is printed in milliseconds together with
    ``func.__name__``.
    """
    startTime = time.perf_counter()
    result = func(*args, **kwargs)
    elapsedMS = (time.perf_counter() - startTime) * 1000  # seconds -> ms
    print(f"{func.__name__} took {elapsedMS:.2f} ms")
    return result


class Graph:
    """Unweighted graph stored as ``{node: [neighbour, ...]}``."""

    def __init__(self):
        self.adjacencyList = {}

    def addEdge(self, node1, node2, bidirectional=True):
        """Add the edge node1 -> node2, and node2 -> node1 if ``bidirectional``.

        Both nodes get an adjacency entry even when the edge is one-way.
        """
        self.adjacencyList.setdefault(node1, []).append(node2)
        reverseNeighbours = self.adjacencyList.setdefault(node2, [])
        if bidirectional:
            reverseNeighbours.append(node1)

    def serialize(self):
        """Return the adjacency dict itself (not a copy)."""
        return self.adjacencyList

    def breadthFirstSearch(self, start, goal, edgesGonePassed=None):
        """Find a path with the fewest edges from ``start`` to ``goal``.

        Args:
            start: Node to start from.
            goal: Node to reach.
            edgesGonePassed: Optional collection of ``(from, to)`` edges that
                must not be used, in either direction.

        Returns:
            ``(path, edgesGone)`` where ``path`` is the list of nodes from
            ``start`` to ``goal`` and ``edgesGone`` is the set of
            ``(from, to)`` edges walked along it. ``(None, None)`` if either
            node is unknown or no path exists.
        """
        if start not in self.adjacencyList or goal not in self.adjacencyList:
            return None, None

        forbidden = edgesGonePassed if edgesGonePassed is not None else ()
        # Nodes are marked when they are enqueued, so every node enters the
        # queue at most once; the first-found path per node is unchanged.
        discovered = {start}
        # Queue entries: (current node, path so far, edges used so far)
        queue = deque([(start, [start], set())])

        while queue:
            currentNode, path, edgesGone = queue.popleft()
            if currentNode == goal:
                return path, edgesGone

            logger.info("visiting %s", currentNode)
            for neighbor in self.adjacencyList[currentNode]:
                if neighbor in discovered:
                    continue
                edge = (currentNode, neighbor)
                # An edge that was already walked is blocked in both directions.
                if edge in forbidden or (neighbor, currentNode) in forbidden:
                    continue
                discovered.add(neighbor)
                queue.append((neighbor, path + [neighbor], edgesGone | {edge}))

        return None, None


class SpanningTreeGraph(Graph):
    """Weighted graph with Prim and Kruskal minimum-spanning-tree builders."""

    def __init__(self):
        super().__init__()
        # {(node1, node2): weight} for direct weight lookup
        self.edges = {}

    def addWeightedEdge(self, node1, node2, weight, bidirectional=True):
        """Add an edge like ``addEdge`` and record its weight.

        For a bidirectional edge the weight is stored under both
        ``(node1, node2)`` and ``(node2, node1)``.
        """
        super().addEdge(node1, node2, bidirectional)
        self.edges[(node1, node2)] = weight
        if bidirectional:
            self.edges[(node2, node1)] = weight

    def prim(self, startNode):
        """Build a minimum spanning tree with Prim's algorithm.

        Only nodes reachable from ``startNode`` are covered.

        Returns:
            A list of ``(nodeFrom, nodeTo, weight)`` tuples in the order the
            edges were accepted, or ``None`` if ``startNode`` is unknown.
        """
        if startNode not in self.adjacencyList:
            return None

        visited = {startNode}
        minSpanTree = []
        # Min-heap of (weight, from, to); the cheapest frontier edge pops first,
        # so no separate distance/parent tables are needed.
        pqMinHeap = []

        def pushFrontier(node):
            for neighbour in self.adjacencyList[node]:
                if neighbour in visited:
                    continue
                weight = self.edges.get((node, neighbour))
                if (node, neighbour) in self.edges:
                    heapq.heappush(pqMinHeap, (weight, node, neighbour))

        pushFrontier(startNode)
        while pqMinHeap:
            weight, nodeFrom, nodeTo = heapq.heappop(pqMinHeap)
            if nodeTo in visited:
                # Stale entry: the node was reached by a cheaper edge meanwhile.
                continue
            minSpanTree.append((nodeFrom, nodeTo, weight))
            visited.add(nodeTo)
            pushFrontier(nodeTo)

        return minSpanTree

    # https://stackoverflow.com/questions/39713798/need-some-clarification-on-kruskals-and-union-find
    def kruskal(self):
        """Build a minimum spanning forest with Kruskal's algorithm.

        Uses union-find with union by rank and path compression for the
        cycle check. Edge direction is ignored.

        Returns:
            A list of ``(node1, node2, weight)`` tuples in ascending weight
            order (ties keep the insertion order of ``self.edges``).
        """
        sortedEdges = sorted(self.edges.items(), key=lambda item: item[1])

        # Every node starts as the root of its own one-element tree, rank 0.
        parent = {node: node for edge in self.edges for node in edge}
        rank = dict.fromkeys(parent, 0)

        def find(node):
            # Walk up to the root, then point every node on the way at it.
            root = node
            while parent[root] != root:
                root = parent[root]
            while parent[node] != root:
                parent[node], node = root, parent[node]
            return root

        def union(node1, node2):
            """Merge both trees; return False if they were already joined."""
            root1, root2 = find(node1), find(node2)
            if root1 == root2:
                return False
            # Hang the lower-ranked tree below the higher-ranked one.
            if rank[root1] < rank[root2]:
                root1, root2 = root2, root1
            parent[root2] = root1
            if rank[root1] == rank[root2]:
                rank[root1] += 1
            return True

        minSpanTree = []
        for (node1, node2), weight in sortedEdges:
            # Only edges that connect two separate trees avoid a cycle.
            if union(node1, node2):
                minSpanTree.append((node1, node2, weight))
        return minSpanTree


def _readEdgeFile(filePath):
    """Yield ``(node1, node2, weight)`` for every matching line of the file."""
    # NOTE(review): assumes the edge file is UTF-8 encoded; confirm for the data files.
    with open(filePath, 'r', encoding='utf-8') as f:
        for line in f:
            match = _EDGE_LINE.match(line.strip())
            if match:
                node1, node2, weight = match.groups()
                yield node1, node2, int(weight)


def _renderEdges(drawnEdges, outputFile):
    """Draw the given ``(node1, node2, weight)`` edges; never raises on render errors."""
    if Digraph is None:
        print("Could not display graph: the 'graphviz' package is not installed")
        return

    graph = Digraph()
    graph.attr(rankdir='TB')
    graph.attr('node', shape='circle', style='filled', fillcolor='lightgray')
    graph.attr('edge', arrowsize='0.5')
    for node1, node2, weight in drawnEdges:
        graph.edge(f"\"{node1}\"", f"\"{node2}\"", label=(" " + str(weight)), dir='none')

    try:
        graph.render(outputFile, view=True)
    except Exception as e:
        print(f"Could not display graph: {e}\n Trying to save without viewing!")
        try:
            graph.render(outputFile, view=False)
            print(f"Your built map should be available here: {outputFile}.pdf")
        except Exception as e:
            print(f"Could not save graph file: {e}")


def graphvizify(filePath: str, outputFile: str = 'build/hoehleGraph', edges=None):
    """Draw a graph and return the ``SpanningTreeGraph`` parsed from the file.

    Args:
        filePath: Edge file with lines of the form ``"A";"B";<weight>``.
            Only read when ``edges`` is ``None``; non-matching lines are
            skipped.
        outputFile: Output path passed to the Graphviz renderer.
        edges: Optional ``{(node1, node2): weight}`` dict (e.g. an MST). When
            given, even if empty, these edges are drawn instead of the file
            content, each undirected edge once.

    Returns:
        The graph built from the file, or an empty ``SpanningTreeGraph`` when
        ``edges`` was given.
    """
    caveGraph = SpanningTreeGraph()
    drawnEdges = []

    if edges is not None:
        addedEdges = set()
        for (node1, node2), weight in edges.items():
            # Sorted endpoints make (A, B) and (B, A) the same key.
            edge = tuple(sorted((node1, node2)))
            logger.debug("added %s", edge)
            if edge not in addedEdges:
                drawnEdges.append((node1, node2, weight))
                addedEdges.add(edge)
    else:
        for node1, node2, weight in _readEdgeFile(filePath):
            drawnEdges.append((node1, node2, weight))
            logger.debug("Added %s -> %s", node1, node2)
            caveGraph.addWeightedEdge(node1, node2, weight)

    _renderEdges(drawnEdges, outputFile)
    return caveGraph


def _shortestPathDemo(caveGraph, start, goal):
    """Print the shortest path to ``goal`` and a way back that reuses no edge.

    Earlier shortest-path exercise; it is not called by ``main`` (in the
    original script it sat behind an unconditional exit).
    """
    shortestPath, edgesGoneInitial = caveGraph.breadthFirstSearch(start, goal)
    print(shortestPath, edgesGoneInitial)
    logger.debug(caveGraph.adjacencyList)
    logger.debug(edgesGoneInitial)
    if shortestPath:
        print(f"Shortest path from {start} to {goal} is:")
        print(" -> ".join(shortestPath))
    else:
        print(f"No path found from {start} to {goal}.")

    returnpath, _ = caveGraph.breadthFirstSearch(goal, start, edgesGoneInitial)
    if returnpath:
        print(f"Shortest path from {goal} to {start} is:")
        print(" -> ".join(returnpath))
    else:
        print("No path back Home found. Good Luck")


def main():
    """Parse each cave map, draw it, and compare the Prim and Kruskal MSTs."""
    start = _START_NODE
    for filename in ["data/elektro.txt"]:
        caveGraph = graphvizify(filename, 'build/hoehleGraphElektro')

        mst = caveGraph.prim(start)
        if mst is None:
            # prim() signals an unknown start node with None; nothing to draw.
            print(f"Start node {start} not found in {filename}, skipping.")
            continue
        mstEdges = {(node1, node2): weight for node1, node2, weight in mst}
        logger.debug("Prim: %s", mstEdges)
        # Reuse graphvizify to visualize the MST.
        graphvizify(filename, 'build/prim', mstEdges)

        mstKruskal = caveGraph.kruskal()
        mstEdgesKruskal = {(node1, node2): weight for node1, node2, weight in mstKruskal}
        logger.debug("Kruskal: %s", mstEdgesKruskal)
        graphvizify(filename, 'build/kruskal', mstEdgesKruskal)

        timeMS(caveGraph.prim, start)
        timeMS(caveGraph.kruskal)
        print(f"MinPrim: {sum(mstEdges.values())} <+#+> MinKruskal: {sum(mstEdgesKruskal.values())}")


if __name__ == "__main__":
    main()