teampulse/docs/superpowers/plans/2026-05-16-teampulse.md
2026-05-16 17:12:50 +02:00

30 KiB
Raw Permalink Blame History

TeamPulse Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Python script that monitors a Microsoft Teams meeting chat, records who posted in a defined time window, resolves display names to email addresses via profile cards, and generates a sorted Markdown memo.

Architecture: Playwright controls a headless Teams Web browser in the background. A polling loop reads new chat messages every 2 seconds. When !start "Name" / !stop commands are detected (posted by the current user), a time window opens/closes. After the window, profile cards are clicked to resolve emails. A Markdown memo is generated and saved.

Tech Stack: Python 3.12, Playwright (sync API), pytest, pytest-mock


File Map

File Responsibility
src/teampulse/models.py Shared dataclasses: ChatMessage, AuditEntry, AuditWindow
src/teampulse/memo.py Pure function: AuditWindow → str, save to file
src/teampulse/resolver.py Resolver class: cache lookup + browser profile card extraction
src/teampulse/monitor.py Monitor class: DOM polling, trigger parsing, message collection
src/teampulse/auth.py create_context(): persistent Playwright session, login detection, read current user
main.py Entry point: wire all components, start polling loop
tests/test_memo.py Unit tests for memo generation
tests/test_resolver_cache.py Unit tests for cache layer (no browser)
tests/test_monitor_triggers.py Unit tests for trigger parsing

Task 1: Project Setup

Files:

  • Create: requirements.txt

  • Create: requirements-dev.txt

  • Create: src/teampulse/__init__.py

  • Create: tests/__init__.py

  • Create: pytest.ini

  • Step 1: Create requirements.txt

playwright>=1.44
  • Step 2: Create requirements-dev.txt
playwright>=1.44
pytest>=8.2
pytest-mock>=3.14
  • Step 3: Install dependencies
.venv/bin/pip install -r requirements-dev.txt
.venv/bin/playwright install chromium

Expected: Packages installed, Chromium browser downloaded.

  • Step 4: Create package skeleton

src/teampulse/__init__.py — empty file.

tests/__init__.py — empty file.

  • Step 5: Create pytest.ini
[pytest]
testpaths = tests
pythonpath = src
  • Step 6: Verify pytest runs
.venv/bin/pytest --collect-only

Expected output: no tests ran (no errors).

  • Step 7: Commit
git add requirements.txt requirements-dev.txt src/ tests/ pytest.ini
git commit -m "feat: project scaffold with dependencies"

Task 2: Data Models

Files:

  • Create: src/teampulse/models.py

  • Step 1: Write models.py

from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class ChatMessage:
    sender: str
    text: str
    timestamp: datetime


@dataclass
class AuditEntry:
    display_name: str
    email: str


@dataclass
class AuditWindow:
    presenter: str
    moderator: str
    start_time: datetime
    end_time: datetime
    entries: list[AuditEntry] = field(default_factory=list)
  • Step 2: Verify import
.venv/bin/python -c "from teampulse.models import ChatMessage, AuditEntry, AuditWindow; print('OK')"

Expected: OK

  • Step 3: Commit
git add src/teampulse/models.py
git commit -m "feat: add shared data models"

Task 3: Memo Generation

Files:

  • Create: src/teampulse/memo.py

  • Create: tests/test_memo.py

  • Step 1: Write failing tests

tests/test_memo.py:

from datetime import datetime
from pathlib import Path

import pytest

from teampulse.memo import generate_memo, save_memo
from teampulse.models import AuditEntry, AuditWindow


def make_window(**kwargs):
    defaults = dict(
        presenter="Anna Bauer",
        moderator="Oliver Hofmann",
        start_time=datetime(2026, 5, 16, 10, 3, 42),
        end_time=datetime(2026, 5, 16, 10, 47, 15),
        entries=[
            AuditEntry("Thomas Wolf", "t.wolf@company.com"),
            AuditEntry("Klaus Huber", "k.huber@company.com"),
            AuditEntry("Sandra Vogel", "s.vogel@company.com"),
        ],
    )
    defaults.update(kwargs)
    return AuditWindow(**defaults)


def test_memo_contains_presenter_name():
    result = generate_memo(make_window())
    assert "Anna Bauer" in result


