forked from hofmannol/AlgoDatSoSe25
Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
aeb6f28222 | |||
![]() |
add4091b5d |
32
praktika/10_electro/electro.py
Normal file
32
praktika/10_electro/electro.py
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
from vorlesung.L08_graphen.graph import Graph, AdjacencyMatrixGraph
|
||||||
|
from utils.project_dir import get_path
|
||||||
|
import re
|
||||||
|
|
||||||
|
def read_elektro_into_graph(graph: Graph, filename: str):
|
||||||
|
pattern = re.compile(r'"([^"]+)";"([^"]+)";(\d+)')
|
||||||
|
with (open(filename, "r") as file):
|
||||||
|
for line in file:
|
||||||
|
m = pattern.match(line)
|
||||||
|
if m:
|
||||||
|
start_name = m.group(1)
|
||||||
|
end_name = m.group(2)
|
||||||
|
cost = int(m.group(3))
|
||||||
|
graph.insert_vertex(start_name)
|
||||||
|
graph.insert_vertex(end_name)
|
||||||
|
graph.connect(start_name, end_name, cost)
|
||||||
|
graph.connect(end_name, start_name, cost)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
graph = AdjacencyMatrixGraph()
|
||||||
|
read_elektro_into_graph(graph, get_path("data/elektro.txt"))
|
||||||
|
|
||||||
|
parents, cost = graph.mst_prim()
|
||||||
|
print(f"Kosten nach Prim: {cost}")
|
||||||
|
for node, parent in parents.items():
|
||||||
|
if parent is not None:
|
||||||
|
print(f"{node} - {parent}")
|
||||||
|
|
||||||
|
edges, cost = graph.mst_kruskal()
|
||||||
|
print(f"Kosten nach Kruskal: {cost}")
|
||||||
|
for start_name, end_name, _ in edges:
|
||||||
|
print(f"{start_name} - {end_name}")
|
@ -1,243 +0,0 @@
|
|||||||
import logging
|
|
||||||
from graphviz import Digraph
|
|
||||||
from collections import deque
|
|
||||||
import heapq
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
# logging.basicConfig(level=logging.DEBUG)
|
|
||||||
|
|
||||||
import time
|
|
||||||
|
|
||||||
def timeMS(func, *args, **kwargs):
|
|
||||||
startTime = time.perf_counter()
|
|
||||||
result = func(*args, **kwargs)
|
|
||||||
endTime = time.perf_counter()
|
|
||||||
elapsedMS = (endTime - startTime) * 1000 # Convert to milliseconds
|
|
||||||
print(f"{func.__name__} took {elapsedMS:.2f} ms")
|
|
||||||
return result
|
|
||||||
|
|
||||||
class Graph:
|
|
||||||
def __init__(self):
|
|
||||||
self.adjacencyList = {}
|
|
||||||
|
|
||||||
def addEdge(self, node1, node2, bidirectional=True):
|
|
||||||
if node1 not in self.adjacencyList:
|
|
||||||
self.adjacencyList[node1] = []
|
|
||||||
if node2 not in self.adjacencyList:
|
|
||||||
self.adjacencyList[node2] = []
|
|
||||||
self.adjacencyList[node1].append(node2)
|
|
||||||
if bidirectional:
|
|
||||||
self.adjacencyList[node2].append(node1)
|
|
||||||
|
|
||||||
def serialize(self):
|
|
||||||
return self.adjacencyList
|
|
||||||
|
|
||||||
def breadthFirstSearch(self, start, goal, edgesGonePassed = None):
|
|
||||||
if start not in self.adjacencyList or goal not in self.adjacencyList:
|
|
||||||
return None, None
|
|
||||||
|
|
||||||
# Dont want to have a class for this, set suffices
|
|
||||||
visited = set()
|
|
||||||
queue = deque([(start, [start], set())]) # (current_node, path, edgesGoneToGetHere)
|
|
||||||
|
|
||||||
while queue:
|
|
||||||
currentNode, path, edgesGone = queue.popleft()
|
|
||||||
|
|
||||||
if currentNode == goal:
|
|
||||||
return path, edgesGone
|
|
||||||
|
|
||||||
if currentNode not in visited:
|
|
||||||
logger.info(f"visiting {currentNode}")
|
|
||||||
visited.add(currentNode)
|
|
||||||
for neighbor in self.adjacencyList[currentNode]:
|
|
||||||
edge = (currentNode, neighbor)
|
|
||||||
# We already went this Edge. Read Part3 as not allowing this to happen
|
|
||||||
edgeReverse = (neighbor, currentNode)
|
|
||||||
if neighbor not in visited and (edgesGonePassed is None or edge not in edgesGonePassed) and (edgesGonePassed is None or edgeReverse not in edgesGonePassed):
|
|
||||||
# Pythonic way of saying neighbour, path no next clear neighbour
|
|
||||||
# and union of edgesGone with current edge
|
|
||||||
queue.append((neighbor, path + [neighbor], edgesGone | {edge}))
|
|
||||||
|
|
||||||
return None, None
|
|
||||||
|
|
||||||
class SpanningTreeGraph(Graph):
|
|
||||||
def __init__(self):
|
|
||||||
super().__init__()
|
|
||||||
# Store as {(node1,node2): weight} for fast and clean lookup
|
|
||||||
self.edges = {}
|
|
||||||
|
|
||||||
def addWeightedEdge(self, node1, node2, weight, bidirectional=True):
|
|
||||||
super().addEdge(node1, node2, bidirectional)
|
|
||||||
self.edges[(node1, node2)] = weight;
|
|
||||||
if bidirectional:
|
|
||||||
self.edges[(node2, node1)] = weight;
|
|
||||||
|
|
||||||
def prim(self, startNode):
|
|
||||||
if startNode not in self.adjacencyList:
|
|
||||||
return None
|
|
||||||
|
|
||||||
visited = set()
|
|
||||||
# PQ -> heapq uses pylist
|
|
||||||
# Also, why do we need to init distance and parents to inf and None
|
|
||||||
# Isnt this information stored in the Graph and/or the PQ already?
|
|
||||||
pqMinHeap = []
|
|
||||||
minSpanTree = []
|
|
||||||
|
|
||||||
visited.add(startNode)
|
|
||||||
# Interanly use the weights compare and just take the minWeight -> No need for "inf"
|
|
||||||
# Also store from->to for graph-building
|
|
||||||
for neighbour in self.adjacencyList[startNode]:
|
|
||||||
if (tmp := (startNode, neighbour)) in self.edges:
|
|
||||||
neighbourWeight = self.edges[tmp]
|
|
||||||
heapq.heappush(pqMinHeap, (neighbourWeight, startNode, neighbour));
|
|
||||||
|
|
||||||
while pqMinHeap:
|
|
||||||
weight, nodeFrom, nodeTo = heapq.heappop(pqMinHeap)
|
|
||||||
|
|
||||||
if nodeTo not in visited:
|
|
||||||
minSpanTree.append((nodeFrom, nodeTo, weight))
|
|
||||||
visited.add(nodeTo)
|
|
||||||
|
|
||||||
for neighbour in self.adjacencyList[nodeTo]:
|
|
||||||
if neighbour not in visited and (tmp := (nodeTo, neighbour)) in self.edges:
|
|
||||||
edgeWeight = self.edges[tmp]
|
|
||||||
heapq.heappush(pqMinHeap, (edgeWeight, nodeTo, neighbour))
|
|
||||||
|
|
||||||
return minSpanTree
|
|
||||||
|
|
||||||
# https://stackoverflow.com/questions/39713798/need-some-clarification-on-kruskals-and-union-find
|
|
||||||
# Use UnionByRank and Path-Compression instead of regular union-find for faster runtime and less performance impact
|
|
||||||
def kruskal(self):
|
|
||||||
sortedEdges = sorted(self.edges.items(), key=lambda item: item[1])
|
|
||||||
|
|
||||||
minSpanTree = []
|
|
||||||
|
|
||||||
# UnionByRank+PathCompression assumes each Node has itsself as a parent in the beginning and Rank 0, union then sets new parent as per usual
|
|
||||||
# Rank tries to pin together subtrees of the same rank to keep tree "clean"
|
|
||||||
# Then during find (loopdetection) bubble up tree during find (as usual), but pathcompression collapses the "parent" to the root for next loop
|
|
||||||
parent = {}
|
|
||||||
rank = {}
|
|
||||||
|
|
||||||
# Init only once -> Set for filter
|
|
||||||
for node in set(node for edge in self.edges.keys() for node in edge):
|
|
||||||
parent[node] = node
|
|
||||||
rank[node] = 0
|
|
||||||
|
|
||||||
def _find(node):
|
|
||||||
# Path compression
|
|
||||||
if parent[node] != node:
|
|
||||||
parent[node] = _find(parent[node])
|
|
||||||
return parent[node]
|
|
||||||
|
|
||||||
def _union(node1, node2):
|
|
||||||
# Union by rank
|
|
||||||
root1 = _find(node1)
|
|
||||||
root2 = _find(node2)
|
|
||||||
|
|
||||||
if root1 != root2:
|
|
||||||
if rank[root1] > rank[root2]:
|
|
||||||
parent[root2] = root1
|
|
||||||
elif rank[root1] < rank[root2]:
|
|
||||||
parent[root1] = root2
|
|
||||||
else:
|
|
||||||
parent[root2] = root1
|
|
||||||
rank[root1] += 1
|
|
||||||
|
|
||||||
for (node1, node2), weight in sortedEdges:
|
|
||||||
# no Loop
|
|
||||||
if _find(node1) != _find(node2):
|
|
||||||
minSpanTree.append((node1, node2, weight))
|
|
||||||
_union(node1, node2)
|
|
||||||
|
|
||||||
return minSpanTree;
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def graphvizify(filePath: str, outputFile: str = 'build/hoehleGraph', edges=None):
|
|
||||||
import re
|
|
||||||
graph = Digraph()
|
|
||||||
graph.attr(rankdir='TB')
|
|
||||||
graph.attr('node', shape='circle', style='filled', fillcolor='lightgray')
|
|
||||||
graph.attr('edge', arrowsize='0.5')
|
|
||||||
|
|
||||||
# Reuse the function to also create our Graph... Waste not want not^^
|
|
||||||
caveGraph = SpanningTreeGraph();
|
|
||||||
|
|
||||||
# Provided Edges -> No Fileparsing, but display the MST
|
|
||||||
if edges:
|
|
||||||
# No dupl
|
|
||||||
addedEdges = set()
|
|
||||||
for (node1, node2), weight in edges.items():
|
|
||||||
edge = tuple(sorted((node1, node2))) # Sort nodes for uniqueness (A,B == B,A)
|
|
||||||
logger.debug(f"added {edge}")
|
|
||||||
if edge not in addedEdges:
|
|
||||||
graph.edge(f"\"{node1}\"", f"\"{node2}\"", label=(" " + str(weight)), dir='none')
|
|
||||||
addedEdges.add(edge)
|
|
||||||
else:
|
|
||||||
with open(filePath, 'r') as f:
|
|
||||||
for line in f:
|
|
||||||
line = line.strip()
|
|
||||||
# Cap "<STRING>";"<STRING>";<NUMBER> No whitespaces for sanity
|
|
||||||
match = re.match(r'"([^"]+)"\s*;\s*"([^"]+)"\s*;\s*(\d+)', line)
|
|
||||||
if match:
|
|
||||||
node1, node2, weight = match.groups()
|
|
||||||
weight = int(weight)
|
|
||||||
graph.edge(f"\"{node1}\"", f"\"{node2}\"", label=(" "+(str(weight))), dir='none')
|
|
||||||
logger.debug(f"Added {node1} -> {node2}")
|
|
||||||
caveGraph.addWeightedEdge(node1, node2, weight)
|
|
||||||
|
|
||||||
try:
|
|
||||||
graph.render(outputFile, view=True)
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Could not display graph: {e}\n Trying to save without viewing!")
|
|
||||||
try:
|
|
||||||
graph.render(outputFile, view=False)
|
|
||||||
print(f"Your built map should be available here: {outputFile}.pdf")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Could not save graph file: {e}")
|
|
||||||
|
|
||||||
return caveGraph
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
start = "Höhleneingang"
|
|
||||||
goal = "Schatzkammer"
|
|
||||||
|
|
||||||
for filename in [ "data/elektro.txt" ]:
|
|
||||||
caveGraph = graphvizify(filename, 'build/hoehleGraphElektro')
|
|
||||||
mst = caveGraph.prim(start);
|
|
||||||
mstEdges = {(node1, node2): weight for node1, node2, weight in mst} # type: ignore -> "Pywright, Stop complaining. I know it can happen that we return none, but thats ok
|
|
||||||
logger.debug(f"Prim: {mstEdges}")
|
|
||||||
# Reuse the graphvizify to visualize the new MST
|
|
||||||
graphvizify(filename, 'build/prim', mstEdges)
|
|
||||||
|
|
||||||
mstKruskal = caveGraph.kruskal();
|
|
||||||
mstEdgesKruskal = {(node1, node2): weight for node1, node2, weight in mstKruskal} # type: ignore -> "Pywright, Stop complaining. I know it can happen that we return none, but thats ok
|
|
||||||
logger.debug(f"Kruskal: {mstEdgesKruskal}")
|
|
||||||
graphvizify(filename, 'build/kruskal', mstEdgesKruskal)
|
|
||||||
|
|
||||||
timeMS(caveGraph.prim, start)
|
|
||||||
timeMS(caveGraph.kruskal)
|
|
||||||
|
|
||||||
print(f"MinPrim: {sum(mstEdges.values())} <+#+> MinKruskal: {sum(mstEdgesKruskal.values())}")
|
|
||||||
|
|
||||||
exit(0);
|
|
||||||
|
|
||||||
## Old Search for shortest Path, no MST
|
|
||||||
shortestPath, edgesGoneInitial = caveGraph.breadthFirstSearch(start, goal)
|
|
||||||
print(shortestPath, edgesGoneInitial)
|
|
||||||
logger.debug(caveGraph.adjacencyList)
|
|
||||||
logger.debug(edgesGoneInitial)
|
|
||||||
|
|
||||||
if shortestPath:
|
|
||||||
print(f"Shortest path from {start} to {goal} is:")
|
|
||||||
print(" -> ".join(shortestPath))
|
|
||||||
else:
|
|
||||||
print(f"No path found from {start} to {goal}.")
|
|
||||||
|
|
||||||
returnpath, _ = caveGraph.breadthFirstSearch(goal, start, edgesGoneInitial)
|
|
||||||
|
|
||||||
if returnpath:
|
|
||||||
print(f"Shortest path from {goal} to {start} is:")
|
|
||||||
print(" -> ".join(returnpath))
|
|
||||||
else:
|
|
||||||
print("No path back Home found. Good Luck")
|
|
@ -2,9 +2,12 @@ from collections import deque
|
|||||||
from typing import List
|
from typing import List
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
import graphviz
|
import graphviz
|
||||||
|
import math
|
||||||
|
import heapq
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from utils.project_dir import get_path
|
from utils.project_dir import get_path
|
||||||
from utils.priority_queue import PriorityQueue
|
from utils.priority_queue import PriorityQueue
|
||||||
|
from vorlesung.L09_mst.disjoint import DisjointValue
|
||||||
|
|
||||||
|
|
||||||
class NodeColor(Enum):
|
class NodeColor(Enum):
|
||||||
@ -205,6 +208,68 @@ class Graph:
|
|||||||
relax(vertex, dest, weight)
|
relax(vertex, dest, weight)
|
||||||
return distance_map, predecessor_map
|
return distance_map, predecessor_map
|
||||||
|
|
||||||
|
def mst_prim(self, start_name: str = None):
|
||||||
|
""" Compute the minimum spanning tree of the graph using Prim's algorithm. """
|
||||||
|
|
||||||
|
distance_map = {} # maps vertices to their current distance from the spanning tree
|
||||||
|
parent_map = {} # maps vertices to their predecessor in the spanning tree
|
||||||
|
|
||||||
|
Vertex.__lt__ = lambda self, other: distance_map[self] < distance_map[other]
|
||||||
|
|
||||||
|
queue = []
|
||||||
|
|
||||||
|
if start_name is None:
|
||||||
|
start_name = self.all_vertices()[0].value
|
||||||
|
|
||||||
|
# Initialize the maps
|
||||||
|
for vertex in self.all_vertices():
|
||||||
|
distance_map[vertex] = 0 if vertex.value == start_name else math.inf
|
||||||
|
parent_map[vertex] = None
|
||||||
|
queue.append(vertex)
|
||||||
|
|
||||||
|
heapq.heapify(queue) # Convert the list into a heap
|
||||||
|
|
||||||
|
# Process the queue
|
||||||
|
cost = 0 # The cost of the minimum spanning tree
|
||||||
|
while len(queue) > 0:
|
||||||
|
vertex = heapq.heappop(queue)
|
||||||
|
cost += distance_map[vertex] # Add the cost of the edge to the minimum spanning tree
|
||||||
|
for (dest, w) in self.get_adjacent_vertices_with_weight(vertex.value):
|
||||||
|
if dest in queue and distance_map[dest] > w:
|
||||||
|
# Update the distance and parent maps
|
||||||
|
queue.remove(dest)
|
||||||
|
distance_map[dest] = w
|
||||||
|
parent_map[dest] = vertex
|
||||||
|
queue.append(dest) # Add the vertex back to the queue
|
||||||
|
heapq.heapify(queue) # Re-heapify the queue
|
||||||
|
|
||||||
|
# Return the distance and predecessor maps
|
||||||
|
return parent_map, cost
|
||||||
|
|
||||||
|
def mst_kruskal(self, start_name: str = None):
|
||||||
|
""" Compute the minimum spanning tree of the graph using Kruskal's algorithm. """
|
||||||
|
|
||||||
|
cost = 0
|
||||||
|
result = []
|
||||||
|
edges = self.all_edges()
|
||||||
|
|
||||||
|
# Create a disjoint set for each vertex
|
||||||
|
vertex_map = {v.value: DisjointValue(v) for v in self.all_vertices()}
|
||||||
|
|
||||||
|
# Sort the edges by weight
|
||||||
|
edges.sort(key=lambda edge: edge[2])
|
||||||
|
|
||||||
|
# Process the edges
|
||||||
|
for edge in edges:
|
||||||
|
start_name, end_name, weight = edge
|
||||||
|
# Check if the edge creates a cycle
|
||||||
|
if not vertex_map[start_name].same_set(vertex_map[end_name]):
|
||||||
|
result.append(edge)
|
||||||
|
vertex_map[start_name].union(vertex_map[end_name])
|
||||||
|
cost += weight
|
||||||
|
|
||||||
|
return result, cost
|
||||||
|
|
||||||
|
|
||||||
class AdjacencyListGraph(Graph):
|
class AdjacencyListGraph(Graph):
|
||||||
"""A graph implemented as an adjacency list."""
|
"""A graph implemented as an adjacency list."""
|
||||||
@ -243,8 +308,6 @@ class AdjacencyListGraph(Graph):
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class AdjacencyMatrixGraph(Graph):
|
class AdjacencyMatrixGraph(Graph):
|
||||||
"""A graph implemented as an adjacency matrix."""
|
"""A graph implemented as an adjacency matrix."""
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
0
vorlesung/L09_mst/__init__.py
Normal file
0
vorlesung/L09_mst/__init__.py
Normal file
18
vorlesung/L09_mst/disjoint.py
Normal file
18
vorlesung/L09_mst/disjoint.py
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
|
||||||
|
|
||||||
|
class DisjointValue():
|
||||||
|
|
||||||
|
def __init__(self, value):
|
||||||
|
self.value = value
|
||||||
|
self.parent = None
|
||||||
|
|
||||||
|
def canonical(self):
|
||||||
|
if self.parent:
|
||||||
|
return self.parent.canonical()
|
||||||
|
return self
|
||||||
|
|
||||||
|
def same_set(self, other):
|
||||||
|
return self.canonical() == other.canonical()
|
||||||
|
|
||||||
|
def union(self, other):
|
||||||
|
self.canonical().parent = other.canonical()
|
Loading…
x
Reference in New Issue
Block a user