From 3332592bef1493b68a791efd4609e3f4a2826c9b Mon Sep 17 00:00:00 2001 From: Oliver Hofmann Date: Sat, 16 May 2026 17:22:55 +0200 Subject: [PATCH] feat: trigger parsing for !start and !stop commands --- src/teampulse/monitor.py | 126 +++++++++++++++++++++++++++++++++ tests/test_monitor_triggers.py | 61 ++++++++++++++++ 2 files changed, 187 insertions(+) create mode 100644 src/teampulse/monitor.py create mode 100644 tests/test_monitor_triggers.py diff --git a/src/teampulse/monitor.py b/src/teampulse/monitor.py new file mode 100644 index 0000000..92c7cf0 --- /dev/null +++ b/src/teampulse/monitor.py @@ -0,0 +1,126 @@ +import re +from datetime import datetime + +from playwright.sync_api import Page + +from teampulse.models import AuditEntry, AuditWindow, ChatMessage + +# Discovered via scripts/discover_dom.py — update if Teams web changes +_MSG_SELECTOR = "[data-tid='chat-pane-message']" +_SENDER_SELECTOR = "[data-tid='message-author-name']" +_PROFILE_EMAIL_SELECTOR = "[data-tid='persona-card-email']" + +_START_RE = re.compile(r'^!start(?:\s+"([^"]*)")?$', re.IGNORECASE) +_STOP_RE = re.compile(r'^!stop$', re.IGNORECASE) + + +def parse_trigger(message: ChatMessage, current_user: str) -> tuple[str, str] | None: + if message.sender != current_user: + return None + + text = message.text.strip() + + m = _START_RE.match(text) + if m: + name = (m.group(1) or "").strip() or "Unbekannter Vortragender" + return ("start", name) + + if _STOP_RE.match(text): + return ("stop", "") + + return None + + +class Monitor: + def __init__(self, page: Page, current_user: str): + self._page = page + self._current_user = current_user + self._seen_message_ids: set[str] = set() + + def wait_for_chat(self) -> None: + print("Warte auf Teams-Chat-Seite... (bitte zur Meeting-Chat-Seite navigieren)") + self._page.wait_for_selector(_MSG_SELECTOR, timeout=300_000) + print("Chat erkannt.") + + def get_current_user_display_name(self) -> str: + el = self._page.query_selector("[data-tid='me-control-display-name']") + if el: + return el.inner_text().strip() + el = self._page.query_selector("[aria-label*='Profilbild']") + if el: + return (el.get_attribute("aria-label") or "").replace("Profilbild", "").strip() + raise RuntimeError("Konnte den Namen des eingeloggten Nutzers nicht ermitteln.") + + def poll_new_messages(self) -> list[ChatMessage]: + elements = self._page.query_selector_all(_MSG_SELECTOR) + new_messages = [] + for el in elements: + msg_id = el.get_attribute("data-tid") or el.get_attribute("id") or el.inner_text()[:40] + if msg_id in self._seen_message_ids: + continue + self._seen_message_ids.add(msg_id) + + sender_el = el.query_selector(_SENDER_SELECTOR) + if not sender_el: + continue + sender = sender_el.inner_text().strip() + text = el.inner_text().strip() + new_messages.append(ChatMessage( + sender=sender, + text=text, + timestamp=datetime.now(), + )) + return new_messages + + def run(self) -> AuditWindow: + import time + + self.wait_for_chat() + + presenter: str | None = None + start_time: datetime | None = None + collected: list[ChatMessage] = [] + + print("Monitoring aktiv. Poste '!start \"Name\"' im Chat um ein Zeitfenster zu starten.") + + while True: + try: + new_msgs = self.poll_new_messages() + except Exception: + print("Verbindung verloren, reconnecting...") + time.sleep(2) + continue + + for msg in new_msgs: + trigger = parse_trigger(msg, self._current_user) + + if trigger is None: + if start_time is not None: + collected.append(msg) + continue + + action, value = trigger + + if action == "start": + presenter = value + start_time = datetime.now() + collected = [] + print(f"Zeitfenster gestartet — Vortragender: {presenter}") + + elif action == "stop": + if start_time is None: + print("!stop ohne vorheriges !start — ignoriert.") + continue + end_time = datetime.now() + print("Zeitfenster beendet.") + seen_senders = list(dict.fromkeys(m.sender for m in collected)) + entries = [AuditEntry(display_name=s, email="") for s in seen_senders] + return AuditWindow( + presenter=presenter, + moderator=self._current_user, + start_time=start_time, + end_time=end_time, + entries=entries, + ) + + time.sleep(2) diff --git a/tests/test_monitor_triggers.py b/tests/test_monitor_triggers.py new file mode 100644 index 0000000..47eb00b --- /dev/null +++ b/tests/test_monitor_triggers.py @@ -0,0 +1,61 @@ +from datetime import datetime + +from teampulse.models import ChatMessage +from teampulse.monitor import parse_trigger + + +def msg(sender: str, text: str) -> ChatMessage: + return ChatMessage(sender=sender, text=text, timestamp=datetime(2026, 5, 16, 10, 0, 0)) + + +CURRENT_USER = "Oliver Hofmann" + + +def test_start_with_quoted_name(): + result = parse_trigger(msg(CURRENT_USER, '!start "Anna Bauer"'), CURRENT_USER) + assert result == ("start", "Anna Bauer") + + +def test_start_with_single_word_name(): + result = parse_trigger(msg(CURRENT_USER, '!start "Bauer"'), CURRENT_USER) + assert result == ("start", "Bauer") + + +def test_start_without_name_returns_placeholder(): + result = parse_trigger(msg(CURRENT_USER, "!start"), CURRENT_USER) + assert result == ("start", "Unbekannter Vortragender") + + +def test_start_with_empty_quotes_returns_placeholder(): + result = parse_trigger(msg(CURRENT_USER, '!start ""'), CURRENT_USER) + assert result == ("start", "Unbekannter Vortragender") + + +def test_stop_trigger(): + result = parse_trigger(msg(CURRENT_USER, "!stop"), CURRENT_USER) + assert result == ("stop", "") + + +def test_ignored_if_not_current_user(): + result = parse_trigger(msg("Klaus Huber", '!start "Anna Bauer"'), CURRENT_USER) + assert result is None + + +def test_stop_ignored_if_not_current_user(): + result = parse_trigger(msg("Klaus Huber", "!stop"), CURRENT_USER) + assert result is None + + +def test_regular_message_returns_none(): + result = parse_trigger(msg(CURRENT_USER, "Gute Frage!"), CURRENT_USER) + assert result is None + + +def test_case_insensitive_start(): + result = parse_trigger(msg(CURRENT_USER, '!START "Anna Bauer"'), CURRENT_USER) + assert result == ("start", "Anna Bauer") + + +def test_case_insensitive_stop(): + result = parse_trigger(msg(CURRENT_USER, "!STOP"), CURRENT_USER) + assert result == ("stop", "")