Compare commits

..

2 Commits

Author SHA1 Message Date
aeb6f28222 merge upstream 2025-08-05 18:50:38 +00:00
Oliver Hofmann
add4091b5d elektro 2025-06-11 09:14:53 +02:00
5 changed files with 115 additions and 245 deletions

View File

@ -0,0 +1,32 @@
from vorlesung.L08_graphen.graph import Graph, AdjacencyMatrixGraph
from utils.project_dir import get_path
import re
def read_elektro_into_graph(graph: Graph, filename: str):
pattern = re.compile(r'"([^"]+)";"([^"]+)";(\d+)')
with (open(filename, "r") as file):
for line in file:
m = pattern.match(line)
if m:
start_name = m.group(1)
end_name = m.group(2)
cost = int(m.group(3))
graph.insert_vertex(start_name)
graph.insert_vertex(end_name)
graph.connect(start_name, end_name, cost)
graph.connect(end_name, start_name, cost)
if __name__ == "__main__":
graph = AdjacencyMatrixGraph()
read_elektro_into_graph(graph, get_path("data/elektro.txt"))
parents, cost = graph.mst_prim()
print(f"Kosten nach Prim: {cost}")
for node, parent in parents.items():
if parent is not None:
print(f"{node} - {parent}")
edges, cost = graph.mst_kruskal()
print(f"Kosten nach Kruskal: {cost}")
for start_name, end_name, _ in edges:
print(f"{start_name} - {end_name}")

View File

