Compare commits

...

2 Commits

243
schoeffelbe/pr10.py Normal file
View File

@ -0,0 +1,243 @@
import logging
from graphviz import Digraph
from collections import deque
import heapq
logger = logging.getLogger(__name__)
# logging.basicConfig(level=logging.DEBUG)
import time
def timeMS(func, *args, **kwargs):
startTime = time.perf_counter()
result = func(*args, **kwargs)
endTime = time.perf_counter()
elapsedMS = (endTime - startTime) * 1000 # Convert to milliseconds
print(f"{func.__name__} took {elapsedMS:.2f} ms")
return result
class Graph:
def __init__(self):
self.adjacencyList = {}
def addEdge(self, node1, node2, bidirectional=True):
if node1 not in self.adjacencyList:
self.adjacencyList[node1] = []
if node2 not in self.adjacencyList:
self.adjacencyList[node2] = []
self.adjacencyList[node1].append(node2)
if bidirectional:
self.adjacencyList[node2].append(node1)
def serialize(self):
return self.adjacencyList
def breadthFirstSearch(self, start, goal, edgesGonePassed = None):
if start not in self.adjacencyList or goal not in self.adjacencyList:
return None, None
# Dont want to have a class for this, set suffices
visited = set()
queue = deque([(start, [start], set())]) # (current_node, path, edgesGoneToGetHere)
while queue:
currentNode, path, edgesGone = queue.popleft()
if currentNode == goal:
return path, edgesGone
if currentNode not in visited:
logger.info(f"visiting {currentNode}")
visited.add(currentNode)
for neighbor in self.adjacencyList[currentNode]:
edge = (currentNode, neighbor)
# We already went this Edge. Read Part3 as not allowing this to happen
edgeReverse = (neighbor, currentNode)
if neighbor not in visited and (edgesGonePassed is None or edge not in edgesGonePassed) and (edgesGonePassed is None or edgeReverse not in edgesGonePassed):
# Pythonic way of saying neighbour, path no next clear neighbour
# and union of edgesGone with current edge
queue.append((neighbor, path + [neighbor], edgesGone | {edge}))
return None, None
class SpanningTreeGraph(Graph):
def __init__(self):
super().__init__()
# Store as {(node1,node2): weight} for fast and clean lookup
self.edges = {}
def addWeightedEdge(self, node1, node2, weight, bidirectional=True):
super().addEdge(node1, node2, bidirectional)
self.edges[(node1, node2)] = weight;
if bidirectional:
self.edges[(node2, node1)] = weight;
def prim(self, startNode):
if startNode not in self.adjacencyList:
return None
visited = set()
# PQ -> heapq uses pylist
# Also, why do we need to init distance and parents to inf and None
# Isnt this information stored in the Graph and/or the PQ already?
pqMinHeap = []
minSpanTree = []
visited.add(startNode)
# Interanly use the weights compare and just take the minWeight -> No need for "inf"
# Also store from->to for graph-building
for neighbour in self.adjacencyList[startNode]:
if (tmp := (startNode, neighbour)) in self.edges:
neighbourWeight = self.edges[tmp]
heapq.heappush(pqMinHeap, (neighbourWeight, startNode, neighbour));
while pqMinHeap:
weight, nodeFrom, nodeTo = heapq.heappop(pqMinHeap)
if nodeTo not in visited:
minSpanTree.append((nodeFrom, nodeTo, weight))
visited.add(nodeTo)
for neighbour in self.adjacencyList[nodeTo]:
if neighbour not in visited and (tmp := (nodeTo, neighbour)) in self.edges:
edgeWeight = self.edges[tmp]
heapq.heappush(pqMinHeap, (edgeWeight, nodeTo, neighbour))
return minSpanTree
# https://stackoverflow.com/questions/39713798/need-some-clarification-on-kruskals-and-union-find
# Use UnionByRank and Path-Compression instead of regular union-find for faster runtime and less performance impact
def kruskal(self):
sortedEdges = sorted(self.edges.items(), key=lambda item: item[1])
minSpanTree = []
# UnionByRank+PathCompression assumes each Node has itsself as a parent in the beginning and Rank 0, union then sets new parent as per usual
# Rank tries to pin together subtrees of the same rank to keep tree "clean"
# Then during find (loopdetection) bubble up tree during find (as usual), but pathcompression collapses the "parent" to the root for next loop
parent = {}
rank = {}
# Init only once -> Set for filter
for node in set(node for edge in self.edges.keys() for node in edge):
parent[node] = node
rank[node] = 0
def _find(node):
# Path compression
if parent[node] != node:
parent[node] = _find(parent[node])
return parent[node]
def _union(node1, node2):
# Union by rank
root1 = _find(node1)
root2 = _find(node2)
if root1 != root2:
if rank[root1] > rank[root2]:
parent[root2] = root1
elif rank[root1] < rank[root2]:
parent[root1] = root2
else:
parent[root2] = root1
rank[root1] += 1
for (node1, node2), weight in sortedEdges:
# no Loop
if _find(node1) != _find(node2):
minSpanTree.append((node1, node2, weight))
_union(node1, node2)
return minSpanTree;
def graphvizify(filePath: str, outputFile: str = 'build/hoehleGraph', edges=None):
import re
graph = Digraph()
graph.attr(rankdir='TB')
graph.attr('node', shape='circle', style='filled', fillcolor='lightgray')
graph.attr('edge', arrowsize='0.5')
# Reuse the function to also create our Graph... Waste not want not^^
caveGraph = SpanningTreeGraph();
# Provided Edges -> No Fileparsing, but display the MST
if edges:
# No dupl
addedEdges = set()
for (node1, node2), weight in edges.items():
edge = tuple(sorted((node1, node2))) # Sort nodes for uniqueness (A,B == B,A)
logger.debug(f"added {edge}")
if edge not in addedEdges:
graph.edge(f"\"{node1}\"", f"\"{node2}\"", label=(" " + str(weight)), dir='none')
addedEdges.add(edge)
else:
with open(filePath, 'r') as f:
for line in f:
line = line.strip()
# Cap "<STRING>";"<STRING>";<NUMBER> No whitespaces for sanity
match = re.match(r'"([^"]+)"\s*;\s*"([^"]+)"\s*;\s*(\d+)', line)
if match:
node1, node2, weight = match.groups()
weight = int(weight)
graph.edge(f"\"{node1}\"", f"\"{node2}\"", label=(" "+(str(weight))), dir='none')
logger.debug(f"Added {node1} -> {node2}")
caveGraph.addWeightedEdge(node1, node2, weight)
try:
graph.render(outputFile, view=True)
except Exception as e:
print(f"Could not display graph: {e}\n Trying to save without viewing!")
try:
graph.render(outputFile, view=False)
print(f"Your built map should be available here: {outputFile}.pdf")
except Exception as e:
print(f"Could not save graph file: {e}")
return caveGraph
if __name__ == "__main__":
start = "Höhleneingang"
goal = "Schatzkammer"
for filename in [ "data/elektro.txt" ]:
caveGraph = graphvizify(filename, 'build/hoehleGraphElektro')
mst = caveGraph.prim(start);
mstEdges = {(node1, node2): weight for node1, node2, weight in mst} # type: ignore -> "Pywright, Stop complaining. I know it can happen that we return none, but thats ok
logger.debug(f"Prim: {mstEdges}")
# Reuse the graphvizify to visualize the new MST
graphvizify(filename, 'build/prim', mstEdges)
mstKruskal = caveGraph.kruskal();
mstEdgesKruskal = {(node1, node2): weight for node1, node2, weight in mstKruskal} # type: ignore -> "Pywright, Stop complaining. I know it can happen that we return none, but thats ok
logger.debug(f"Kruskal: {mstEdgesKruskal}")
graphvizify(filename, 'build/kruskal', mstEdgesKruskal)
timeMS(caveGraph.prim, start)
timeMS(caveGraph.kruskal)
print(f"MinPrim: {sum(mstEdges.values())} <+#+> MinKruskal: {sum(mstEdgesKruskal.values())}")
exit(0);
## Old Search for shortest Path, no MST
shortestPath, edgesGoneInitial = caveGraph.breadthFirstSearch(start, goal)
print(shortestPath, edgesGoneInitial)
logger.debug(caveGraph.adjacencyList)
logger.debug(edgesGoneInitial)
if shortestPath:
print(f"Shortest path from {start} to {goal} is:")
print(" -> ".join(shortestPath))
else:
print(f"No path found from {start} to {goal}.")
returnpath, _ = caveGraph.breadthFirstSearch(goal, start, edgesGoneInitial)
if returnpath:
print(f"Shortest path from {goal} to {start} is:")
print(" -> ".join(returnpath))
else:
print("No path back Home found. Good Luck")