def test_memo_contains_time_window():
    result = generate_memo(make_window())
    assert "10:03:42" in result
    assert "10:47:15" in result


def test_memo_sorts_entries_by_email():
    result = generate_memo(make_window())
    idx_huber = result.index("k.huber@company.com")
    idx_vogel = result.index("s.vogel@company.com")
    idx_wolf = result.index("t.wolf@company.com")
    assert idx_huber < idx_vogel < idx_wolf


def test_memo_excludes_presenter():
    window = make_window(
        entries=[
            AuditEntry("Anna Bauer", "a.bauer@company.com"),
            AuditEntry("Klaus Huber", "k.huber@company.com"),
        ]
    )
    result = generate_memo(window)
    assert "a.bauer@company.com" not in result
    assert "k.huber@company.com" in result


def test_memo_excludes_moderator():
    window = make_window(
        entries=[
            AuditEntry("Oliver Hofmann", "o.hofmann@company.com"),
            AuditEntry("Klaus Huber", "k.huber@company.com"),
        ]
    )
    result = generate_memo(window)
    assert "o.hofmann@company.com" not in result
    assert "k.huber@company.com" in result


def test_memo_empty_window():
    window = make_window(entries=[])
    result = generate_memo(window)
    assert "Keine Chat-Aktivität" in result


def test_memo_contains_moderator_note():
    result = generate_memo(make_window())
    assert "Oliver Hofmann" in result


def test_save_memo_writes_file(tmp_path):
    window = make_window()
    path = save_memo(generate_memo(window), directory=tmp_path, timestamp=datetime(2026, 5, 16, 10, 51))
    assert path.exists()
    assert path.name == "audit_20260516_1051.md"
    assert "Anna Bauer" in path.read_text()
  • Step 2: Run tests to verify they fail
.venv/bin/pytest tests/test_memo.py -v

Expected: ImportErrorteampulse.memo does not exist yet.

  • Step 3: Write memo.py

src/teampulse/memo.py:

from datetime import datetime
from pathlib import Path

from teampulse.models import AuditWindow


def generate_memo(window: AuditWindow) -> str:
    filtered = [
        e for e in window.entries
        if e.display_name != window.presenter and e.display_name != window.moderator
    ]
    sorted_entries = sorted(filtered, key=lambda e: e.email)

    start = window.start_time.strftime("%H:%M:%S")
    end = window.end_time.strftime("%H:%M:%S")

    lines = [
        "# Meeting Chat Audit",
        f"Zeitfenster: {start}  {end}",
        "",
    ]

    if not sorted_entries:
        lines += [
            f"## Teilnehmer am Vortrag von {window.presenter} (0)",
            f"_Moderator: {window.moderator} — Vortragender und Moderator ausgeschlossen_",
            "",
            "_Keine Chat-Aktivität in diesem Zeitfenster._",
        ]
    else:
        lines += [
            f"## Teilnehmer am Vortrag von {window.presenter} ({len(sorted_entries)})",
            f"_Moderator: {window.moderator} — Vortragender und Moderator ausgeschlossen_",
            "",
        ]
        for entry in sorted_entries:
            lines.append(f"- {entry.display_name} <{entry.email}>")

    created = datetime.now().strftime("%Y-%m-%d %H:%M")
    lines += ["", f"_Sortiert nach E-Mail-Adresse. Erstellt: {created}_"]
    return "\n".join(lines)


def save_memo(content: str, directory: Path = Path("."), timestamp: datetime | None = None) -> Path:
    ts = timestamp or datetime.now()
    filename = f"audit_{ts.strftime('%Y%m%d_%H%M')}.md"
    path = directory / filename
    path.write_text(content, encoding="utf-8")
    return path
  • Step 4: Run tests to verify they pass
.venv/bin/pytest tests/test_memo.py -v

Expected: All 8 tests PASS.

  • Step 5: Commit
git add src/teampulse/memo.py tests/test_memo.py
git commit -m "feat: memo generation with filtering and sorting"

Task 4: Resolver — Cache Layer

Files:

  • Create: src/teampulse/resolver.py

  • Create: tests/test_resolver_cache.py

  • Step 1: Write failing tests

tests/test_resolver_cache.py:

import json
from pathlib import Path
from unittest.mock import MagicMock

import pytest

