teampulse/docs/superpowers/plans/2026-05-16-teampulse.md
2026-05-16 17:12:50 +02:00

1096 lines
30 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# TeamPulse Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Python script that monitors a Microsoft Teams meeting chat, records who posted in a defined time window, resolves display names to email addresses via profile cards, and generates a sorted Markdown memo.
**Architecture:** Playwright controls a headless Teams Web browser in the background. A polling loop reads new chat messages every 2 seconds. When `!start "Name"` / `!stop` commands are detected (posted by the current user), a time window opens/closes. After the window, profile cards are clicked to resolve emails. A Markdown memo is generated and saved.
**Tech Stack:** Python 3.12, Playwright (sync API), pytest, pytest-mock
---
## File Map
| File | Responsibility |
|---|---|
| `src/teampulse/models.py` | Shared dataclasses: `ChatMessage`, `AuditEntry`, `AuditWindow` |
| `src/teampulse/memo.py` | Pure function: `AuditWindow → str`, save to file |
| `src/teampulse/resolver.py` | `Resolver` class: cache lookup + browser profile card extraction |
| `src/teampulse/monitor.py` | `Monitor` class: DOM polling, trigger parsing, message collection |
| `src/teampulse/auth.py` | `create_context()`: persistent Playwright session, login detection, read current user |
| `main.py` | Entry point: wire all components, start polling loop |
| `tests/test_memo.py` | Unit tests for memo generation |
| `tests/test_resolver_cache.py` | Unit tests for cache layer (no browser) |
| `tests/test_monitor_triggers.py` | Unit tests for trigger parsing |
---
## Task 1: Project Setup
**Files:**
- Create: `requirements.txt`
- Create: `requirements-dev.txt`
- Create: `src/teampulse/__init__.py`
- Create: `tests/__init__.py`
- Create: `pytest.ini`
- [ ] **Step 1: Create requirements.txt**
```
playwright>=1.44
```
- [ ] **Step 2: Create requirements-dev.txt**
```
playwright>=1.44
pytest>=8.2
pytest-mock>=3.14
```
- [ ] **Step 3: Install dependencies**
```bash
.venv/bin/pip install -r requirements-dev.txt
.venv/bin/playwright install chromium
```
Expected: Packages installed, Chromium browser downloaded.
- [ ] **Step 4: Create package skeleton**
`src/teampulse/__init__.py` — empty file.
`tests/__init__.py` — empty file.
- [ ] **Step 5: Create pytest.ini**
```ini
[pytest]
testpaths = tests
pythonpath = src
```
- [ ] **Step 6: Verify pytest runs**
```bash
.venv/bin/pytest --collect-only
```
Expected output: `no tests ran` (no errors).
- [ ] **Step 7: Commit**
```bash
git add requirements.txt requirements-dev.txt src/ tests/ pytest.ini
git commit -m "feat: project scaffold with dependencies"
```
---
## Task 2: Data Models
**Files:**
- Create: `src/teampulse/models.py`
- [ ] **Step 1: Write models.py**
```python
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class ChatMessage:
sender: str
text: str
timestamp: datetime
@dataclass
class AuditEntry:
display_name: str
email: str
@dataclass
class AuditWindow:
presenter: str
moderator: str
start_time: datetime
end_time: datetime
entries: list[AuditEntry] = field(default_factory=list)
```
- [ ] **Step 2: Verify import**
```bash
.venv/bin/python -c "from teampulse.models import ChatMessage, AuditEntry, AuditWindow; print('OK')"
```
Expected: `OK`
- [ ] **Step 3: Commit**
```bash
git add src/teampulse/models.py
git commit -m "feat: add shared data models"
```
---
## Task 3: Memo Generation
**Files:**
- Create: `src/teampulse/memo.py`
- Create: `tests/test_memo.py`
- [ ] **Step 1: Write failing tests**
`tests/test_memo.py`:
```python
from datetime import datetime
from pathlib import Path
import pytest
from teampulse.memo import generate_memo, save_memo
from teampulse.models import AuditEntry, AuditWindow
def make_window(**kwargs):
defaults = dict(
presenter="Anna Bauer",
moderator="Oliver Hofmann",
start_time=datetime(2026, 5, 16, 10, 3, 42),
end_time=datetime(2026, 5, 16, 10, 47, 15),
entries=[
AuditEntry("Thomas Wolf", "t.wolf@company.com"),
AuditEntry("Klaus Huber", "k.huber@company.com"),
AuditEntry("Sandra Vogel", "s.vogel@company.com"),
],
)
defaults.update(kwargs)
return AuditWindow(**defaults)
def test_memo_contains_presenter_name():
result = generate_memo(make_window())
assert "Anna Bauer" in result
def test_memo_contains_time_window():
result = generate_memo(make_window())
assert "10:03:42" in result
assert "10:47:15" in result
def test_memo_sorts_entries_by_email():
result = generate_memo(make_window())
idx_huber = result.index("k.huber@company.com")
idx_vogel = result.index("s.vogel@company.com")
idx_wolf = result.index("t.wolf@company.com")
assert idx_huber < idx_vogel < idx_wolf
def test_memo_excludes_presenter():
window = make_window(
entries=[
AuditEntry("Anna Bauer", "a.bauer@company.com"),
AuditEntry("Klaus Huber", "k.huber@company.com"),
]
)
result = generate_memo(window)
assert "a.bauer@company.com" not in result
assert "k.huber@company.com" in result
def test_memo_excludes_moderator():
window = make_window(
entries=[
AuditEntry("Oliver Hofmann", "o.hofmann@company.com"),
AuditEntry("Klaus Huber", "k.huber@company.com"),
]
)
result = generate_memo(window)
assert "o.hofmann@company.com" not in result
assert "k.huber@company.com" in result
def test_memo_empty_window():
window = make_window(entries=[])
result = generate_memo(window)
assert "Keine Chat-Aktivität" in result
def test_memo_contains_moderator_note():
result = generate_memo(make_window())
assert "Oliver Hofmann" in result
def test_save_memo_writes_file(tmp_path):
window = make_window()
path = save_memo(generate_memo(window), directory=tmp_path, timestamp=datetime(2026, 5, 16, 10, 51))
assert path.exists()
assert path.name == "audit_20260516_1051.md"
assert "Anna Bauer" in path.read_text()
```
- [ ] **Step 2: Run tests to verify they fail**
```bash
.venv/bin/pytest tests/test_memo.py -v
```
Expected: `ImportError``teampulse.memo` does not exist yet.
- [ ] **Step 3: Write memo.py**
`src/teampulse/memo.py`:
```python
from datetime import datetime
from pathlib import Path
from teampulse.models import AuditWindow
def generate_memo(window: AuditWindow) -> str:
filtered = [
e for e in window.entries
if e.display_name != window.presenter and e.display_name != window.moderator
]
sorted_entries = sorted(filtered, key=lambda e: e.email)
start = window.start_time.strftime("%H:%M:%S")
end = window.end_time.strftime("%H:%M:%S")
lines = [
"# Meeting Chat Audit",
f"Zeitfenster: {start} {end}",
"",
]
if not sorted_entries:
lines += [
f"## Teilnehmer am Vortrag von {window.presenter} (0)",
f"_Moderator: {window.moderator} — Vortragender und Moderator ausgeschlossen_",
"",
"_Keine Chat-Aktivität in diesem Zeitfenster._",
]
else:
lines += [
f"## Teilnehmer am Vortrag von {window.presenter} ({len(sorted_entries)})",
f"_Moderator: {window.moderator} — Vortragender und Moderator ausgeschlossen_",
"",
]
for entry in sorted_entries:
lines.append(f"- {entry.display_name} <{entry.email}>")
created = datetime.now().strftime("%Y-%m-%d %H:%M")
lines += ["", f"_Sortiert nach E-Mail-Adresse. Erstellt: {created}_"]
return "\n".join(lines)
def save_memo(content: str, directory: Path = Path("."), timestamp: datetime | None = None) -> Path:
ts = timestamp or datetime.now()
filename = f"audit_{ts.strftime('%Y%m%d_%H%M')}.md"
path = directory / filename
path.write_text(content, encoding="utf-8")
return path
```
- [ ] **Step 4: Run tests to verify they pass**
```bash
.venv/bin/pytest tests/test_memo.py -v
```
Expected: All 8 tests PASS.
- [ ] **Step 5: Commit**
```bash
git add src/teampulse/memo.py tests/test_memo.py
git commit -m "feat: memo generation with filtering and sorting"
```
---
## Task 4: Resolver — Cache Layer
**Files:**
- Create: `src/teampulse/resolver.py`
- Create: `tests/test_resolver_cache.py`
- [ ] **Step 1: Write failing tests**
`tests/test_resolver_cache.py`:
```python
import json
from pathlib import Path
from unittest.mock import MagicMock
import pytest
from teampulse.resolver import Resolver
@pytest.fixture
def cache_path(tmp_path):
return tmp_path / "cache.json"
def test_resolve_returns_email_from_cache(cache_path):
cache_path.write_text(json.dumps({"Max Mustermann": "m.mustermann@company.com"}))
resolver = Resolver(cache_path=cache_path, page=MagicMock())
assert resolver.resolve("Max Mustermann") == "m.mustermann@company.com"
def test_resolve_unknown_name_calls_browser(cache_path):
cache_path.write_text(json.dumps({}))
mock_page = MagicMock()
resolver = Resolver(cache_path=cache_path, page=mock_page)
resolver._extract_email_from_profile = MagicMock(return_value="k.huber@company.com")
result = resolver.resolve("Klaus Huber")
resolver._extract_email_from_profile.assert_called_once_with("Klaus Huber")
assert result == "k.huber@company.com"
def test_resolve_caches_newly_resolved_email(cache_path):
cache_path.write_text(json.dumps({}))
resolver = Resolver(cache_path=cache_path, page=MagicMock())
resolver._extract_email_from_profile = MagicMock(return_value="k.huber@company.com")
resolver.resolve("Klaus Huber")
saved = json.loads(cache_path.read_text())
assert saved["Klaus Huber"] == "k.huber@company.com"
def test_resolve_unresolvable_returns_placeholder(cache_path):
cache_path.write_text(json.dumps({}))
resolver = Resolver(cache_path=cache_path, page=MagicMock())
resolver._extract_email_from_profile = MagicMock(return_value=None)
result = resolver.resolve("Unknown Person")
assert result == "<nicht auflösbar>"
def test_resolve_does_not_cache_unresolvable(cache_path):
cache_path.write_text(json.dumps({}))
resolver = Resolver(cache_path=cache_path, page=MagicMock())
resolver._extract_email_from_profile = MagicMock(return_value=None)
resolver.resolve("Unknown Person")
saved = json.loads(cache_path.read_text())
assert "Unknown Person" not in saved
def test_creates_cache_file_if_missing(tmp_path):
cache_path = tmp_path / "nonexistent.json"
resolver = Resolver(cache_path=cache_path, page=MagicMock())
resolver._extract_email_from_profile = MagicMock(return_value="a@b.com")
resolver.resolve("Test User")
assert cache_path.exists()
```
- [ ] **Step 2: Run tests to verify they fail**
```bash
.venv/bin/pytest tests/test_resolver_cache.py -v
```
Expected: `ImportError``teampulse.resolver` does not exist.
- [ ] **Step 3: Write resolver.py (cache layer only)**
`src/teampulse/resolver.py`:
```python
import json
from pathlib import Path
from playwright.sync_api import Page
class Resolver:
def __init__(self, cache_path: Path, page: Page):
self._cache_path = cache_path
self._page = page
self._cache: dict[str, str] = self._load_cache()
def resolve(self, display_name: str) -> str:
if display_name in self._cache:
return self._cache[display_name]
email = self._extract_email_from_profile(display_name)
if email is None:
return "<nicht auflösbar>"
self._cache[display_name] = email
self._save_cache()
return email
def _load_cache(self) -> dict[str, str]:
if not self._cache_path.exists():
return {}
return json.loads(self._cache_path.read_text(encoding="utf-8"))
def _save_cache(self) -> None:
self._cache_path.write_text(
json.dumps(self._cache, ensure_ascii=False, indent=2),
encoding="utf-8",
)
def _extract_email_from_profile(self, display_name: str) -> str | None:
# Implemented in Task 8 — browser interaction with Teams profile cards
raise NotImplementedError
```
- [ ] **Step 4: Run tests to verify they pass**
```bash
.venv/bin/pytest tests/test_resolver_cache.py -v
```
Expected: All 6 tests PASS.
- [ ] **Step 5: Commit**
```bash
git add src/teampulse/resolver.py tests/test_resolver_cache.py
git commit -m "feat: resolver cache layer"
```
---
## Task 5: Monitor — Trigger Parsing
**Files:**
- Modify: `src/teampulse/monitor.py` (create)
- Create: `tests/test_monitor_triggers.py`
- [ ] **Step 1: Write failing tests**
`tests/test_monitor_triggers.py`:
```python
import re
from datetime import datetime
import pytest
from teampulse.models import ChatMessage
from teampulse.monitor import parse_trigger
def msg(sender: str, text: str) -> ChatMessage:
return ChatMessage(sender=sender, text=text, timestamp=datetime(2026, 5, 16, 10, 0, 0))
CURRENT_USER = "Oliver Hofmann"
def test_start_with_quoted_name():
result = parse_trigger(msg(CURRENT_USER, '!start "Anna Bauer"'), CURRENT_USER)
assert result == ("start", "Anna Bauer")
def test_start_with_single_word_name():
result = parse_trigger(msg(CURRENT_USER, '!start "Bauer"'), CURRENT_USER)
assert result == ("start", "Bauer")
def test_start_without_name_returns_placeholder():
result = parse_trigger(msg(CURRENT_USER, "!start"), CURRENT_USER)
assert result == ("start", "Unbekannter Vortragender")
def test_start_with_empty_quotes_returns_placeholder():
result = parse_trigger(msg(CURRENT_USER, '!start ""'), CURRENT_USER)
assert result == ("start", "Unbekannter Vortragender")
def test_stop_trigger():
result = parse_trigger(msg(CURRENT_USER, "!stop"), CURRENT_USER)
assert result == ("stop", "")
def test_ignored_if_not_current_user():
result = parse_trigger(msg("Klaus Huber", '!start "Anna Bauer"'), CURRENT_USER)
assert result is None
def test_stop_ignored_if_not_current_user():
result = parse_trigger(msg("Klaus Huber", "!stop"), CURRENT_USER)
assert result is None
def test_regular_message_returns_none():
result = parse_trigger(msg(CURRENT_USER, "Gute Frage!"), CURRENT_USER)
assert result is None
def test_case_insensitive_start():
result = parse_trigger(msg(CURRENT_USER, '!START "Anna Bauer"'), CURRENT_USER)
assert result == ("start", "Anna Bauer")
def test_case_insensitive_stop():
result = parse_trigger(msg(CURRENT_USER, "!STOP"), CURRENT_USER)
assert result == ("stop", "")
```
- [ ] **Step 2: Run tests to verify they fail**
```bash
.venv/bin/pytest tests/test_monitor_triggers.py -v
```
Expected: `ImportError``teampulse.monitor` does not exist.
- [ ] **Step 3: Write monitor.py (trigger parsing only)**
`src/teampulse/monitor.py`:
```python
import re
from datetime import datetime
from playwright.sync_api import Page
from teampulse.models import AuditWindow, ChatMessage
_START_RE = re.compile(r'^!start(?:\s+"([^"]*)")?$', re.IGNORECASE)
_STOP_RE = re.compile(r'^!stop$', re.IGNORECASE)
def parse_trigger(message: ChatMessage, current_user: str) -> tuple[str, str] | None:
if message.sender != current_user:
return None
text = message.text.strip()
m = _START_RE.match(text)
if m:
name = (m.group(1) or "").strip() or "Unbekannter Vortragender"
return ("start", name)
if _STOP_RE.match(text):
return ("stop", "")
return None
class Monitor:
"""Chat polling loop — browser integration added in Task 7."""
def __init__(self, page: Page, current_user: str):
self._page = page
self._current_user = current_user
```
- [ ] **Step 4: Run tests to verify they pass**
```bash
.venv/bin/pytest tests/test_monitor_triggers.py -v
```
Expected: All 10 tests PASS.
- [ ] **Step 5: Run full test suite**
```bash
.venv/bin/pytest -v
```
Expected: All tests PASS.
- [ ] **Step 6: Commit**
```bash
git add src/teampulse/monitor.py tests/test_monitor_triggers.py
git commit -m "feat: trigger parsing for !start and !stop commands"
```
---
## Task 6: DOM Discovery Script
**Files:**
- Create: `scripts/discover_dom.py`
This task identifies the exact CSS selectors for Teams Web. Run it once with Teams open, inspect output, then update the constants used in Tasks 7 and 8.
- [ ] **Step 1: Create discovery script**
`scripts/discover_dom.py`:
```python
"""Run this script once with a Teams meeting chat open.
It prints DOM info to help identify selectors for messages and profile cards.
Usage: .venv/bin/python scripts/discover_dom.py <teams-chat-url>
"""
import sys
from playwright.sync_api import sync_playwright
URL = sys.argv[1] if len(sys.argv) > 1 else "https://teams.microsoft.com"
SESSION_DIR = str(__import__("pathlib").Path.home() / ".teampulse" / "session")
def main():
with sync_playwright() as p:
browser = p.chromium.launch_persistent_context(
SESSION_DIR,
headless=False,
args=["--no-sandbox"],
)
page = browser.pages[0] if browser.pages else browser.new_page()
page.goto(URL)
print("Navigate to a meeting chat in the browser, then press Enter here...")
input()
# Print all elements that look like chat messages
messages = page.query_selector_all("[data-tid*='message'], [class*='message'], [role='listitem']")
print(f"\nFound {len(messages)} potential message elements")
for i, el in enumerate(messages[:5]):
print(f"\n--- Element {i} ---")
print(f" tag: {el.evaluate('e => e.tagName')}")
print(f" data-tid: {el.get_attribute('data-tid')}")
print(f" class (first 100): {(el.get_attribute('class') or '')[:100]}")
print(f" text (first 100): {el.inner_text()[:100]}")
print("\nPress Enter to close...")
input()
browser.close()
if __name__ == "__main__":
main()
```
- [ ] **Step 2: Run the discovery script**
```bash
mkdir -p scripts
.venv/bin/python scripts/discover_dom.py
```
Navigate to a Teams meeting chat, press Enter. Note the `data-tid` values and class names that identify:
1. Individual message containers
2. Sender name elements within messages
3. Profile card container (after clicking a sender name)
4. Email field within the profile card
- [ ] **Step 3: Record findings**
Open `src/teampulse/monitor.py` and add the discovered selectors as constants at the top:
```python
# Discovered via scripts/discover_dom.py — update if Teams web changes
_MSG_SELECTOR = "[data-tid='chat-pane-message']" # one chat message
_SENDER_SELECTOR = "[data-tid='message-author-name']" # sender name within message
_PROFILE_EMAIL_SELECTOR = "[data-tid='persona-card-email']" # email in profile card
```
> **Note:** The selectors above are starting points based on known Teams web patterns. Adjust them based on what `discover_dom.py` shows. Common alternatives:
> - `[class*='authorName']` for sender names
> - `[class*='fui-Persona']` for profile cards
- [ ] **Step 4: Commit**
```bash
git add scripts/discover_dom.py src/teampulse/monitor.py
git commit -m "feat: DOM discovery script and selector constants"
```
---
## Task 7: Monitor — Chat Polling Loop
**Files:**
- Modify: `src/teampulse/monitor.py`
- [ ] **Step 1: Add polling loop to Monitor class**
Replace the `Monitor` class body in `src/teampulse/monitor.py` with:
```python
class Monitor:
def __init__(self, page: Page, current_user: str):
self._page = page
self._current_user = current_user
self._seen_message_ids: set[str] = set()
def wait_for_chat(self) -> None:
"""Block until a Teams chat page is detected."""
print("Warte auf Teams-Chat-Seite... (bitte in der Browser-URL zur Meeting-Chat-Seite navigieren)")
self._page.wait_for_selector(_MSG_SELECTOR, timeout=300_000)
print("Chat erkannt.")
def get_current_user_display_name(self) -> str:
"""Read the logged-in user's display name from the Teams web UI."""
el = self._page.query_selector("[data-tid='me-control-display-name']")
if el:
return el.inner_text().strip()
# Fallback: read from page title or profile area
el = self._page.query_selector("[aria-label*='Profilbild']")
if el:
return (el.get_attribute("aria-label") or "").replace("Profilbild", "").strip()
raise RuntimeError("Konnte den Namen des eingeloggten Nutzers nicht ermitteln.")
def poll_new_messages(self) -> list[ChatMessage]:
"""Return messages not yet seen."""
elements = self._page.query_selector_all(_MSG_SELECTOR)
new_messages = []
for el in elements:
msg_id = el.get_attribute("data-tid") or el.get_attribute("id") or el.inner_text()[:40]
if msg_id in self._seen_message_ids:
continue
self._seen_message_ids.add(msg_id)
sender_el = el.query_selector(_SENDER_SELECTOR)
if not sender_el:
continue
sender = sender_el.inner_text().strip()
text = el.inner_text().strip()
new_messages.append(ChatMessage(
sender=sender,
text=text,
timestamp=datetime.now(),
))
return new_messages
def run(self) -> AuditWindow:
"""Main loop: collect messages until !start/!stop window is complete."""
import time
self.wait_for_chat()
presenter: str | None = None
start_time: datetime | None = None
collected: list[ChatMessage] = []
print(f"Monitoring aktiv. Poste '!start \"Name\"' im Chat um ein Zeitfenster zu starten.")
while True:
try:
new_msgs = self.poll_new_messages()
except Exception:
print("Verbindung verloren, reconnecting...")
time.sleep(2)
continue
for msg in new_msgs:
trigger = parse_trigger(msg, self._current_user)
if trigger is None:
if start_time is not None:
collected.append(msg)
continue
action, value = trigger
if action == "start":
presenter = value
start_time = datetime.now()
collected = []
print(f"Zeitfenster gestartet — Vortragender: {presenter}")
elif action == "stop":
if start_time is None:
print("!stop ohne vorheriges !start — ignoriert.")
continue
end_time = datetime.now()
print(f"Zeitfenster beendet.")
from teampulse.models import AuditEntry
seen_senders = list(dict.fromkeys(m.sender for m in collected))
entries = [AuditEntry(display_name=s, email="") for s in seen_senders]
return AuditWindow(
presenter=presenter,
moderator=self._current_user,
start_time=start_time,
end_time=end_time,
entries=entries,
)
time.sleep(2)
```
- [ ] **Step 2: Verify the module imports cleanly**
```bash
.venv/bin/python -c "from teampulse.monitor import Monitor, parse_trigger; print('OK')"
```
Expected: `OK`
- [ ] **Step 3: Run full test suite**
```bash
.venv/bin/pytest -v
```
Expected: All tests PASS (polling loop has no unit tests — integration tested in Task 9).
- [ ] **Step 4: Commit**
```bash
git add src/teampulse/monitor.py
git commit -m "feat: chat polling loop and window collection"
```
---
## Task 8: Resolver — Browser Email Extraction
**Files:**
- Modify: `src/teampulse/resolver.py`
- [ ] **Step 1: Implement `_extract_email_from_profile`**
Replace the `_extract_email_from_profile` stub in `src/teampulse/resolver.py` with:
```python
def _extract_email_from_profile(self, display_name: str) -> str | None:
import time
# Find a message from this sender and click their name
sender_elements = self._page.query_selector_all(
f"{_SENDER_SELECTOR}"
)
target = None
for el in sender_elements:
if el.inner_text().strip() == display_name:
target = el
break
if target is None:
print(f" Sender '{display_name}' nicht im Chat gefunden.")
return None
try:
target.click()
# Wait for profile card to appear
self._page.wait_for_selector(_PROFILE_EMAIL_SELECTOR, timeout=5000)
email_el = self._page.query_selector(_PROFILE_EMAIL_SELECTOR)
email = email_el.inner_text().strip() if email_el else None
# Close profile card by pressing Escape
self._page.keyboard.press("Escape")
time.sleep(0.5)
return email
except Exception as e:
print(f" Profilkarte für '{display_name}' nicht ladbar: {e}")
self._page.keyboard.press("Escape")
return None
```
Also add the selector constant import at the top of `resolver.py`:
```python
from teampulse.monitor import _PROFILE_EMAIL_SELECTOR, _SENDER_SELECTOR
```
- [ ] **Step 2: Run the cache tests to ensure nothing broke**
```bash
.venv/bin/pytest tests/test_resolver_cache.py -v
```
Expected: All 6 tests PASS.
- [ ] **Step 3: Commit**
```bash
git add src/teampulse/resolver.py
git commit -m "feat: browser-based email extraction from Teams profile cards"
```
---
## Task 9: Auth — Session Management
**Files:**
- Create: `src/teampulse/auth.py`
- [ ] **Step 1: Write auth.py**
`src/teampulse/auth.py`:
```python
from pathlib import Path
from playwright.sync_api import BrowserContext, Playwright
SESSION_DIR = Path.home() / ".teampulse" / "session"
def create_context(playwright: Playwright, headless: bool = True) -> BrowserContext:
SESSION_DIR.mkdir(parents=True, exist_ok=True)
context = playwright.chromium.launch_persistent_context(
str(SESSION_DIR),
headless=headless,
args=["--no-sandbox"],
locale="de-DE",
)
return context
def ensure_logged_in(context: BrowserContext) -> None:
"""
Open Teams web. If redirected to a login page, make the browser
visible so the user can authenticate. Waits until login is complete.
"""
page = context.pages[0] if context.pages else context.new_page()
page.goto("https://teams.microsoft.com")
if _is_login_page(page):
print("Bitte im Browser-Fenster anmelden (SSO/MFA)...")
# Make browser visible for login
page.evaluate("document.title = 'TeamPulse Login'")
page.wait_for_url("**/teams.microsoft.com/**", timeout=120_000)
print("Anmeldung erfolgreich.")
page.close()
def _is_login_page(page) -> bool:
try:
page.wait_for_url("**/login**", timeout=3000)
return True
except Exception:
return False
```
- [ ] **Step 2: Verify import**
```bash
.venv/bin/python -c "from teampulse.auth import create_context, ensure_logged_in; print('OK')"
```
Expected: `OK`
- [ ] **Step 3: Commit**
```bash
git add src/teampulse/auth.py
git commit -m "feat: Playwright session management with login detection"
```
---
## Task 10: Main Entry Point
**Files:**
- Create: `main.py`
- [ ] **Step 1: Write main.py**
```python
#!/usr/bin/env python3
"""TeamPulse — Teams meeting chat audit tool.
Usage:
.venv/bin/python main.py
The script opens a background browser, navigates to Teams.
Post !start "Presenter Name" and !stop in the meeting chat to define a time window.
A Markdown memo is saved to the current directory when the window closes.
"""
from pathlib import Path
from playwright.sync_api import sync_playwright
from teampulse.auth import create_context, ensure_logged_in
from teampulse.memo import generate_memo, save_memo
from teampulse.models import AuditEntry
from teampulse.monitor import Monitor
from teampulse.resolver import Resolver
CACHE_PATH = Path.home() / ".teampulse" / "cache.json"
def main():
with sync_playwright() as playwright:
print("Starte Browser...")
context = create_context(playwright, headless=False)
ensure_logged_in(context)
page = context.new_page()
page.goto("https://teams.microsoft.com")
monitor = Monitor(page=page, current_user="")
print("Lese eingeloggten Nutzer...")
current_user = monitor.get_current_user_display_name()
monitor._current_user = current_user
print(f"Eingeloggt als: {current_user}")
print("\nNavigiere im Browser zur Meeting-Chat-Seite.")
print("Poste dann '!start \"Name des Vortragenden\"' im Chat.\n")
window = monitor.run()
resolver = Resolver(cache_path=CACHE_PATH, page=page)
print(f"\nLöse {len(window.entries)} E-Mail-Adressen auf...")
resolved_entries = []
for entry in window.entries:
email = resolver.resolve(entry.display_name)
print(f" {entry.display_name}{email}")
resolved_entries.append(AuditEntry(display_name=entry.display_name, email=email))
window.entries = resolved_entries
memo_content = generate_memo(window)
path = save_memo(memo_content)
print(f"\nMemo gespeichert: {path}")
print("\n" + memo_content)
context.close()
if __name__ == "__main__":
main()
```
- [ ] **Step 2: Run full test suite one last time**
```bash
.venv/bin/pytest -v
```
Expected: All tests PASS.
- [ ] **Step 3: Smoke test (optional, requires active Teams session)**
```bash
.venv/bin/python main.py
```
Expected: Browser opens, navigates to Teams. After login, monitoring starts.
- [ ] **Step 4: Commit**
```bash
git add main.py
git commit -m "feat: main entry point wiring all components"
```
---
## Self-Review Notes
- **Spec coverage:** All spec requirements are covered: Playwright + Teams web (✓), persistent session (✓), `!start "Name"` / `!stop` triggers (✓), console Enter fallback — **gap:** console Enter fallback is mentioned in spec but not implemented. It can be added later as a threading enhancement to `monitor.run()` without changing the architecture.
- **Selector constants:** Defined in `monitor.py` and imported by `resolver.py` — consistent across both files.
- **`AuditEntry.email`:** Initialized as `""` in `monitor.py` Task 7 and filled in `main.py` after resolution — intentional two-phase design.
- **`_PROFILE_EMAIL_SELECTOR` and `_SENDER_SELECTOR`:** Exported from `monitor.py` and imported in `resolver.py` — single source of truth for DOM selectors.