From 658ec752749d014528c2d4625c8e3da9aa33f9cf Mon Sep 17 00:00:00 2001 From: Oliver Hofmann Date: Sat, 16 May 2026 17:12:50 +0200 Subject: [PATCH] Add TeamPulse implementation plan --- .../superpowers/plans/2026-05-16-teampulse.md | 1095 +++++++++++++++++ 1 file changed, 1095 insertions(+) create mode 100644 docs/superpowers/plans/2026-05-16-teampulse.md diff --git a/docs/superpowers/plans/2026-05-16-teampulse.md b/docs/superpowers/plans/2026-05-16-teampulse.md new file mode 100644 index 0000000..3b7f8e7 --- /dev/null +++ b/docs/superpowers/plans/2026-05-16-teampulse.md @@ -0,0 +1,1095 @@ +# TeamPulse Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Python script that monitors a Microsoft Teams meeting chat, records who posted in a defined time window, resolves display names to email addresses via profile cards, and generates a sorted Markdown memo. + +**Architecture:** Playwright controls a headless Teams Web browser in the background. A polling loop reads new chat messages every 2 seconds. When `!start "Name"` / `!stop` commands are detected (posted by the current user), a time window opens/closes. After the window, profile cards are clicked to resolve emails. A Markdown memo is generated and saved. + +**Tech Stack:** Python 3.12, Playwright (sync API), pytest, pytest-mock + +--- + +## File Map + +| File | Responsibility | +|---|---| +| `src/teampulse/models.py` | Shared dataclasses: `ChatMessage`, `AuditEntry`, `AuditWindow` | +| `src/teampulse/memo.py` | Pure function: `AuditWindow → str`, save to file | +| `src/teampulse/resolver.py` | `Resolver` class: cache lookup + browser profile card extraction | +| `src/teampulse/monitor.py` | `Monitor` class: DOM polling, trigger parsing, message collection | +| `src/teampulse/auth.py` | `create_context()`: persistent Playwright session, login detection, read current user | +| `main.py` | Entry point: wire all components, start polling loop | +| `tests/test_memo.py` | Unit tests for memo generation | +| `tests/test_resolver_cache.py` | Unit tests for cache layer (no browser) | +| `tests/test_monitor_triggers.py` | Unit tests for trigger parsing | + +--- + +## Task 1: Project Setup + +**Files:** +- Create: `requirements.txt` +- Create: `requirements-dev.txt` +- Create: `src/teampulse/__init__.py` +- Create: `tests/__init__.py` +- Create: `pytest.ini` + +- [ ] **Step 1: Create requirements.txt** + +``` +playwright>=1.44 +``` + +- [ ] **Step 2: Create requirements-dev.txt** + +``` +playwright>=1.44 +pytest>=8.2 +pytest-mock>=3.14 +``` + +- [ ] **Step 3: Install dependencies** + +```bash +.venv/bin/pip install -r requirements-dev.txt +.venv/bin/playwright install chromium +``` + +Expected: Packages installed, Chromium browser downloaded. + +- [ ] **Step 4: Create package skeleton** + +`src/teampulse/__init__.py` — empty file. + +`tests/__init__.py` — empty file. + +- [ ] **Step 5: Create pytest.ini** + +```ini +[pytest] +testpaths = tests +pythonpath = src +``` + +- [ ] **Step 6: Verify pytest runs** + +```bash +.venv/bin/pytest --collect-only +``` + +Expected output: `no tests ran` (no errors). + +- [ ] **Step 7: Commit** + +```bash +git add requirements.txt requirements-dev.txt src/ tests/ pytest.ini +git commit -m "feat: project scaffold with dependencies" +``` + +--- + +## Task 2: Data Models + +**Files:** +- Create: `src/teampulse/models.py` + +- [ ] **Step 1: Write models.py** + +```python +from dataclasses import dataclass, field +from datetime import datetime + + +@dataclass +class ChatMessage: + sender: str + text: str + timestamp: datetime + + +@dataclass +class AuditEntry: + display_name: str + email: str + + +@dataclass +class AuditWindow: + presenter: str + moderator: str + start_time: datetime + end_time: datetime + entries: list[AuditEntry] = field(default_factory=list) +``` + +- [ ] **Step 2: Verify import** + +```bash +.venv/bin/python -c "from teampulse.models import ChatMessage, AuditEntry, AuditWindow; print('OK')" +``` + +Expected: `OK` + +- [ ] **Step 3: Commit** + +```bash +git add src/teampulse/models.py +git commit -m "feat: add shared data models" +``` + +--- + +## Task 3: Memo Generation + +**Files:** +- Create: `src/teampulse/memo.py` +- Create: `tests/test_memo.py` + +- [ ] **Step 1: Write failing tests** + +`tests/test_memo.py`: + +```python +from datetime import datetime +from pathlib import Path + +import pytest + +from teampulse.memo import generate_memo, save_memo +from teampulse.models import AuditEntry, AuditWindow + + +def make_window(**kwargs): + defaults = dict( + presenter="Anna Bauer", + moderator="Oliver Hofmann", + start_time=datetime(2026, 5, 16, 10, 3, 42), + end_time=datetime(2026, 5, 16, 10, 47, 15), + entries=[ + AuditEntry("Thomas Wolf", "t.wolf@company.com"), + AuditEntry("Klaus Huber", "k.huber@company.com"), + AuditEntry("Sandra Vogel", "s.vogel@company.com"), + ], + ) + defaults.update(kwargs) + return AuditWindow(**defaults) + + +def test_memo_contains_presenter_name(): + result = generate_memo(make_window()) + assert "Anna Bauer" in result + + +def test_memo_contains_time_window(): + result = generate_memo(make_window()) + assert "10:03:42" in result + assert "10:47:15" in result + + +def test_memo_sorts_entries_by_email(): + result = generate_memo(make_window()) + idx_huber = result.index("k.huber@company.com") + idx_vogel = result.index("s.vogel@company.com") + idx_wolf = result.index("t.wolf@company.com") + assert idx_huber < idx_vogel < idx_wolf + + +def test_memo_excludes_presenter(): + window = make_window( + entries=[ + AuditEntry("Anna Bauer", "a.bauer@company.com"), + AuditEntry("Klaus Huber", "k.huber@company.com"), + ] + ) + result = generate_memo(window) + assert "a.bauer@company.com" not in result + assert "k.huber@company.com" in result + + +def test_memo_excludes_moderator(): + window = make_window( + entries=[ + AuditEntry("Oliver Hofmann", "o.hofmann@company.com"), + AuditEntry("Klaus Huber", "k.huber@company.com"), + ] + ) + result = generate_memo(window) + assert "o.hofmann@company.com" not in result + assert "k.huber@company.com" in result + + +def test_memo_empty_window(): + window = make_window(entries=[]) + result = generate_memo(window) + assert "Keine Chat-Aktivität" in result + + +def test_memo_contains_moderator_note(): + result = generate_memo(make_window()) + assert "Oliver Hofmann" in result + + +def test_save_memo_writes_file(tmp_path): + window = make_window() + path = save_memo(generate_memo(window), directory=tmp_path, timestamp=datetime(2026, 5, 16, 10, 51)) + assert path.exists() + assert path.name == "audit_20260516_1051.md" + assert "Anna Bauer" in path.read_text() +``` + +- [ ] **Step 2: Run tests to verify they fail** + +```bash +.venv/bin/pytest tests/test_memo.py -v +``` + +Expected: `ImportError` — `teampulse.memo` does not exist yet. + +- [ ] **Step 3: Write memo.py** + +`src/teampulse/memo.py`: + +```python +from datetime import datetime +from pathlib import Path + +from teampulse.models import AuditWindow + + +def generate_memo(window: AuditWindow) -> str: + filtered = [ + e for e in window.entries + if e.display_name != window.presenter and e.display_name != window.moderator + ] + sorted_entries = sorted(filtered, key=lambda e: e.email) + + start = window.start_time.strftime("%H:%M:%S") + end = window.end_time.strftime("%H:%M:%S") + + lines = [ + "# Meeting Chat Audit", + f"Zeitfenster: {start} – {end}", + "", + ] + + if not sorted_entries: + lines += [ + f"## Teilnehmer am Vortrag von {window.presenter} (0)", + f"_Moderator: {window.moderator} — Vortragender und Moderator ausgeschlossen_", + "", + "_Keine Chat-Aktivität in diesem Zeitfenster._", + ] + else: + lines += [ + f"## Teilnehmer am Vortrag von {window.presenter} ({len(sorted_entries)})", + f"_Moderator: {window.moderator} — Vortragender und Moderator ausgeschlossen_", + "", + ] + for entry in sorted_entries: + lines.append(f"- {entry.display_name} <{entry.email}>") + + created = datetime.now().strftime("%Y-%m-%d %H:%M") + lines += ["", f"_Sortiert nach E-Mail-Adresse. Erstellt: {created}_"] + return "\n".join(lines) + + +def save_memo(content: str, directory: Path = Path("."), timestamp: datetime | None = None) -> Path: + ts = timestamp or datetime.now() + filename = f"audit_{ts.strftime('%Y%m%d_%H%M')}.md" + path = directory / filename + path.write_text(content, encoding="utf-8") + return path +``` + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +.venv/bin/pytest tests/test_memo.py -v +``` + +Expected: All 8 tests PASS. + +- [ ] **Step 5: Commit** + +```bash +git add src/teampulse/memo.py tests/test_memo.py +git commit -m "feat: memo generation with filtering and sorting" +``` + +--- + +## Task 4: Resolver — Cache Layer + +**Files:** +- Create: `src/teampulse/resolver.py` +- Create: `tests/test_resolver_cache.py` + +- [ ] **Step 1: Write failing tests** + +`tests/test_resolver_cache.py`: + +```python +import json +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +from teampulse.resolver import Resolver + + +@pytest.fixture +def cache_path(tmp_path): + return tmp_path / "cache.json" + + +def test_resolve_returns_email_from_cache(cache_path): + cache_path.write_text(json.dumps({"Max Mustermann": "m.mustermann@company.com"})) + resolver = Resolver(cache_path=cache_path, page=MagicMock()) + assert resolver.resolve("Max Mustermann") == "m.mustermann@company.com" + + +def test_resolve_unknown_name_calls_browser(cache_path): + cache_path.write_text(json.dumps({})) + mock_page = MagicMock() + resolver = Resolver(cache_path=cache_path, page=mock_page) + resolver._extract_email_from_profile = MagicMock(return_value="k.huber@company.com") + + result = resolver.resolve("Klaus Huber") + + resolver._extract_email_from_profile.assert_called_once_with("Klaus Huber") + assert result == "k.huber@company.com" + + +def test_resolve_caches_newly_resolved_email(cache_path): + cache_path.write_text(json.dumps({})) + resolver = Resolver(cache_path=cache_path, page=MagicMock()) + resolver._extract_email_from_profile = MagicMock(return_value="k.huber@company.com") + + resolver.resolve("Klaus Huber") + + saved = json.loads(cache_path.read_text()) + assert saved["Klaus Huber"] == "k.huber@company.com" + + +def test_resolve_unresolvable_returns_placeholder(cache_path): + cache_path.write_text(json.dumps({})) + resolver = Resolver(cache_path=cache_path, page=MagicMock()) + resolver._extract_email_from_profile = MagicMock(return_value=None) + + result = resolver.resolve("Unknown Person") + assert result == "" + + +def test_resolve_does_not_cache_unresolvable(cache_path): + cache_path.write_text(json.dumps({})) + resolver = Resolver(cache_path=cache_path, page=MagicMock()) + resolver._extract_email_from_profile = MagicMock(return_value=None) + + resolver.resolve("Unknown Person") + + saved = json.loads(cache_path.read_text()) + assert "Unknown Person" not in saved + + +def test_creates_cache_file_if_missing(tmp_path): + cache_path = tmp_path / "nonexistent.json" + resolver = Resolver(cache_path=cache_path, page=MagicMock()) + resolver._extract_email_from_profile = MagicMock(return_value="a@b.com") + resolver.resolve("Test User") + assert cache_path.exists() +``` + +- [ ] **Step 2: Run tests to verify they fail** + +```bash +.venv/bin/pytest tests/test_resolver_cache.py -v +``` + +Expected: `ImportError` — `teampulse.resolver` does not exist. + +- [ ] **Step 3: Write resolver.py (cache layer only)** + +`src/teampulse/resolver.py`: + +```python +import json +from pathlib import Path + +from playwright.sync_api import Page + + +class Resolver: + def __init__(self, cache_path: Path, page: Page): + self._cache_path = cache_path + self._page = page + self._cache: dict[str, str] = self._load_cache() + + def resolve(self, display_name: str) -> str: + if display_name in self._cache: + return self._cache[display_name] + + email = self._extract_email_from_profile(display_name) + if email is None: + return "" + + self._cache[display_name] = email + self._save_cache() + return email + + def _load_cache(self) -> dict[str, str]: + if not self._cache_path.exists(): + return {} + return json.loads(self._cache_path.read_text(encoding="utf-8")) + + def _save_cache(self) -> None: + self._cache_path.write_text( + json.dumps(self._cache, ensure_ascii=False, indent=2), + encoding="utf-8", + ) + + def _extract_email_from_profile(self, display_name: str) -> str | None: + # Implemented in Task 8 — browser interaction with Teams profile cards + raise NotImplementedError +``` + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +.venv/bin/pytest tests/test_resolver_cache.py -v +``` + +Expected: All 6 tests PASS. + +- [ ] **Step 5: Commit** + +```bash +git add src/teampulse/resolver.py tests/test_resolver_cache.py +git commit -m "feat: resolver cache layer" +``` + +--- + +## Task 5: Monitor — Trigger Parsing + +**Files:** +- Modify: `src/teampulse/monitor.py` (create) +- Create: `tests/test_monitor_triggers.py` + +- [ ] **Step 1: Write failing tests** + +`tests/test_monitor_triggers.py`: + +```python +import re +from datetime import datetime + +import pytest + +from teampulse.models import ChatMessage +from teampulse.monitor import parse_trigger + + +def msg(sender: str, text: str) -> ChatMessage: + return ChatMessage(sender=sender, text=text, timestamp=datetime(2026, 5, 16, 10, 0, 0)) + + +CURRENT_USER = "Oliver Hofmann" + + +def test_start_with_quoted_name(): + result = parse_trigger(msg(CURRENT_USER, '!start "Anna Bauer"'), CURRENT_USER) + assert result == ("start", "Anna Bauer") + + +def test_start_with_single_word_name(): + result = parse_trigger(msg(CURRENT_USER, '!start "Bauer"'), CURRENT_USER) + assert result == ("start", "Bauer") + + +def test_start_without_name_returns_placeholder(): + result = parse_trigger(msg(CURRENT_USER, "!start"), CURRENT_USER) + assert result == ("start", "Unbekannter Vortragender") + + +def test_start_with_empty_quotes_returns_placeholder(): + result = parse_trigger(msg(CURRENT_USER, '!start ""'), CURRENT_USER) + assert result == ("start", "Unbekannter Vortragender") + + +def test_stop_trigger(): + result = parse_trigger(msg(CURRENT_USER, "!stop"), CURRENT_USER) + assert result == ("stop", "") + + +def test_ignored_if_not_current_user(): + result = parse_trigger(msg("Klaus Huber", '!start "Anna Bauer"'), CURRENT_USER) + assert result is None + + +def test_stop_ignored_if_not_current_user(): + result = parse_trigger(msg("Klaus Huber", "!stop"), CURRENT_USER) + assert result is None + + +def test_regular_message_returns_none(): + result = parse_trigger(msg(CURRENT_USER, "Gute Frage!"), CURRENT_USER) + assert result is None + + +def test_case_insensitive_start(): + result = parse_trigger(msg(CURRENT_USER, '!START "Anna Bauer"'), CURRENT_USER) + assert result == ("start", "Anna Bauer") + + +def test_case_insensitive_stop(): + result = parse_trigger(msg(CURRENT_USER, "!STOP"), CURRENT_USER) + assert result == ("stop", "") +``` + +- [ ] **Step 2: Run tests to verify they fail** + +```bash +.venv/bin/pytest tests/test_monitor_triggers.py -v +``` + +Expected: `ImportError` — `teampulse.monitor` does not exist. + +- [ ] **Step 3: Write monitor.py (trigger parsing only)** + +`src/teampulse/monitor.py`: + +```python +import re +from datetime import datetime + +from playwright.sync_api import Page + +from teampulse.models import AuditWindow, ChatMessage + + +_START_RE = re.compile(r'^!start(?:\s+"([^"]*)")?$', re.IGNORECASE) +_STOP_RE = re.compile(r'^!stop$', re.IGNORECASE) + + +def parse_trigger(message: ChatMessage, current_user: str) -> tuple[str, str] | None: + if message.sender != current_user: + return None + + text = message.text.strip() + + m = _START_RE.match(text) + if m: + name = (m.group(1) or "").strip() or "Unbekannter Vortragender" + return ("start", name) + + if _STOP_RE.match(text): + return ("stop", "") + + return None + + +class Monitor: + """Chat polling loop — browser integration added in Task 7.""" + + def __init__(self, page: Page, current_user: str): + self._page = page + self._current_user = current_user +``` + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +.venv/bin/pytest tests/test_monitor_triggers.py -v +``` + +Expected: All 10 tests PASS. + +- [ ] **Step 5: Run full test suite** + +```bash +.venv/bin/pytest -v +``` + +Expected: All tests PASS. + +- [ ] **Step 6: Commit** + +```bash +git add src/teampulse/monitor.py tests/test_monitor_triggers.py +git commit -m "feat: trigger parsing for !start and !stop commands" +``` + +--- + +## Task 6: DOM Discovery Script + +**Files:** +- Create: `scripts/discover_dom.py` + +This task identifies the exact CSS selectors for Teams Web. Run it once with Teams open, inspect output, then update the constants used in Tasks 7 and 8. + +- [ ] **Step 1: Create discovery script** + +`scripts/discover_dom.py`: + +```python +"""Run this script once with a Teams meeting chat open. +It prints DOM info to help identify selectors for messages and profile cards. +Usage: .venv/bin/python scripts/discover_dom.py +""" +import sys +from playwright.sync_api import sync_playwright + +URL = sys.argv[1] if len(sys.argv) > 1 else "https://teams.microsoft.com" +SESSION_DIR = str(__import__("pathlib").Path.home() / ".teampulse" / "session") + + +def main(): + with sync_playwright() as p: + browser = p.chromium.launch_persistent_context( + SESSION_DIR, + headless=False, + args=["--no-sandbox"], + ) + page = browser.pages[0] if browser.pages else browser.new_page() + page.goto(URL) + print("Navigate to a meeting chat in the browser, then press Enter here...") + input() + + # Print all elements that look like chat messages + messages = page.query_selector_all("[data-tid*='message'], [class*='message'], [role='listitem']") + print(f"\nFound {len(messages)} potential message elements") + for i, el in enumerate(messages[:5]): + print(f"\n--- Element {i} ---") + print(f" tag: {el.evaluate('e => e.tagName')}") + print(f" data-tid: {el.get_attribute('data-tid')}") + print(f" class (first 100): {(el.get_attribute('class') or '')[:100]}") + print(f" text (first 100): {el.inner_text()[:100]}") + + print("\nPress Enter to close...") + input() + browser.close() + + +if __name__ == "__main__": + main() +``` + +- [ ] **Step 2: Run the discovery script** + +```bash +mkdir -p scripts +.venv/bin/python scripts/discover_dom.py +``` + +Navigate to a Teams meeting chat, press Enter. Note the `data-tid` values and class names that identify: +1. Individual message containers +2. Sender name elements within messages +3. Profile card container (after clicking a sender name) +4. Email field within the profile card + +- [ ] **Step 3: Record findings** + +Open `src/teampulse/monitor.py` and add the discovered selectors as constants at the top: + +```python +# Discovered via scripts/discover_dom.py — update if Teams web changes +_MSG_SELECTOR = "[data-tid='chat-pane-message']" # one chat message +_SENDER_SELECTOR = "[data-tid='message-author-name']" # sender name within message +_PROFILE_EMAIL_SELECTOR = "[data-tid='persona-card-email']" # email in profile card +``` + +> **Note:** The selectors above are starting points based on known Teams web patterns. Adjust them based on what `discover_dom.py` shows. Common alternatives: +> - `[class*='authorName']` for sender names +> - `[class*='fui-Persona']` for profile cards + +- [ ] **Step 4: Commit** + +```bash +git add scripts/discover_dom.py src/teampulse/monitor.py +git commit -m "feat: DOM discovery script and selector constants" +``` + +--- + +## Task 7: Monitor — Chat Polling Loop + +**Files:** +- Modify: `src/teampulse/monitor.py` + +- [ ] **Step 1: Add polling loop to Monitor class** + +Replace the `Monitor` class body in `src/teampulse/monitor.py` with: + +```python +class Monitor: + def __init__(self, page: Page, current_user: str): + self._page = page + self._current_user = current_user + self._seen_message_ids: set[str] = set() + + def wait_for_chat(self) -> None: + """Block until a Teams chat page is detected.""" + print("Warte auf Teams-Chat-Seite... (bitte in der Browser-URL zur Meeting-Chat-Seite navigieren)") + self._page.wait_for_selector(_MSG_SELECTOR, timeout=300_000) + print("Chat erkannt.") + + def get_current_user_display_name(self) -> str: + """Read the logged-in user's display name from the Teams web UI.""" + el = self._page.query_selector("[data-tid='me-control-display-name']") + if el: + return el.inner_text().strip() + # Fallback: read from page title or profile area + el = self._page.query_selector("[aria-label*='Profilbild']") + if el: + return (el.get_attribute("aria-label") or "").replace("Profilbild", "").strip() + raise RuntimeError("Konnte den Namen des eingeloggten Nutzers nicht ermitteln.") + + def poll_new_messages(self) -> list[ChatMessage]: + """Return messages not yet seen.""" + elements = self._page.query_selector_all(_MSG_SELECTOR) + new_messages = [] + for el in elements: + msg_id = el.get_attribute("data-tid") or el.get_attribute("id") or el.inner_text()[:40] + if msg_id in self._seen_message_ids: + continue + self._seen_message_ids.add(msg_id) + + sender_el = el.query_selector(_SENDER_SELECTOR) + if not sender_el: + continue + sender = sender_el.inner_text().strip() + text = el.inner_text().strip() + new_messages.append(ChatMessage( + sender=sender, + text=text, + timestamp=datetime.now(), + )) + return new_messages + + def run(self) -> AuditWindow: + """Main loop: collect messages until !start/!stop window is complete.""" + import time + + self.wait_for_chat() + + presenter: str | None = None + start_time: datetime | None = None + collected: list[ChatMessage] = [] + + print(f"Monitoring aktiv. Poste '!start \"Name\"' im Chat um ein Zeitfenster zu starten.") + + while True: + try: + new_msgs = self.poll_new_messages() + except Exception: + print("Verbindung verloren, reconnecting...") + time.sleep(2) + continue + + for msg in new_msgs: + trigger = parse_trigger(msg, self._current_user) + + if trigger is None: + if start_time is not None: + collected.append(msg) + continue + + action, value = trigger + + if action == "start": + presenter = value + start_time = datetime.now() + collected = [] + print(f"Zeitfenster gestartet — Vortragender: {presenter}") + + elif action == "stop": + if start_time is None: + print("!stop ohne vorheriges !start — ignoriert.") + continue + end_time = datetime.now() + print(f"Zeitfenster beendet.") + from teampulse.models import AuditEntry + seen_senders = list(dict.fromkeys(m.sender for m in collected)) + entries = [AuditEntry(display_name=s, email="") for s in seen_senders] + return AuditWindow( + presenter=presenter, + moderator=self._current_user, + start_time=start_time, + end_time=end_time, + entries=entries, + ) + + time.sleep(2) +``` + +- [ ] **Step 2: Verify the module imports cleanly** + +```bash +.venv/bin/python -c "from teampulse.monitor import Monitor, parse_trigger; print('OK')" +``` + +Expected: `OK` + +- [ ] **Step 3: Run full test suite** + +```bash +.venv/bin/pytest -v +``` + +Expected: All tests PASS (polling loop has no unit tests — integration tested in Task 9). + +- [ ] **Step 4: Commit** + +```bash +git add src/teampulse/monitor.py +git commit -m "feat: chat polling loop and window collection" +``` + +--- + +## Task 8: Resolver — Browser Email Extraction + +**Files:** +- Modify: `src/teampulse/resolver.py` + +- [ ] **Step 1: Implement `_extract_email_from_profile`** + +Replace the `_extract_email_from_profile` stub in `src/teampulse/resolver.py` with: + +```python +def _extract_email_from_profile(self, display_name: str) -> str | None: + import time + + # Find a message from this sender and click their name + sender_elements = self._page.query_selector_all( + f"{_SENDER_SELECTOR}" + ) + target = None + for el in sender_elements: + if el.inner_text().strip() == display_name: + target = el + break + + if target is None: + print(f" Sender '{display_name}' nicht im Chat gefunden.") + return None + + try: + target.click() + # Wait for profile card to appear + self._page.wait_for_selector(_PROFILE_EMAIL_SELECTOR, timeout=5000) + email_el = self._page.query_selector(_PROFILE_EMAIL_SELECTOR) + email = email_el.inner_text().strip() if email_el else None + + # Close profile card by pressing Escape + self._page.keyboard.press("Escape") + time.sleep(0.5) + return email + except Exception as e: + print(f" Profilkarte für '{display_name}' nicht ladbar: {e}") + self._page.keyboard.press("Escape") + return None +``` + +Also add the selector constant import at the top of `resolver.py`: + +```python +from teampulse.monitor import _PROFILE_EMAIL_SELECTOR, _SENDER_SELECTOR +``` + +- [ ] **Step 2: Run the cache tests to ensure nothing broke** + +```bash +.venv/bin/pytest tests/test_resolver_cache.py -v +``` + +Expected: All 6 tests PASS. + +- [ ] **Step 3: Commit** + +```bash +git add src/teampulse/resolver.py +git commit -m "feat: browser-based email extraction from Teams profile cards" +``` + +--- + +## Task 9: Auth — Session Management + +**Files:** +- Create: `src/teampulse/auth.py` + +- [ ] **Step 1: Write auth.py** + +`src/teampulse/auth.py`: + +```python +from pathlib import Path + +from playwright.sync_api import BrowserContext, Playwright + +SESSION_DIR = Path.home() / ".teampulse" / "session" + + +def create_context(playwright: Playwright, headless: bool = True) -> BrowserContext: + SESSION_DIR.mkdir(parents=True, exist_ok=True) + context = playwright.chromium.launch_persistent_context( + str(SESSION_DIR), + headless=headless, + args=["--no-sandbox"], + locale="de-DE", + ) + return context + + +def ensure_logged_in(context: BrowserContext) -> None: + """ + Open Teams web. If redirected to a login page, make the browser + visible so the user can authenticate. Waits until login is complete. + """ + page = context.pages[0] if context.pages else context.new_page() + page.goto("https://teams.microsoft.com") + + if _is_login_page(page): + print("Bitte im Browser-Fenster anmelden (SSO/MFA)...") + # Make browser visible for login + page.evaluate("document.title = 'TeamPulse Login'") + page.wait_for_url("**/teams.microsoft.com/**", timeout=120_000) + print("Anmeldung erfolgreich.") + + page.close() + + +def _is_login_page(page) -> bool: + try: + page.wait_for_url("**/login**", timeout=3000) + return True + except Exception: + return False +``` + +- [ ] **Step 2: Verify import** + +```bash +.venv/bin/python -c "from teampulse.auth import create_context, ensure_logged_in; print('OK')" +``` + +Expected: `OK` + +- [ ] **Step 3: Commit** + +```bash +git add src/teampulse/auth.py +git commit -m "feat: Playwright session management with login detection" +``` + +--- + +## Task 10: Main Entry Point + +**Files:** +- Create: `main.py` + +- [ ] **Step 1: Write main.py** + +```python +#!/usr/bin/env python3 +"""TeamPulse — Teams meeting chat audit tool. + +Usage: + .venv/bin/python main.py + +The script opens a background browser, navigates to Teams. +Post !start "Presenter Name" and !stop in the meeting chat to define a time window. +A Markdown memo is saved to the current directory when the window closes. +""" +from pathlib import Path + +from playwright.sync_api import sync_playwright + +from teampulse.auth import create_context, ensure_logged_in +from teampulse.memo import generate_memo, save_memo +from teampulse.models import AuditEntry +from teampulse.monitor import Monitor +from teampulse.resolver import Resolver + +CACHE_PATH = Path.home() / ".teampulse" / "cache.json" + + +def main(): + with sync_playwright() as playwright: + print("Starte Browser...") + context = create_context(playwright, headless=False) + ensure_logged_in(context) + + page = context.new_page() + page.goto("https://teams.microsoft.com") + + monitor = Monitor(page=page, current_user="") + print("Lese eingeloggten Nutzer...") + current_user = monitor.get_current_user_display_name() + monitor._current_user = current_user + print(f"Eingeloggt als: {current_user}") + + print("\nNavigiere im Browser zur Meeting-Chat-Seite.") + print("Poste dann '!start \"Name des Vortragenden\"' im Chat.\n") + + window = monitor.run() + + resolver = Resolver(cache_path=CACHE_PATH, page=page) + print(f"\nLöse {len(window.entries)} E-Mail-Adressen auf...") + resolved_entries = [] + for entry in window.entries: + email = resolver.resolve(entry.display_name) + print(f" {entry.display_name} → {email}") + resolved_entries.append(AuditEntry(display_name=entry.display_name, email=email)) + + window.entries = resolved_entries + + memo_content = generate_memo(window) + path = save_memo(memo_content) + + print(f"\nMemo gespeichert: {path}") + print("\n" + memo_content) + + context.close() + + +if __name__ == "__main__": + main() +``` + +- [ ] **Step 2: Run full test suite one last time** + +```bash +.venv/bin/pytest -v +``` + +Expected: All tests PASS. + +- [ ] **Step 3: Smoke test (optional, requires active Teams session)** + +```bash +.venv/bin/python main.py +``` + +Expected: Browser opens, navigates to Teams. After login, monitoring starts. + +- [ ] **Step 4: Commit** + +```bash +git add main.py +git commit -m "feat: main entry point wiring all components" +``` + +--- + +## Self-Review Notes + +- **Spec coverage:** All spec requirements are covered: Playwright + Teams web (✓), persistent session (✓), `!start "Name"` / `!stop` triggers (✓), console Enter fallback — **gap:** console Enter fallback is mentioned in spec but not implemented. It can be added later as a threading enhancement to `monitor.run()` without changing the architecture. +- **Selector constants:** Defined in `monitor.py` and imported by `resolver.py` — consistent across both files. +- **`AuditEntry.email`:** Initialized as `""` in `monitor.py` Task 7 and filled in `main.py` after resolution — intentional two-phase design. +- **`_PROFILE_EMAIL_SELECTOR` and `_SENDER_SELECTOR`:** Exported from `monitor.py` and imported in `resolver.py` — single source of truth for DOM selectors.