from teampulse.resolver import Resolver


@pytest.fixture
def cache_path(tmp_path):
    return tmp_path / "cache.json"


def test_resolve_returns_email_from_cache(cache_path):
    cache_path.write_text(json.dumps({"Max Mustermann": "m.mustermann@company.com"}))
    resolver = Resolver(cache_path=cache_path, page=MagicMock())
    assert resolver.resolve("Max Mustermann") == "m.mustermann@company.com"


def test_resolve_unknown_name_calls_browser(cache_path):
    cache_path.write_text(json.dumps({}))
    mock_page = MagicMock()
    resolver = Resolver(cache_path=cache_path, page=mock_page)
    resolver._extract_email_from_profile = MagicMock(return_value="k.huber@company.com")

    result = resolver.resolve("Klaus Huber")

    resolver._extract_email_from_profile.assert_called_once_with("Klaus Huber")
    assert result == "k.huber@company.com"


def test_resolve_caches_newly_resolved_email(cache_path):
    cache_path.write_text(json.dumps({}))
    resolver = Resolver(cache_path=cache_path, page=MagicMock())
    resolver._extract_email_from_profile = MagicMock(return_value="k.huber@company.com")

    resolver.resolve("Klaus Huber")

    saved = json.loads(cache_path.read_text())
    assert saved["Klaus Huber"] == "k.huber@company.com"


def test_resolve_unresolvable_returns_placeholder(cache_path):
    cache_path.write_text(json.dumps({}))
    resolver = Resolver(cache_path=cache_path, page=MagicMock())
    resolver._extract_email_from_profile = MagicMock(return_value=None)

    result = resolver.resolve("Unknown Person")
    assert result == "<nicht auflösbar>"


def test_resolve_does_not_cache_unresolvable(cache_path):
    cache_path.write_text(json.dumps({}))
    resolver = Resolver(cache_path=cache_path, page=MagicMock())
    resolver._extract_email_from_profile = MagicMock(return_value=None)

    resolver.resolve("Unknown Person")

    saved = json.loads(cache_path.read_text())
    assert "Unknown Person" not in saved


def test_creates_cache_file_if_missing(tmp_path):
    cache_path = tmp_path / "nonexistent.json"
    resolver = Resolver(cache_path=cache_path, page=MagicMock())
    resolver._extract_email_from_profile = MagicMock(return_value="a@b.com")
    resolver.resolve("Test User")
    assert cache_path.exists()
  • Step 2: Run tests to verify they fail
.venv/bin/pytest tests/test_resolver_cache.py -v

Expected: ImportErrorteampulse.resolver does not exist.

  • Step 3: Write resolver.py (cache layer only)

src/teampulse/resolver.py:

import json
from pathlib import Path

from playwright.sync_api import Page


