2026-05-04 10:39:08 +02:00

128 lines
4.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', '..'))
from utils.algo_context import AlgoContext
from utils.algo_int import Int
class PriorityQueue:
"""Max-Heap-basierte Vorrang-Warteschlange mit Operationszählung.
Höhere Prioritätswerte werden zuerst entnommen. Prioritäten werden als
Int-Objekte gespeichert, sodass Vergleiche automatisch im AlgoContext
gezählt werden. Swaps zählen je 2 reads + 2 writes.
"""
def __init__(self, ctx: AlgoContext = None):
self._ctx = ctx if ctx is not None else AlgoContext()
self._heap = [] # Liste von (Int priority, item)
def insert(self, item, priority):
"""Fügt item mit der gegebenen Priorität ein."""
self._heap.append((Int(priority, self._ctx), item))
self._ctx.writes += 1
self._sift_up(len(self._heap) - 1)
def pop(self):
"""Entfernt das Element mit der höchsten Priorität und gibt es zurück."""
if self.is_empty():
raise IndexError("Priority queue is empty")
self._swap(0, len(self._heap) - 1)
self._ctx.reads += 1
_, item = self._heap.pop()
if self._heap:
self._sift_down(0)
return item
def peek(self):
"""Gibt das Element mit der höchsten Priorität zurück (ohne Entfernung)."""
if self.is_empty():
raise IndexError("Priority queue is empty")
self._ctx.reads += 1
return self._heap[0][1]
def is_empty(self):
"""Gibt True zurück, wenn die Warteschlange leer ist."""
return len(self._heap) == 0
def __len__(self):
return len(self._heap)
def __repr__(self):
pairs = sorted(self._heap, key=lambda t: t[0].value, reverse=True)
return "PriorityQueue([" + ", ".join(f"{p.value}:{v}" for p, v in pairs) + "])"
# ── interne Heap-Operationen ───────────────────────────────────────────────
#
# Terminologie-Hinweis: In der Vorlesung heißt die Abwärts-
# Operation MAX-HEAPIFY sie stellt die Heap-Eigenschaft für einen
# Teilbaum wieder her. _sift_down ist funktional identisch damit, folgt
# aber der in der übrigen Literatur gebräuchlicheren Richtungsbezeichnung
# (sift = „sieben/sinken lassen").
#
# _sift_up hat kein direktes Gegenstück in der Vorlesung, weil Heapsort
# nie ein Element einfügt: Dort wird nur nach unten geheapifiziert.
# Für eine vollständige Priority Queue mit insert() wird jedoch auch die
# Aufwärts-Richtung benötigt.
def _swap(self, i, j):
self._ctx.reads += 2
self._ctx.writes += 2
self._heap[i], self._heap[j] = self._heap[j], self._heap[i]
def _sift_up(self, i):
while i > 0:
parent = (i - 1) // 2
if self._heap[i][0] > self._heap[parent][0]: # Int: 2 reads + 1 cmp
self._swap(i, parent)
i = parent
else:
break
def _sift_down(self, i):
n = len(self._heap)
while True:
largest, left, right = i, 2 * i + 1, 2 * i + 2
if left < n and self._heap[left][0] > self._heap[largest][0]:
largest = left
if right < n and self._heap[right][0] > self._heap[largest][0]:
largest = right
if largest == i:
break
self._swap(i, largest)
i = largest
# ── Demo & Auswertung ─────────────────────────────────────────────────────────
if __name__ == '__main__':
import matplotlib.pyplot as plt
import random
SIZES = list(range(10, 210, 10))
metrics = {m: [] for m in ("comparisons", "reads", "writes")}
for n in SIZES:
ctx = AlgoContext()
pq = PriorityQueue(ctx)
for p in random.sample(range(n * 10), n):
pq.insert(None, p)
while not pq.is_empty():
pq.pop()
for m in metrics:
metrics[m].append(getattr(ctx, m))
titles = {"comparisons": "Vergleiche", "reads": "Lesezugriffe", "writes": "Schreibzugriffe"}
fig, axes = plt.subplots(len(metrics), 1, figsize=(10, 9), sharex=True)
for ax, (metric, values) in zip(axes, metrics.items()):
ax.plot(SIZES, values, marker='o', markersize=3)
ax.set_ylabel(titles[metric])
axes[0].set_title("PriorityQueue n inserts + n pops")
axes[-1].set_xlabel("n")
plt.tight_layout()
plt.show()