@ -1,243 +0,0 @@
import logging
from graphviz import Digraph
from collections import deque
import heapq
logger = logging.getLogger(__name__)
# logging.basicConfig(level=logging.DEBUG)
import time
def timeMS(func, *args, **kwargs):
startTime = time.perf_counter()
result = func(*args, **kwargs)
endTime = time.perf_counter()
elapsedMS = (endTime - startTime) * 1000 # Convert to milliseconds
print(f"{func.__name__} took {elapsedMS:.2f} ms")
return result
class Graph:
def __init__(self):
self.adjacencyList = {}
def addEdge(self, node1, node2, bidirectional=True):
if node1 not in self.adjacencyList:
self.adjacencyList[node1] = []
if node2 not in self.adjacencyList:
self.adjacencyList[node2] = []
self.adjacencyList[node1].append(node2)
if bidirectional:
self.adjacencyList[node2].append(node1)
def serialize(self):
return self.adjacencyList
def breadthFirstSearch(self, start, goal, edgesGonePassed = None):
if start not in self.adjacencyList or goal not in self.adjacencyList:
return None, None
# Dont want to have a class for this, set suffices
visited = set()
queue = deque([(start, [start], set())]) # (current_node, path, edgesGoneToGetHere)
while queue:
currentNode, path, edgesGone = queue.popleft()
if currentNode == goal:
return path, edgesGone
if currentNode not in visited:
logger.info(f"visiting {currentNode}")
visited.add(currentNode)
for neighbor in self.adjacencyList[currentNode]:
edge = (currentNode, neighbor)
# We already went this Edge. Read Part3 as not allowing this to happen
edgeReverse = (neighbor, currentNode)
if neighbor not in visited and (edgesGonePassed is None or edge not in edgesGonePassed) and (edgesGonePassed is None or edgeReverse not in edgesGonePassed):
# Pythonic way of saying neighbour, path no next clear neighbour
# and union of edgesGone with current edge
queue.append((neighbor, path + [neighbor], edgesGone | {edge}))
return None, None
class SpanningTreeGraph(Graph):
def __init__(self):
super().__init__()
# Store as {(node1,node2): weight} for fast and clean lookup
self.edges = {}
def addWeightedEdge(self, node1, node2, weight, bidirectional=True):
super().addEdge(node1, node2, bidirectional)
self.edges[(node1, node2)] = weight;
if bidirectional:
self.edges[(node2, node1)] = weight;
def prim(self, startNode):
if startNode not in self.adjacencyList:
return None
visited = set()
# PQ -> heapq uses pylist
# Also, why do we need to init distance and parents to inf and None
# Isnt this information stored in the Graph and/or the PQ already?
pqMinHeap = []
minSpanTree = []
visited.add(startNode)
# Interanly use the weights compare and just take the minWeight -> No need for "inf"
# Also store from->to for graph-building
for neighbour in self.adjacencyList[startNode]:
if (tmp := (startNode, neighbour)) in self.edges:
neighbourWeight = self.edges[tmp]
heapq.heappush(pqMinHeap, (neighbourWeight, startNode, neighbour));
while pqMinHeap:
weight, nodeFrom, nodeTo = heapq.heappop(pqMinHeap)
if nodeTo not in visited:
minSpanTree.append((nodeFrom, nodeTo, weight))
visited.add(nodeTo)
for neighbour in self.adjacencyList[nodeTo]:
if neighbour not in visited and (tmp := (nodeTo, neighbour)) in self.edges:
edgeWeight = self.edges[tmp]
heapq.heappush(pqMinHeap, (edgeWeight, nodeTo, neighbour))
return minSpanTree
# https://stackoverflow.com/questions/39713798/need-some-clarification-on-kruskals-and-union-find
# Use UnionByRank and Path-Compression instead of regular union-find for faster runtime and less performance impact
def kruskal(self):
sortedEdges = sorted(self.edges.items(), key=lambda item: item[1])
minSpanTree = []
# UnionByRank+PathCompression assumes each Node has itsself as a parent in the beginning and Rank 0, union then sets new parent as per usual
# Rank tries to pin together subtrees of the same rank to keep tree "clean"
# Then during find (loopdetection) bubble up tree during find (as usual), but pathcompression collapses the "parent" to the root for next loop
parent = {}
rank = {}
# Init only once -> Set for filter
for node in set(node for edge in self.edges.keys() for node in edge):
parent[node] = node
rank[node] = 0
def _find(node):
# Path compression
if parent[node] != node:
parent[node] = _find(parent[node])
return parent[node]
def _union(node1, node2):
# Union by rank
root1 = _find(node1)
root2 = _find(node2)
if root1 != root2:
if rank[root1] > rank[root2]:
parent[root2] = root1
elif rank[root1] < rank[root2]:
parent[root1] = root2
else:
parent[root2] = root1
rank[root1] += 1
for (node1, node2), weight in sortedEdges:
# no Loop
if _find(node1) != _find(node2):
minSpanTree.append((node1, node2, weight))
_union(node1, node2)
return minSpanTree;
def graphvizify(filePath: str, outputFile: str = 'build/hoehleGraph', edges=None):
import re
graph = Digraph()
graph.attr(rankdir='TB')
graph.attr('node', shape='circle', style='filled', fillcolor='lightgray')
graph.attr('edge', arrowsize='0.5')
# Reuse the function to also create our Graph... Waste not want not^^
caveGraph = SpanningTreeGraph();
# Provided Edges -> No Fileparsing, but display the MST
if edges:
# No dupl
addedEdges = set()
for (node1, node2), weight in edges.items():
edge = tuple(sorted((node1, node2))) # Sort nodes for uniqueness (A,B == B,A)
logger.debug(f"added {edge}")
if edge not in addedEdges:
graph.edge(f"\"{node1}\"", f"\"{node2}\"", label=(" " + str(weight)), dir='none')
addedEdges.add(edge)
else:
with open(filePath, 'r') as f:
for line in f:
line = line.strip()
# Cap "<STRING>";"<STRING>";<NUMBER> No whitespaces for sanity
match = re.match(r'"([^"]+)"\s*;\s*"([^"]+)"\s*;\s*(\d+)', line)
if match:
node1, node2, weight = match.groups()
weight = int(weight)
graph.edge(f"\"{node1}\"", f"\"{node2}\"", label=(" "+(str(weight))), dir='none')
logger.debug(f"Added {node1} -> {node2}")
caveGraph.addWeightedEdge(node1, node2, weight)
try:
graph.render(outputFile, view=True)
except Exception as e:
print(f"Could not display graph: {e}\n Trying to save without viewing!")
try:
graph.render(outputFile, view=False)
print(f"Your built map should be available here: {outputFile}.pdf")
except Exception as e:
print(f"Could not save graph file: {e}")
return caveGraph
if __name__ == "__main__":
start = "Höhleneingang"
goal = "Schatzkammer"
for filename in [ "data/elektro.txt" ]:
caveGraph = graphvizify(filename, 'build/hoehleGraphElektro')
mst = caveGraph.prim(start);
mstEdges = {(node1, node2): weight for node1, node2, weight in mst} # type: ignore -> "Pywright, Stop complaining. I know it can happen that we return none, but thats ok
logger.debug(f"Prim: {mstEdges}")
# Reuse the graphvizify to visualize the new MST
graphvizify(filename, 'build/prim', mstEdges)
mstKruskal = caveGraph.kruskal();
mstEdgesKruskal = {(node1, node2): weight for node1, node2, weight in mstKruskal} # type: ignore -> "Pywright, Stop complaining. I know it can happen that we return none, but thats ok
logger.debug(f"Kruskal: {mstEdgesKruskal}")
graphvizify(filename, 'build/kruskal', mstEdgesKruskal)
timeMS(caveGraph.prim, start)
timeMS(caveGraph.kruskal)
print(f"MinPrim: {sum(mstEdges.values())} <+#+> MinKruskal: {sum(mstEdgesKruskal.values())}")
exit(0);
## Old Search for shortest Path, no MST
shortestPath, edgesGoneInitial = caveGraph.breadthFirstSearch(start, goal)
print(shortestPath, edgesGoneInitial)
logger.debug(caveGraph.adjacencyList)
logger.debug(edgesGoneInitial)
if shortestPath:
print(f"Shortest path from {start} to {goal} is:")
print(" -> ".join(shortestPath))
else:
print(f"No path found from {start} to {goal}.")
returnpath, _ = caveGraph.breadthFirstSearch(goal, start, edgesGoneInitial)
if returnpath:
print(f"Shortest path from {goal} to {start} is:")
print(" -> ".join(returnpath))
else:
print("No path back Home found. Good Luck")

