diff --git a/schoeffelbe/pr10.py b/schoeffelbe/pr10.py new file mode 100644 index 0000000..3a9a197 --- /dev/null +++ b/schoeffelbe/pr10.py @@ -0,0 +1,241 @@ +import logging +from graphviz import Digraph +from collections import deque +import heapq + +logger = logging.getLogger(__name__) +# logging.basicConfig(level=logging.DEBUG) + +import time + +def timeMS(func, *args, **kwargs): + startTime = time.perf_counter() + result = func(*args, **kwargs) + endTime = time.perf_counter() + elapsedMS = (endTime - startTime) * 1000 # Convert to milliseconds + print(f"{func.__name__} took {elapsedMS:.2f} ms") + return result + +class Graph: + def __init__(self): + self.adjacencyList = {} + + def addEdge(self, node1, node2, bidirectional=True): + if node1 not in self.adjacencyList: + self.adjacencyList[node1] = [] + if node2 not in self.adjacencyList: + self.adjacencyList[node2] = [] + self.adjacencyList[node1].append(node2) + if bidirectional: + self.adjacencyList[node2].append(node1) + + def serialize(self): + return self.adjacencyList + + def breadthFirstSearch(self, start, goal, edgesGonePassed = None): + if start not in self.adjacencyList or goal not in self.adjacencyList: + return None, None + + # Dont want to have a class for this, set suffices + visited = set() + queue = deque([(start, [start], set())]) # (current_node, path, edgesGoneToGetHere) + + while queue: + currentNode, path, edgesGone = queue.popleft() + + if currentNode == goal: + return path, edgesGone + + if currentNode not in visited: + logger.info(f"visiting {currentNode}") + visited.add(currentNode) + for neighbor in self.adjacencyList[currentNode]: + edge = (currentNode, neighbor) + # We already went this Edge. Read Part3 as not allowing this to happen + edgeReverse = (neighbor, currentNode) + if neighbor not in visited and (edgesGonePassed is None or edge not in edgesGonePassed) and (edgesGonePassed is None or edgeReverse not in edgesGonePassed): + # Pythonic way of saying neighbour, path no next clear neighbour + # and union of edgesGone with current edge + queue.append((neighbor, path + [neighbor], edgesGone | {edge})) + + return None, None + +class SpanningTreeGraph(Graph): + def __init__(self): + super().__init__() + # Store as {(node1,node2): weight} for fast and clean lookup + self.edges = {} + + def addWeightedEdge(self, node1, node2, weight, bidirectional=True): + super().addEdge(node1, node2, bidirectional) + self.edges[(node1, node2)] = weight; + if bidirectional: + self.edges[(node2, node1)] = weight; + + def prim(self, startNode): + if startNode not in self.adjacencyList: + return None + + visited = set() + # PQ -> heapq uses pylist + # Also, why do we need to init distance and parents to inf and None + # Isnt this information stored in the Graph and/or the PQ already? + pqMinHeap = [] + minSpanTree = [] + + visited.add(startNode) + # Interanly use the weights compare and just take the minWeight -> No need for "inf" + # Also store from->to for graph-building + for neighbour in self.adjacencyList[startNode]: + if (tmp := (startNode, neighbour)) in self.edges: + neighbourWeight = self.edges[tmp] + heapq.heappush(pqMinHeap, (neighbourWeight, startNode, neighbour)); + + while pqMinHeap: + weight, nodeFrom, nodeTo = heapq.heappop(pqMinHeap) + + if nodeTo not in visited: + minSpanTree.append((nodeFrom, nodeTo, weight)) + visited.add(nodeTo) + + for neighbour in self.adjacencyList[nodeTo]: + if neighbour not in visited and (tmp := (nodeTo, neighbour)) in self.edges: + edgeWeight = self.edges[tmp] + heapq.heappush(pqMinHeap, (edgeWeight, nodeTo, neighbour)) + + return minSpanTree + + # https://stackoverflow.com/questions/39713798/need-some-clarification-on-kruskals-and-union-find + # Use UnionByRank and Path-Compression instead of regular union-find for faster runtime and less performance impact + def kruskal(self): + sortedEdges = sorted(self.edges.items(), key=lambda item: item[1]) + + minSpanTree = [] + + # UnionByRank+PathCompression assumes each Node has itsself as a parent in the beginning and Rank 0, union then sets new parent as per usual + # Rank tries to pin together subtrees of the same rank to keep tree "clean" + # Then during find (loopdetection) bubble up tree during find (as usual), but pathcompression collapses the "parent" to the root for next loop + parent = {} + rank = {} + + # Init only once -> Set for filter + for node in set(node for edge in self.edges.keys() for node in edge): + parent[node] = node + rank[node] = 0 + + def _find(node): + # Path compression + if parent[node] != node: + parent[node] = _find(parent[node]) + return parent[node] + + def _union(node1, node2): + # Union by rank + root1 = _find(node1) + root2 = _find(node2) + + if root1 != root2: + if rank[root1] > rank[root2]: + parent[root2] = root1 + elif rank[root1] < rank[root2]: + parent[root1] = root2 + else: + parent[root2] = root1 + rank[root1] += 1 + + for (node1, node2), weight in sortedEdges: + # no Loop + if _find(node1) != _find(node2): + minSpanTree.append((node1, node2, weight)) + _union(node1, node2) + + return minSpanTree; + + + +def graphvizify(filePath: str, outputFile: str = 'build/hoehleGraph', edges=None): + import re + graph = Digraph() + graph.attr(rankdir='TB') + graph.attr('node', shape='circle', style='filled', fillcolor='lightgray') + graph.attr('edge', arrowsize='0.5') + + # Reuse the function to also create our Graph... Waste not want not^^ + caveGraph = SpanningTreeGraph(); + + # Provided Edges -> No Fileparsing, but display the MST + if edges: + # No dupl + addedEdges = set() + for (node1, node2), weight in edges.items(): + edge = tuple(sorted((node1, node2))) # Sort nodes for uniqueness (A,B == B,A) + logger.debug(f"added {edge}") + if edge not in addedEdges: + graph.edge(f"\"{node1}\"", f"\"{node2}\"", label=(" " + str(weight)), dir='none') + addedEdges.add(edge) + else: + with open(filePath, 'r') as f: + for line in f: + line = line.strip() + # Cap "";""; + match = re.match(r'"([^"]+)"\s*;\s*"([^"]+)"\s*;\s*(\d+)', line) + if match: + node1, node2, weight = match.groups() + weight = int(weight) + graph.edge(f"\"{node1}\"", f"\"{node2}\"", label=(" "+(str(weight))), dir='none') + logger.debug(f"Added {node1} -> {node2}") + caveGraph.addWeightedEdge(node1, node2, weight) + + try: + graph.render(outputFile, view=True) + except Exception as e: + print(f"Could not display graph: {e}\n Trying to save without viewing!") + try: + graph.render(outputFile, view=False) + print(f"Your built map should be available here: {outputFile}.pdf") + except Exception as e: + print(f"Could not save graph file: {e}") + + return caveGraph + +if __name__ == "__main__": + start = "Höhleneingang" + goal = "Schatzkammer" + + for filename in [ "data/elektro.txt" ]: + caveGraph = graphvizify(filename, 'build/hoehleGraphElektro') + mst = caveGraph.prim(start); + mstEdges = {(node1, node2): weight for node1, node2, weight in mst} # type: ignore -> "Pywright, Stop complaining. I know it can happen that we return none, but thats ok + logger.debug(f"Prim: {mstEdges}") + # Reuse the graphvizify to visualize the new MST + graphvizify(filename, 'build/prim', mstEdges) + + mstKruskal = caveGraph.kruskal(); + mstEdgesKruskal = {(node1, node2): weight for node1, node2, weight in mstKruskal} # type: ignore -> "Pywright, Stop complaining. I know it can happen that we return none, but thats ok + logger.debug(f"Kruskal: {mstEdgesKruskal}") + graphvizify(filename, 'build/kruskal', mstEdgesKruskal) + + timeMS(caveGraph.prim, start) + timeMS(caveGraph.kruskal) + + exit(0); + + ## Old Search for shortest Path, no MST + shortestPath, edgesGoneInitial = caveGraph.breadthFirstSearch(start, goal) + print(shortestPath, edgesGoneInitial) + logger.debug(caveGraph.adjacencyList) + logger.debug(edgesGoneInitial) + + if shortestPath: + print(f"Shortest path from {start} to {goal} is:") + print(" -> ".join(shortestPath)) + else: + print(f"No path found from {start} to {goal}.") + + returnpath, _ = caveGraph.breadthFirstSearch(goal, start, edgesGoneInitial) + + if returnpath: + print(f"Shortest path from {goal} to {start} is:") + print(" -> ".join(returnpath)) + else: + print("No path back Home found. Good Luck")