class Resolver:
    def __init__(self, cache_path: Path, page: Page):
        self._cache_path = cache_path
        self._page = page
        self._cache: dict[str, str] = self._load_cache()

    def resolve(self, display_name: str) -> str:
        if display_name in self._cache:
            return self._cache[display_name]

        email = self._extract_email_from_profile(display_name)
        if email is None:
            return "<nicht auflösbar>"

        self._cache[display_name] = email
        self._save_cache()
        return email

    def _load_cache(self) -> dict[str, str]:
        if not self._cache_path.exists():
            return {}
        return json.loads(self._cache_path.read_text(encoding="utf-8"))

    def _save_cache(self) -> None:
        self._cache_path.write_text(
            json.dumps(self._cache, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

    def _extract_email_from_profile(self, display_name: str) -> str | None:
        # Implemented in Task 8 — browser interaction with Teams profile cards
        raise NotImplementedError
  • Step 4: Run tests to verify they pass
.venv/bin/pytest tests/test_resolver_cache.py -v

Expected: All 6 tests PASS.

  • Step 5: Commit
git add src/teampulse/resolver.py tests/test_resolver_cache.py
git commit -m "feat: resolver cache layer"

Task 5: Monitor — Trigger Parsing

Files:

  • Modify: src/teampulse/monitor.py (create)

  • Create: tests/test_monitor_triggers.py

  • Step 1: Write failing tests

tests/test_monitor_triggers.py:

import re
from datetime import datetime

import pytest

from teampulse.models import ChatMessage
from teampulse.monitor import parse_trigger


def msg(sender: str, text: str) -> ChatMessage:
    return ChatMessage(sender=sender, text=text, timestamp=datetime(2026, 5, 16, 10, 0, 0))


CURRENT_USER = "Oliver Hofmann"


def test_start_with_quoted_name():
    result = parse_trigger(msg(CURRENT_USER, '!start "Anna Bauer"'), CURRENT_USER)
    assert result == ("start", "Anna Bauer")


def test_start_with_single_word_name():
    result = parse_trigger(msg(CURRENT_USER, '!start "Bauer"'), CURRENT_USER)
    assert result == ("start", "Bauer")


def test_start_without_name_returns_placeholder():
    result = parse_trigger(msg(CURRENT_USER, "!start"), CURRENT_USER)
    assert result == ("start", "Unbekannter Vortragender")


def test_start_with_empty_quotes_returns_placeholder():
    result = parse_trigger(msg(CURRENT_USER, '!start ""'), CURRENT_USER)
    assert result == ("start", "Unbekannter Vortragender")


def test_stop_trigger():
    result = parse_trigger(msg(CURRENT_USER, "!stop"), CURRENT_USER)
    assert result == ("stop", "")


def test_ignored_if_not_current_user():
    result = parse_trigger(msg("Klaus Huber", '!start "Anna Bauer"'), CURRENT_USER)
    assert result is None


def test_stop_ignored_if_not_current_user():
    result = parse_trigger(msg("Klaus Huber", "!stop"), CURRENT_USER)
    assert result is None


def test_regular_message_returns_none():
    result = parse_trigger(msg(CURRENT_USER, "Gute Frage!"), CURRENT_USER)
    assert result is None


def test_case_insensitive_start():
    result = parse_trigger(msg(CURRENT_USER, '!START "Anna Bauer"'), CURRENT_USER)
    assert result == ("start", "Anna Bauer")


def test_case_insensitive_stop():
    result = parse_trigger(msg(CURRENT_USER, "!STOP"), CURRENT_USER)
    assert result == ("stop", "")
  • Step 2: Run tests to verify they fail
.venv/bin/pytest tests/test_monitor_triggers.py -v

Expected: ImportErrorteampulse.monitor does not exist.

  • Step 3: Write monitor.py (trigger parsing only)

src/teampulse/monitor.py:

import re
from datetime import datetime

from playwright.sync_api import Page

from teampulse.models import AuditWindow, ChatMessage


_START_RE = re.compile(r'^!start(?:\s+"([^"]*)")?$', re.IGNORECASE)
_STOP_RE = re.compile(r'^!stop$', re.IGNORECASE)


def parse_trigger(message: ChatMessage, current_user: str) -> tuple[str, str] | None:
    if message.sender != current_user:
        return None

    text = message.text.strip()

    m = _START_RE.match(text)
    if m:
        name = (m.group(1) or "").strip() or "Unbekannter Vortragender"
        return ("start", name)

    if _STOP_RE.match(text):
        return ("stop", "")

    return None


class Monitor:
    """Chat polling loop — browser integration added in Task 7."""

    def __init__(self, page: Page, current_user: str):
        self._page = page
        self._current_user = current_user
  • Step 4: Run tests to verify they pass
.venv/bin/pytest tests/test_monitor_triggers.py -v

Expected: All 10 tests PASS.

  • Step 5: Run full test suite
.venv/bin/pytest -v

Expected: All tests PASS.

  • Step 6: Commit
git add src/teampulse/monitor.py tests/test_monitor_triggers.py
git commit -m "feat: trigger parsing for !start and !stop commands"

Task 6: DOM Discovery Script

Files:

  • Create: scripts/discover_dom.py

This task identifies the exact CSS selectors for Teams Web. Run it once with Teams open, inspect output, then update the constants used in Tasks 7 and 8.

  • Step 1: Create discovery script

scripts/discover_dom.py:

"""Run this script once with a Teams meeting chat open.
It prints DOM info to help identify selectors for messages and profile cards.
Usage: .venv/bin/python scripts/discover_dom.py <teams-chat-url>
"""
import sys
from playwright.sync_api import sync_playwright

URL = sys.argv[1] if len(sys.argv) > 1 else "https://teams.microsoft.com"
SESSION_DIR = str(__import__("pathlib").Path.home() / ".teampulse" / "session")


def main():
    with sync_playwright() as p:
        browser = p.chromium.launch_persistent_context(
            SESSION_DIR,
            headless=False,
            args=["--no-sandbox"],
        )
        page = browser.pages[0] if browser.pages else browser.new_page()
        page.goto(URL)
        print("Navigate to a meeting chat in the browser, then press Enter here...")
        input()

        # Print all elements that look like chat messages
        messages = page.query_selector_all("[data-tid*='message'], [class*='message'], [role='listitem']")
        print(f"\nFound {len(messages)} potential message elements")
        for i, el in enumerate(messages[:5]):
            print(f"\n--- Element {i} ---")
            print(f"  tag: {el.evaluate('e => e.tagName')}")
            print(f"  data-tid: {el.get_attribute('data-tid')}")
            print(f"  class (first 100): {(el.get_attribute('class') or '')[:100]}")
            print(f"  text (first 100): {el.inner_text()[:100]}")

        print("\nPress Enter to close...")
        input()
        browser.close()


if __name__ == "__main__":
    main()
  • Step 2: Run the discovery script
mkdir -p scripts
.venv/bin/python scripts/discover_dom.py

Navigate to a Teams meeting chat, press Enter. Note the data-tid values and class names that identify:

  1. Individual message containers
  2. Sender name elements within messages
  3. Profile card container (after clicking a sender name)
  4. Email field within the profile card
  • Step 3: Record findings

Open src/teampulse/monitor.py and add the discovered selectors as constants at the top:

# Discovered via scripts/discover_dom.py — update if Teams web changes
_MSG_SELECTOR = "[data-tid='chat-pane-message']"          # one chat message
_SENDER_SELECTOR = "[data-tid='message-author-name']"     # sender name within message
_PROFILE_EMAIL_SELECTOR = "[data-tid='persona-card-email']"  # email in profile card

Note: The selectors above are starting points based on known Teams web patterns. Adjust them based on what discover_dom.py shows. Common alternatives:

  • [class*='authorName'] for sender names
  • [class*='fui-Persona'] for profile cards
  • Step 4: Commit
git add scripts/discover_dom.py src/teampulse/monitor.py
git commit -m "feat: DOM discovery script and selector constants"

Task 7: Monitor — Chat Polling Loop

Files:

  • Modify: src/teampulse/monitor.py

  • Step 1: Add polling loop to Monitor class

Replace the Monitor class body in src/teampulse/monitor.py with:

class Monitor:
    def __init__(self, page: Page, current_user: str):
        self._page = page
        self._current_user = current_user
        self._seen_message_ids: set[str] = set()

    def wait_for_chat(self) -> None:
        """Block until a Teams chat page is detected."""
        print("Warte auf Teams-Chat-Seite... (bitte in der Browser-URL zur Meeting-Chat-Seite navigieren)")
        self._page.wait_for_selector(_MSG_SELECTOR, timeout=300_000)
        print("Chat erkannt.")

    def get_current_user_display_name(self) -> str:
        """Read the logged-in user's display name from the Teams web UI."""
        el = self._page.query_selector("[data-tid='me-control-display-name']")
        if el:
            return el.inner_text().strip()
        # Fallback: read from page title or profile area
        el = self._page.query_selector("[aria-label*='Profilbild']")
        if el:
            return (el.get_attribute("aria-label") or "").replace("Profilbild", "").strip()
        raise RuntimeError("Konnte den Namen des eingeloggten Nutzers nicht ermitteln.")

    def poll_new_messages(self) -> list[ChatMessage]:
        """Return messages not yet seen."""
        elements = self._page.query_selector_all(_MSG_SELECTOR)
        new_messages = []
        for el in elements:
            msg_id = el.get_attribute("data-tid") or el.get_attribute("id") or el.inner_text()[:40]
            if msg_id in self._seen_message_ids:
                continue
            self._seen_message_ids.add(msg_id)

            sender_el = el.query_selector(_SENDER_SELECTOR)
            if not sender_el:
                continue
            sender = sender_el.inner_text().strip()
            text = el.inner_text().strip()
            new_messages.append(ChatMessage(
                sender=sender,
                text=text,
                timestamp=datetime.now(),
            ))
        return new_messages

    def run(self) -> AuditWindow:
        """Main loop: collect messages until !start/!stop window is complete."""
        import time

        self.wait_for_chat()

        presenter: str | None = None
        start_time: datetime | None = None
        collected: list[ChatMessage] = []

        print(f"Monitoring aktiv. Poste '!start \"Name\"' im Chat um ein Zeitfenster zu starten.")

        while True:
            try:
                new_msgs = self.poll_new_messages()
            except Exception:
                print("Verbindung verloren, reconnecting...")
                time.sleep(2)
                continue

            for msg in new_msgs:
                trigger = parse_trigger(msg, self._current_user)

                if trigger is None:
                    if start_time is not None:
                        collected.append(msg)
                    continue

                action, value = trigger

                if action == "start":
                    presenter = value
                    start_time = datetime.now()
                    collected = []
                    print(f"Zeitfenster gestartet — Vortragender: {presenter}")

                elif action == "stop":
                    if start_time is None:
                        print("!stop ohne vorheriges !start — ignoriert.")
                        continue
                    end_time = datetime.now()
                    print(f"Zeitfenster beendet.")
                    from teampulse.models import AuditEntry
                    seen_senders = list(dict.fromkeys(m.sender for m in collected))
                    entries = [AuditEntry(display_name=s, email="") for s in seen_senders]
                    return AuditWindow(
                        presenter=presenter,
                        moderator=self._current_user,
                        start_time=start_time,
                        end_time=end_time,
                        entries=entries,
                    )

            time.sleep(2)
  • Step 2: Verify the module imports cleanly
.venv/bin/python -c "from teampulse.monitor import Monitor, parse_trigger; print('OK')"

Expected: OK

  • Step 3: Run full test suite
.venv/bin/pytest -v

Expected: All tests PASS (polling loop has no unit tests — integration tested in Task 9).

  • Step 4: Commit
git add src/teampulse/monitor.py
git commit -m "feat: chat polling loop and window collection"

Task 8: Resolver — Browser Email Extraction

Files:

  • Modify: src/teampulse/resolver.py

  • Step 1: Implement _extract_email_from_profile

Replace the _extract_email_from_profile stub in src/teampulse/resolver.py with:

def _extract_email_from_profile(self, display_name: str) -> str | None:
    import time

    # Find a message from this sender and click their name
    sender_elements = self._page.query_selector_all(
        f"{_SENDER_SELECTOR}"
    )
    target = None
    for el in sender_elements:
        if el.inner_text().strip() == display_name:
            target = el
            break

    if target is None:
        print(f"  Sender '{display_name}' nicht im Chat gefunden.")
        return None

    try:
        target.click()
        # Wait for profile card to appear
        self._page.wait_for_selector(_PROFILE_EMAIL_SELECTOR, timeout=5000)
        email_el = self._page.query_selector(_PROFILE_EMAIL_SELECTOR)
        email = email_el.inner_text().strip() if email_el else None

        # Close profile card by pressing Escape
        self._page.keyboard.press("Escape")
        time.sleep(0.5)
        return email
    except Exception as e:
        print(f"  Profilkarte für '{display_name}' nicht ladbar: {e}")
        self._page.keyboard.press("Escape")
        return None

Also add the selector constant import at the top of resolver.py:

from teampulse.monitor import _PROFILE_EMAIL_SELECTOR, _SENDER_SELECTOR
  • Step 2: Run the cache tests to ensure nothing broke
.venv/bin/pytest tests/test_resolver_cache.py -v

Expected: All 6 tests PASS.

  • Step 3: Commit
git add src/teampulse/resolver.py
git commit -m "feat: browser-based email extraction from Teams profile cards"

Task 9: Auth — Session Management

Files:

  • Create: src/teampulse/auth.py

  • Step 1: Write auth.py

src/teampulse/auth.py:

from pathlib import Path

from playwright.sync_api import BrowserContext, Playwright

SESSION_DIR = Path.home() / ".teampulse" / "session"


def create_context(playwright: Playwright, headless: bool = True) -> BrowserContext:
    SESSION_DIR.mkdir(parents=True, exist_ok=True)
    context = playwright.chromium.launch_persistent_context(
        str(SESSION_DIR),
        headless=headless,
        args=["--no-sandbox"],
        locale="de-DE",
    )
    return context


def ensure_logged_in(context: BrowserContext) -> None:
    """
    Open Teams web. If redirected to a login page, make the browser
    visible so the user can authenticate. Waits until login is complete.
    """
    page = context.pages[0] if context.pages else context.new_page()
    page.goto("https://teams.microsoft.com")

    if _is_login_page(page):
        print("Bitte im Browser-Fenster anmelden (SSO/MFA)...")
        # Make browser visible for login
        page.evaluate("document.title = 'TeamPulse Login'")
        page.wait_for_url("**/teams.microsoft.com/**", timeout=120_000)
        print("Anmeldung erfolgreich.")

    page.close()


def _is_login_page(page) -> bool:
    try:
        page.wait_for_url("**/login**", timeout=3000)
        return True
    except Exception:
        return False
  • Step 2: Verify import
.venv/bin/python -c "from teampulse.auth import create_context, ensure_logged_in; print('OK')"

Expected: OK

  • Step 3: Commit
git add src/teampulse/auth.py
git commit -m "feat: Playwright session management with login detection"

Task 10: Main Entry Point

Files:

  • Create: main.py

  • Step 1: Write main.py

#!/usr/bin/env python3
"""TeamPulse — Teams meeting chat audit tool.

Usage:
    .venv/bin/python main.py

The script opens a background browser, navigates to Teams.
Post !start "Presenter Name" and !stop in the meeting chat to define a time window.
A Markdown memo is saved to the current directory when the window closes.
"""
from pathlib import Path

from playwright.sync_api import sync_playwright

from teampulse.auth import create_context, ensure_logged_in
from teampulse.memo import generate_memo, save_memo
from teampulse.models import AuditEntry
from teampulse.monitor import Monitor
from teampulse.resolver import Resolver

CACHE_PATH = Path.home() / ".teampulse" / "cache.json"


def main():
    with sync_playwright() as playwright:
        print("Starte Browser...")
        context = create_context(playwright, headless=False)
        ensure_logged_in(context)

        page = context.new_page()
        page.goto("https://teams.microsoft.com")

        monitor = Monitor(page=page, current_user="")
        print("Lese eingeloggten Nutzer...")
        current_user = monitor.get_current_user_display_name()
        monitor._current_user = current_user
        print(f"Eingeloggt als: {current_user}")

        print("\nNavigiere im Browser zur Meeting-Chat-Seite.")
        print("Poste dann '!start \"Name des Vortragenden\"' im Chat.\n")

        window = monitor.run()

        resolver = Resolver(cache_path=CACHE_PATH, page=page)
        print(f"\nLöse {len(window.entries)} E-Mail-Adressen auf...")
        resolved_entries = []
        for entry in window.entries:
            email = resolver.resolve(entry.display_name)
            print(f"  {entry.display_name}{email}")
            resolved_entries.append(AuditEntry(display_name=entry.display_name, email=email))

        window.entries = resolved_entries

        memo_content = generate_memo(window)
        path = save_memo(memo_content)

        print(f"\nMemo gespeichert: {path}")
        print("\n" + memo_content)

        context.close()


if __name__ == "__main__":
    main()
  • Step 2: Run full test suite one last time
.venv/bin/pytest -v

Expected: All tests PASS.

  • Step 3: Smoke test (optional, requires active Teams session)
.venv/bin/python main.py

Expected: Browser opens, navigates to Teams. After login, monitoring starts.

  • Step 4: Commit
git add main.py
git commit -m "feat: main entry point wiring all components"

Self-Review Notes

  • Spec coverage: All spec requirements are covered: Playwright + Teams web (✓), persistent session (✓), !start "Name" / !stop triggers (✓), console Enter fallback — gap: console Enter fallback is mentioned in spec but not implemented. It can be added later as a threading enhancement to monitor.run() without changing the architecture.
  • Selector constants: Defined in monitor.py and imported by resolver.py — consistent across both files.
  • AuditEntry.email: Initialized as "" in monitor.py Task 7 and filled in main.py after resolution — intentional two-phase design.
  • _PROFILE_EMAIL_SELECTOR and _SENDER_SELECTOR: Exported from monitor.py and imported in resolver.py — single source of truth for DOM selectors.