forked from hofmannol/AlgoDatSoSe25
194 lines
5.9 KiB
Python
194 lines
5.9 KiB
Python
from utils.memory_array import MemoryArray
|
|
from utils.memory_cell import MemoryCell
|
|
from utils.literal import Literal
|
|
from utils.constants import MIN_VALUE
|
|
from utils.memory_range import mrange
|
|
|
|
# The MemoryArray implementation says we cannot store our own data types besides Literal and list.
# But we can simply wrap our data type in a list :-)
# Internally the entry keeps its fields in a MemoryArray anyway, so the RAM counters are still incremented.
|
|
class HeapEntry:
    """An item together with its priority, as stored in the priority queue.

    Both values live in a two-cell MemoryArray (index 0: item, index 1:
    priority) and are wrapped in Literal on assignment, so they are accessed
    through the MemoryArray API rather than through plain attributes.

    All comparisons (<, >, ==) look at the priority only; the item is ignored.
    """

    def __init__(self, item, priority=1):
        """Create an entry holding *item* with the given *priority*."""
        self.data = MemoryArray(Literal(2))
        # 0: Content, 1: Prio
        self.data[Literal(0)] = Literal(item)
        self.data[Literal(1)] = Literal(priority)

    def getItem(self):
        """Return the cell at index 0, which holds the item."""
        return self.data[Literal(0)]

    def getPriority(self):
        """Return the cell at index 1, which holds the priority."""
        return self.data[Literal(1)]

    def setPriority(self, priority):
        """Overwrite the stored priority with *priority* (wrapped in a Literal)."""
        self.data[Literal(1)] = Literal(priority)

    def __lt__(self, other):
        """Return whether this entry orders before *other*.

        None is treated as greater than every entry. Another HeapEntry is
        compared by priority.
        """
        if other is None:
            return True
        if isinstance(other, (int, float)):
            # NOTE(review): this numeric branch is reversed relative to the
            # entry-vs-entry branch below (`<` is answered with `>`).
            # Kept as-is; confirm whether this is intended.
            return self.getPriority().value > other
        return self.getPriority() < other.getPriority()

    def __gt__(self, other):
        """Return whether this entry orders after *other*.

        No entry is greater than None. Another HeapEntry is compared by
        priority.
        """
        if other is None:
            return False
        if isinstance(other, (int, float)):
            # NOTE(review): reversed in the same way as in __lt__; kept as-is.
            return self.getPriority().value < other
        return self.getPriority() > other.getPriority()

    def __eq__(self, other):
        """Return whether *other* has the same priority as this entry.

        None never equals an entry, and a plain number is compared against
        the priority value. Any other non-HeapEntry operand yields
        NotImplemented, so Python falls back to its default comparison
        instead of raising AttributeError.
        """
        if other is None:
            return False
        if isinstance(other, (int, float)):
            return self.getPriority().value == other
        if not isinstance(other, HeapEntry):
            return NotImplemented
        return self.getPriority() == other.getPriority()

    def __str__(self):
        """Return the entry formatted as "(item, prio=priority)"."""
        return f"({self.getItem()}, prio={self.getPriority()})"
