feat: trigger parsing for !start and !stop commands

This commit is contained in:
Oliver Hofmann 2026-05-16 17:22:55 +02:00
parent 7758eb21fa
commit 3332592bef
2 changed files with 187 additions and 0 deletions

126
src/teampulse/monitor.py Normal file
View File

@ -0,0 +1,126 @@
import re
from datetime import datetime
from playwright.sync_api import Page
from teampulse.models import AuditEntry, AuditWindow, ChatMessage
# Discovered via scripts/discover_dom.py — update if Teams web changes
_MSG_SELECTOR = "[data-tid='chat-pane-message']"
_SENDER_SELECTOR = "[data-tid='message-author-name']"
_PROFILE_EMAIL_SELECTOR = "[data-tid='persona-card-email']"
_START_RE = re.compile(r'^!start(?:\s+"([^"]*)")?$', re.IGNORECASE)
_STOP_RE = re.compile(r'^!stop$', re.IGNORECASE)
def parse_trigger(message: ChatMessage, current_user: str) -> tuple[str, str] | None:
if message.sender != current_user:
return None
text = message.text.strip()
m = _START_RE.match(text)
if m:
name = (m.group(1) or "").strip() or "Unbekannter Vortragender"
return ("start", name)
if _STOP_RE.match(text):
return ("stop", "")
return None
class Monitor:
def __init__(self, page: Page, current_user: str):
self._page = page
self._current_user = current_user
self._seen_message_ids: set[str] = set()
def wait_for_chat(self) -> None:
print("Warte auf Teams-Chat-Seite... (bitte zur Meeting-Chat-Seite navigieren)")
self._page.wait_for_selector(_MSG_SELECTOR, timeout=300_000)
print("Chat erkannt.")
def get_current_user_display_name(self) -> str:
el = self._page.query_selector("[data-tid='me-control-display-name']")
if el:
return el.inner_text().strip()
el = self._page.query_selector("[aria-label*='Profilbild']")
if el:
return (el.get_attribute("aria-label") or "").replace("Profilbild", "").strip()
raise RuntimeError("Konnte den Namen des eingeloggten Nutzers nicht ermitteln.")
def poll_new_messages(self) -> list[ChatMessage]:
elements = self._page.query_selector_all(_MSG_SELECTOR)
new_messages = []
for el in elements:
msg_id = el.get_attribute("data-tid") or el.get_attribute("id") or el.inner_text()[:40]
if msg_id in self._seen_message_ids:
continue
self._seen_message_ids.add(msg_id)
sender_el = el.query_selector(_SENDER_SELECTOR)
if not sender_el:
continue
sender = sender_el.inner_text().strip()
text = el.inner_text().strip()
new_messages.append(ChatMessage(
sender=sender,
text=text,
timestamp=datetime.now(),
))
return new_messages
def run(self) -> AuditWindow:
import time
self.wait_for_chat()
presenter: str | None = None
start_time: datetime | None = None
collected: list[ChatMessage] = []
print("Monitoring aktiv. Poste '!start \"Name\"' im Chat um ein Zeitfenster zu starten.")
while True:
try:
new_msgs = self.poll_new_messages()
except Exception:
print("Verbindung verloren, reconnecting...")
time.sleep(2)
continue
for msg in new_msgs:
trigger = parse_trigger(msg, self._current_user)
if trigger is None:
if start_time is not None:
collected.append(msg)
continue
action, value = trigger
if action == "start":
presenter = value
start_time = datetime.now()
collected = []
print(f"Zeitfenster gestartet — Vortragender: {presenter}")
elif action == "stop":
if start_time is None:
print("!stop ohne vorheriges !start — ignoriert.")
continue
end_time = datetime.now()
print("Zeitfenster beendet.")
seen_senders = list(dict.fromkeys(m.sender for m in collected))
entries = [AuditEntry(display_name=s, email="") for s in seen_senders]
return AuditWindow(
presenter=presenter,
moderator=self._current_user,
start_time=start_time,
end_time=end_time,
entries=entries,
)
time.sleep(2)

View File

@ -0,0 +1,61 @@
from datetime import datetime
from teampulse.models import ChatMessage
from teampulse.monitor import parse_trigger
def msg(sender: str, text: str) -> ChatMessage:
return ChatMessage(sender=sender, text=text, timestamp=datetime(2026, 5, 16, 10, 0, 0))
CURRENT_USER = "Oliver Hofmann"
def test_start_with_quoted_name():
result = parse_trigger(msg(CURRENT_USER, '!start "Anna Bauer"'), CURRENT_USER)
assert result == ("start", "Anna Bauer")
def test_start_with_single_word_name():
result = parse_trigger(msg(CURRENT_USER, '!start "Bauer"'), CURRENT_USER)
assert result == ("start", "Bauer")
def test_start_without_name_returns_placeholder():
result = parse_trigger(msg(CURRENT_USER, "!start"), CURRENT_USER)
assert result == ("start", "Unbekannter Vortragender")
def test_start_with_empty_quotes_returns_placeholder():
result = parse_trigger(msg(CURRENT_USER, '!start ""'), CURRENT_USER)
assert result == ("start", "Unbekannter Vortragender")
def test_stop_trigger():
result = parse_trigger(msg(CURRENT_USER, "!stop"), CURRENT_USER)
assert result == ("stop", "")
def test_ignored_if_not_current_user():
result = parse_trigger(msg("Klaus Huber", '!start "Anna Bauer"'), CURRENT_USER)
assert result is None
def test_stop_ignored_if_not_current_user():
result = parse_trigger(msg("Klaus Huber", "!stop"), CURRENT_USER)
assert result is None
def test_regular_message_returns_none():
result = parse_trigger(msg(CURRENT_USER, "Gute Frage!"), CURRENT_USER)
assert result is None
def test_case_insensitive_start():
result = parse_trigger(msg(CURRENT_USER, '!START "Anna Bauer"'), CURRENT_USER)
assert result == ("start", "Anna Bauer")
def test_case_insensitive_stop():
result = parse_trigger(msg(CURRENT_USER, "!STOP"), CURRENT_USER)
assert result == ("stop", "")