View File

@ -2,9 +2,12 @@ from collections import deque
from typing import List from typing import List
from enum import Enum from enum import Enum
import graphviz import graphviz
import math
import heapq
from datetime import datetime from datetime import datetime
from utils.project_dir import get_path from utils.project_dir import get_path
from utils.priority_queue import PriorityQueue from utils.priority_queue import PriorityQueue
from vorlesung.L09_mst.disjoint import DisjointValue
class NodeColor(Enum): class NodeColor(Enum):
@ -205,6 +208,68 @@ class Graph:
relax(vertex, dest, weight) relax(vertex, dest, weight)
return distance_map, predecessor_map return distance_map, predecessor_map
def mst_prim(self, start_name: str = None):
""" Compute the minimum spanning tree of the graph using Prim's algorithm. """
distance_map = {} # maps vertices to their current distance from the spanning tree
parent_map = {} # maps vertices to their predecessor in the spanning tree
Vertex.__lt__ = lambda self, other: distance_map[self] < distance_map[other]
queue = []
if start_name is None:
start_name = self.all_vertices()[0].value
# Initialize the maps
for vertex in self.all_vertices():
distance_map[vertex] = 0 if vertex.value == start_name else math.inf
parent_map[vertex] = None
queue.append(vertex)
heapq.heapify(queue) # Convert the list into a heap
# Process the queue
cost = 0 # The cost of the minimum spanning tree
while len(queue) > 0:
vertex = heapq.heappop(queue)
cost += distance_map[vertex] # Add the cost of the edge to the minimum spanning tree
for (dest, w) in self.get_adjacent_vertices_with_weight(vertex.value):
if dest in queue and distance_map[dest] > w:
# Update the distance and parent maps
queue.remove(dest)
distance_map[dest] = w
parent_map[dest] = vertex
queue.append(dest) # Add the vertex back to the queue
heapq.heapify(queue) # Re-heapify the queue
# Return the distance and predecessor maps
return parent_map, cost
def mst_kruskal(self, start_name: str = None):
""" Compute the minimum spanning tree of the graph using Kruskal's algorithm. """
cost = 0
result = []
edges = self.all_edges()
# Create a disjoint set for each vertex
vertex_map = {v.value: DisjointValue(v) for v in self.all_vertices()}
# Sort the edges by weight
edges.sort(key=lambda edge: edge[2])
# Process the edges
for edge in edges:
start_name, end_name, weight = edge
# Check if the edge creates a cycle
if not vertex_map[start_name].same_set(vertex_map[end_name]):
result.append(edge)
vertex_map[start_name].union(vertex_map[end_name])
cost += weight
return result, cost
class AdjacencyListGraph(Graph): class AdjacencyListGraph(Graph):
"""A graph implemented as an adjacency list.""" """A graph implemented as an adjacency list."""
@ -243,8 +308,6 @@ class AdjacencyListGraph(Graph):
return result return result
class AdjacencyMatrixGraph(Graph): class AdjacencyMatrixGraph(Graph):
"""A graph implemented as an adjacency matrix.""" """A graph implemented as an adjacency matrix."""
def __init__(self): def __init__(self):

View File

View File

@ -0,0 +1,18 @@
class DisjointValue():
def __init__(self, value):
self.value = value
self.parent = None
def canonical(self):
if self.parent:
return self.parent.canonical()
return self
def same_set(self, other):
return self.canonical() == other.canonical()
def union(self, other):
self.canonical().parent = other.canonical()