|
|
|
|
class PriorityQueue:
    """Fixed-capacity priority queue implemented as a binary max-heap.

    The heap lives in a MemoryArray. Each slot stores a one-element list
    wrapping a HeapEntry, because the array is only used with Literal and
    list values. The entry that compares greatest sits at index 0.
    """

    def __init__(self, max_size: Literal = Literal(100)):
        """Allocate a heap with *max_size* slots and start with an empty queue."""
        self.heap = MemoryArray(max_size)
        # Pre-fill every slot with a placeholder entry so that inserts and
        # comparisons never touch an uninitialized cell.
        # NOTE(review): the placeholder uses MIN_VALUE for item and priority;
        # a different sentinel would presumably be needed if the meaning of a
        # "higher" priority were ever flipped.
        for i in mrange(max_size.value):
            self.heap[i].set([HeapEntry(MIN_VALUE, MIN_VALUE)])
        self.size = MemoryCell(0)

    def parent(self, i: Literal) -> Literal:
        """Return the index of the parent of node *i*: (i - 1) // 2."""
        return MemoryCell(i.pred()) // Literal(2)

    def leftChild(self, i: Literal) -> Literal:
        """Return the index of the left child of node *i*: 2 * i + 1."""
        return MemoryCell(MemoryCell(2) * i) + Literal(1)

    def rightChild(self, i: Literal) -> Literal:
        """Return the index of the right child of node *i*: 2 * i + 2."""
        return MemoryCell(MemoryCell(2) * i) + Literal(2)

    def swap(self, i: Literal, j: Literal):
        """Exchange the contents of slots *i* and *j*.

        The slots hold lists, so the list objects themselves are read out
        and written back into the opposite slot.
        """
        tmp_i = self.heap[i].value
        tmp_j = self.heap[j].value
        self.heap[i].set(tmp_j)
        self.heap[j].set(tmp_i)

    def maxHeapify(self, i: Literal):
        """Sift the entry at index *i* down until neither child is greater."""
        left = self.leftChild(i)
        right = self.rightChild(i)
        largest = i

        # Only children inside the used part of the array take part.
        if left < Literal(self.size.value) and self.heap[left].value[0] > self.heap[largest].value[0]:
            largest = left

        if right < Literal(self.size.value) and self.heap[right].value[0] > self.heap[largest].value[0]:
            largest = right

        if largest != i:
            self.swap(i, largest)
            self.maxHeapify(largest)

    def insert(self, entry: HeapEntry):
        """Add *entry* to the queue.

        Raises:
            IndexError: if the queue already holds as many entries as the
                heap has slots.
        """
        if self.size >= self.heap.length():
            raise IndexError("Heap full")

        # Place the new entry in the first free slot, then sift it up.
        # `i` is only rebound below, never modified in place, so self.size
        # stays untouched until the final increment.
        i = self.size
        self.heap[i].set([entry])

        while i > Literal(0) and self.heap[self.parent(i)].value[0] < self.heap[i].value[0]:
            self.swap(i, self.parent(i))
            i = self.parent(i)

        self.size += Literal(1)

    def pop(self):
        """Remove and return the entry at the root of the heap.

        Raises:
            IndexError: if the queue is empty.
        """
        if self.isEmpty():
            raise IndexError("Queue is empty!")

        max_item = self.heap[Literal(0)].value[0]

        # Move the last used slot to the root, shrink, then restore the heap.
        self.heap[Literal(0)] = self.heap[self.size - Literal(1)]
        self.size -= Literal(1)

        self.maxHeapify(Literal(0))

        return max_item

    def peek(self):
        """Return the entry at the root of the heap without removing it.

        Raises:
            IndexError: if the queue is empty.
        """
        if self.isEmpty():
            raise IndexError("Queue is empty")
        return self.heap[Literal(0)].value[0]

    def isEmpty(self):
        """Return whether the queue currently holds no entries."""
        return self.size == Literal(0)

    def __len__(self) -> int:
        """Return the number of entries currently in the queue."""
        # len() requires a plain integer, so unwrap the MemoryCell counter.
        return self.size.value

    def __str__(self):
        """Return the used heap slots, in array order, as "[e1, e2, ...]"."""
        entries = []
        for i in mrange(self.size.value):
            entry = self.heap[i].value[0]
            # Skip placeholder entries written by __init__.
            if entry.getItem() != MIN_VALUE:
                entries.append(str(entry))
        return "[" + ", ".join(entries) + "]"
|
|
|
|
def testQueueRandom(number: int):
    """Fill a queue with *number* random entries, print them, then drain it.

    Each entry gets a random three-character label made of uppercase letters
    and digits, and a random priority between 1 and 100. The queue is printed
    first, then every entry in insertion order, and finally each entry as it
    is popped.
    """
    import random
    import string

    alphabet = string.ascii_uppercase + string.digits
    queue = PriorityQueue(Literal(number))
    inserted = []

    for _ in range(number):
        label = ''.join(random.choices(alphabet, k=3))
        prio = random.randint(1, 100)
        fresh = HeapEntry(label, prio)
        inserted.append(fresh)
        queue.insert(fresh)

    print(queue)
    for original in inserted:
        print(f"Unprioritized: {original}")

    # Drain the queue; entries come out in heap order.
    while True:
        if queue.isEmpty():
            break
        print(queue.pop())
|
|
|
|
|
|
if __name__ == '__main__':
    # Proof of concept: a HeapEntry can be stored in a MemoryArray when it is
    # wrapped in a list.
    sample = HeapEntry("A", 2)
    print(sample)
    wrapped = MemoryArray([sample])
    print(wrapped)
    print(wrapped[Literal(0)])

    # Queue tests
    queue = PriorityQueue()

    # Popping from an empty queue must raise IndexError.
    try:
        queue.pop()
    except IndexError:
        pass
    else:
        assert False, "Queue should be empty"
    assert queue.isEmpty() and queue.size == Literal(0)

    # A single entry goes in and comes back out.
    first = HeapEntry("A", 1)
    queue.insert(first)
    assert not queue.isEmpty() and queue.size == Literal(1)
    queue.peek()
    assert not queue.isEmpty()
    assert queue.pop() == HeapEntry("A", 1)
    assert queue.isEmpty()

    # Three entries come back out ordered by descending priority.
    for item, prio in (("C", 3), ("B", 2), ("A", 1)):
        queue.insert(HeapEntry(item, prio))
    assert queue.size == Literal(3)
    for item, prio in (("C", 3), ("B", 2), ("A", 1)):
        assert queue.pop() == HeapEntry(item, prio)

    # Mixed item types and duplicate priorities; print the queue, then drain it.
    mixed = (("A", 1), ("C", 3), ("B", 2), (42, 4), (42, 1), ("C", 2))
    for item, prio in mixed:
        queue.insert(HeapEntry(item, prio))
    print(queue)
    while True:
        if queue.isEmpty():
            break
        print(queue.pop())

    testQueueRandom(100)
|