TeamPulse Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Python script that monitors a Microsoft Teams meeting chat, records who posted in a defined time window, resolves display names to email addresses via profile cards, and generates a sorted Markdown memo.
Architecture: Playwright drives a Teams Web browser session in the background. A polling loop reads new chat messages every 2 seconds. When the current user posts !start "Name" or !stop, a time window opens or closes. After the window closes, profile cards are clicked to resolve emails, and a Markdown memo is generated and saved. Note: the design calls for a headless browser, but main.py (Task 10) launches with headless=False, which keeps the window visible for the interactive SSO/MFA login.
Tech Stack: Python 3.12, Playwright (sync API), pytest, pytest-mock
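The windowing behaviour described under Architecture can be pictured as a small state machine that does not need a browser. The sketch below is illustrative only: collect_window and the (sender, action) event format are hypothetical names chosen for this example, not part of the modules in this plan.

```python
from typing import Iterable, Optional


def collect_window(events: Iterable[tuple[str, Optional[str]]]) -> list[str]:
    """Return unique senders seen between a 'start' and the next 'stop'.

    Each event is (sender, action), where action is 'start', 'stop',
    or None for a regular chat message (hypothetical format).
    """
    window_open = False
    senders: list[str] = []
    for sender, action in events:
        if action == "start":
            window_open = True
            senders = []  # a repeated start resets the window
        elif action == "stop":
            if window_open:
                return senders
            # a stop without a preceding start is ignored
        elif window_open and sender not in senders:
            senders.append(sender)
    return senders  # stream ended before a stop arrived


events = [
    ("Klaus Huber", None),          # before the window: ignored
    ("Oliver Hofmann", "start"),
    ("Thomas Wolf", None),
    ("Klaus Huber", None),
    ("Thomas Wolf", None),          # duplicate sender: counted once
    ("Oliver Hofmann", "stop"),
    ("Sandra Vogel", None),         # after the window: never reached
]
print(collect_window(events))
```

Keeping this logic separate from DOM access is one way to make the open/close rules testable without Playwright.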
File Map
| File | Responsibility |
|---|---|
| src/teampulse/models.py | Shared dataclasses: ChatMessage, AuditEntry, AuditWindow |
| src/teampulse/memo.py | Pure function: AuditWindow → str, save to file |
| src/teampulse/resolver.py | Resolver class: cache lookup + browser profile card extraction |
| src/teampulse/monitor.py | Monitor class: DOM polling, trigger parsing, message collection, reading the current user's display name |
| src/teampulse/auth.py | create_context(): persistent Playwright session; ensure_logged_in(): login detection |
| scripts/discover_dom.py | One-off helper that prints candidate DOM selectors (Task 6) |
| main.py | Entry point: wire all components, start polling loop |
| tests/test_memo.py | Unit tests for memo generation |
| tests/test_resolver_cache.py | Unit tests for cache layer (no browser) |
| tests/test_monitor_triggers.py | Unit tests for trigger parsing |
Task 1: Project Setup
Files:
- Create: requirements.txt
- Create: requirements-dev.txt
- Create: src/teampulse/__init__.py
- Create: tests/__init__.py
- Create: pytest.ini
- Step 1: Create requirements.txt
playwright>=1.44
- Step 2: Create requirements-dev.txt
playwright>=1.44
pytest>=8.2
pytest-mock>=3.14
- Step 3: Install dependencies
.venv/bin/pip install -r requirements-dev.txt
.venv/bin/playwright install chromium
Expected: Packages installed, Chromium browser downloaded.
- Step 4: Create package skeleton
src/teampulse/__init__.py — empty file.
tests/__init__.py — empty file.
- Step 5: Create pytest.ini
[pytest]
testpaths = tests
pythonpath = src
- Step 6: Verify pytest runs
.venv/bin/pytest --collect-only
Expected output: no tests collected (no errors). pytest exits with code 5 in this case, which is normal while no tests exist yet.
- Step 7: Commit
git add requirements.txt requirements-dev.txt src/ tests/ pytest.ini
git commit -m "feat: project scaffold with dependencies"
Task 2: Data Models
Files:
- Create: src/teampulse/models.py
- Step 1: Write models.py
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class ChatMessage:
    sender: str
    text: str
    timestamp: datetime

@dataclass
class AuditEntry:
    display_name: str
    email: str

@dataclass
class AuditWindow:
    presenter: str
    moderator: str
    start_time: datetime
    end_time: datetime
    entries: list[AuditEntry] = field(default_factory=list)
- Step 2: Verify import
.venv/bin/python -c "from teampulse.models import ChatMessage, AuditEntry, AuditWindow; print('OK')"
Expected: OK
- Step 3: Commit
git add src/teampulse/models.py
git commit -m "feat: add shared data models"
Task 3: Memo Generation
Files:
- Create: src/teampulse/memo.py
- Create: tests/test_memo.py
- Step 1: Write failing tests
tests/test_memo.py:
from datetime import datetime
from pathlib import Path

import pytest

from teampulse.memo import generate_memo, save_memo
from teampulse.models import AuditEntry, AuditWindow

def make_window(**kwargs):
    defaults = dict(
        presenter="Anna Bauer",
        moderator="Oliver Hofmann",
        start_time=datetime(2026, 5, 16, 10, 3, 42),
        end_time=datetime(2026, 5, 16, 10, 47, 15),
        entries=[
            AuditEntry("Thomas Wolf", "t.wolf@company.com"),
            AuditEntry("Klaus Huber", "k.huber@company.com"),
            AuditEntry("Sandra Vogel", "s.vogel@company.com"),
        ],
    )
    defaults.update(kwargs)
    return AuditWindow(**defaults)

def test_memo_contains_presenter_name():
    result = generate_memo(make_window())
    assert "Anna Bauer" in result

def test_memo_contains_time_window():
    result = generate_memo(make_window())
    assert "10:03:42" in result
    assert "10:47:15" in result

def test_memo_sorts_entries_by_email():
    result = generate_memo(make_window())
    idx_huber = result.index("k.huber@company.com")
    idx_vogel = result.index("s.vogel@company.com")
    idx_wolf = result.index("t.wolf@company.com")
    assert idx_huber < idx_vogel < idx_wolf

def test_memo_excludes_presenter():
    window = make_window(
        entries=[
            AuditEntry("Anna Bauer", "a.bauer@company.com"),
            AuditEntry("Klaus Huber", "k.huber@company.com"),
        ]
    )
    result = generate_memo(window)
    assert "a.bauer@company.com" not in result
    assert "k.huber@company.com" in result

def test_memo_excludes_moderator():
    window = make_window(
        entries=[
            AuditEntry("Oliver Hofmann", "o.hofmann@company.com"),
            AuditEntry("Klaus Huber", "k.huber@company.com"),
        ]
    )
    result = generate_memo(window)
    assert "o.hofmann@company.com" not in result
    assert "k.huber@company.com" in result

def test_memo_empty_window():
    window = make_window(entries=[])
    result = generate_memo(window)
    assert "Keine Chat-Aktivität" in result

def test_memo_contains_moderator_note():
    result = generate_memo(make_window())
    assert "Oliver Hofmann" in result

def test_save_memo_writes_file(tmp_path):
    window = make_window()
    path = save_memo(generate_memo(window), directory=tmp_path, timestamp=datetime(2026, 5, 16, 10, 51))
    assert path.exists()
    assert path.name == "audit_20260516_1051.md"
    assert "Anna Bauer" in path.read_text()
- Step 2: Run tests to verify they fail
.venv/bin/pytest tests/test_memo.py -v
Expected: ImportError — teampulse.memo does not exist yet.
- Step 3: Write memo.py
src/teampulse/memo.py:
from datetime import datetime
from pathlib import Path

from teampulse.models import AuditWindow

def generate_memo(window: AuditWindow) -> str:
    # Presenter and moderator are excluded from the participant list
    filtered = [
        e for e in window.entries
        if e.display_name != window.presenter and e.display_name != window.moderator
    ]
    sorted_entries = sorted(filtered, key=lambda e: e.email)
    start = window.start_time.strftime("%H:%M:%S")
    end = window.end_time.strftime("%H:%M:%S")
    lines = [
        "# Meeting Chat Audit",
        f"Zeitfenster: {start} – {end}",
        "",
    ]
    if not sorted_entries:
        lines += [
            f"## Teilnehmer am Vortrag von {window.presenter} (0)",
            f"_Moderator: {window.moderator} — Vortragender und Moderator ausgeschlossen_",
            "",
            "_Keine Chat-Aktivität in diesem Zeitfenster._",
        ]
    else:
        lines += [
            f"## Teilnehmer am Vortrag von {window.presenter} ({len(sorted_entries)})",
            f"_Moderator: {window.moderator} — Vortragender und Moderator ausgeschlossen_",
            "",
        ]
        for entry in sorted_entries:
            lines.append(f"- {entry.display_name} <{entry.email}>")
    created = datetime.now().strftime("%Y-%m-%d %H:%M")
    lines += ["", f"_Sortiert nach E-Mail-Adresse. Erstellt: {created}_"]
    return "\n".join(lines)

def save_memo(content: str, directory: Path = Path("."), timestamp: datetime | None = None) -> Path:
    ts = timestamp or datetime.now()
    filename = f"audit_{ts.strftime('%Y%m%d_%H%M')}.md"
    path = directory / filename
    path.write_text(content, encoding="utf-8")
    return path
- Step 4: Run tests to verify they pass
.venv/bin/pytest tests/test_memo.py -v
Expected: All 8 tests PASS.
- Step 5: Commit
git add src/teampulse/memo.py tests/test_memo.py
git commit -m "feat: memo generation with filtering and sorting"
Task 4: Resolver — Cache Layer
Files:
- Create: src/teampulse/resolver.py
- Create: tests/test_resolver_cache.py
- Step 1: Write failing tests
tests/test_resolver_cache.py:
import json
from pathlib import Path
from unittest.mock import MagicMock

import pytest

from teampulse.resolver import Resolver

@pytest.fixture
def cache_path(tmp_path):
    return tmp_path / "cache.json"

def test_resolve_returns_email_from_cache(cache_path):
    cache_path.write_text(json.dumps({"Max Mustermann": "m.mustermann@company.com"}))
    resolver = Resolver(cache_path=cache_path, page=MagicMock())
    assert resolver.resolve("Max Mustermann") == "m.mustermann@company.com"

def test_resolve_unknown_name_calls_browser(cache_path):
    cache_path.write_text(json.dumps({}))
    mock_page = MagicMock()
    resolver = Resolver(cache_path=cache_path, page=mock_page)
    resolver._extract_email_from_profile = MagicMock(return_value="k.huber@company.com")
    result = resolver.resolve("Klaus Huber")
    resolver._extract_email_from_profile.assert_called_once_with("Klaus Huber")
    assert result == "k.huber@company.com"

def test_resolve_caches_newly_resolved_email(cache_path):
    cache_path.write_text(json.dumps({}))
    resolver = Resolver(cache_path=cache_path, page=MagicMock())
    resolver._extract_email_from_profile = MagicMock(return_value="k.huber@company.com")
    resolver.resolve("Klaus Huber")
    saved = json.loads(cache_path.read_text())
    assert saved["Klaus Huber"] == "k.huber@company.com"

def test_resolve_unresolvable_returns_placeholder(cache_path):
    cache_path.write_text(json.dumps({}))
    resolver = Resolver(cache_path=cache_path, page=MagicMock())
    resolver._extract_email_from_profile = MagicMock(return_value=None)
    result = resolver.resolve("Unknown Person")
    assert result == "<nicht auflösbar>"

def test_resolve_does_not_cache_unresolvable(cache_path):
    cache_path.write_text(json.dumps({}))
    resolver = Resolver(cache_path=cache_path, page=MagicMock())
    resolver._extract_email_from_profile = MagicMock(return_value=None)
    resolver.resolve("Unknown Person")
    saved = json.loads(cache_path.read_text())
    assert "Unknown Person" not in saved

def test_creates_cache_file_if_missing(tmp_path):
    cache_path = tmp_path / "nonexistent.json"
    resolver = Resolver(cache_path=cache_path, page=MagicMock())
    resolver._extract_email_from_profile = MagicMock(return_value="a@b.com")
    resolver.resolve("Test User")
    assert cache_path.exists()
- Step 2: Run tests to verify they fail
.venv/bin/pytest tests/test_resolver_cache.py -v
Expected: ImportError — teampulse.resolver does not exist.
- Step 3: Write resolver.py (cache layer only)
src/teampulse/resolver.py:
import json
from pathlib import Path

from playwright.sync_api import Page

class Resolver:
    def __init__(self, cache_path: Path, page: Page):
        self._cache_path = cache_path
        self._page = page
        self._cache: dict[str, str] = self._load_cache()

    def resolve(self, display_name: str) -> str:
        # Cache hit: no browser interaction needed
        if display_name in self._cache:
            return self._cache[display_name]
        email = self._extract_email_from_profile(display_name)
        if email is None:
            # Unresolvable names are not cached, so they are retried next time
            return "<nicht auflösbar>"
        self._cache[display_name] = email
        self._save_cache()
        return email

    def _load_cache(self) -> dict[str, str]:
        if not self._cache_path.exists():
            return {}
        return json.loads(self._cache_path.read_text(encoding="utf-8"))

    def _save_cache(self) -> None:
        self._cache_path.write_text(
            json.dumps(self._cache, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

    def _extract_email_from_profile(self, display_name: str) -> str | None:
        # Implemented in Task 8: browser interaction with Teams profile cards
        raise NotImplementedError
- Step 4: Run tests to verify they pass
.venv/bin/pytest tests/test_resolver_cache.py -v
Expected: All 6 tests PASS.
- Step 5: Commit
git add src/teampulse/resolver.py tests/test_resolver_cache.py
git commit -m "feat: resolver cache layer"
Task 5: Monitor — Trigger Parsing
Files:
- Create: src/teampulse/monitor.py
- Create: tests/test_monitor_triggers.py
- Step 1: Write failing tests
tests/test_monitor_triggers.py:
import re
from datetime import datetime

import pytest

from teampulse.models import ChatMessage
from teampulse.monitor import parse_trigger

def msg(sender: str, text: str) -> ChatMessage:
    return ChatMessage(sender=sender, text=text, timestamp=datetime(2026, 5, 16, 10, 0, 0))

CURRENT_USER = "Oliver Hofmann"

def test_start_with_quoted_name():
    result = parse_trigger(msg(CURRENT_USER, '!start "Anna Bauer"'), CURRENT_USER)
    assert result == ("start", "Anna Bauer")

def test_start_with_single_word_name():
    result = parse_trigger(msg(CURRENT_USER, '!start "Bauer"'), CURRENT_USER)
    assert result == ("start", "Bauer")

def test_start_without_name_returns_placeholder():
    result = parse_trigger(msg(CURRENT_USER, "!start"), CURRENT_USER)
    assert result == ("start", "Unbekannter Vortragender")

def test_start_with_empty_quotes_returns_placeholder():
    result = parse_trigger(msg(CURRENT_USER, '!start ""'), CURRENT_USER)
    assert result == ("start", "Unbekannter Vortragender")

def test_stop_trigger():
    result = parse_trigger(msg(CURRENT_USER, "!stop"), CURRENT_USER)
    assert result == ("stop", "")

def test_ignored_if_not_current_user():
    result = parse_trigger(msg("Klaus Huber", '!start "Anna Bauer"'), CURRENT_USER)
    assert result is None

def test_stop_ignored_if_not_current_user():
    result = parse_trigger(msg("Klaus Huber", "!stop"), CURRENT_USER)
    assert result is None

def test_regular_message_returns_none():
    result = parse_trigger(msg(CURRENT_USER, "Gute Frage!"), CURRENT_USER)
    assert result is None

def test_case_insensitive_start():
    result = parse_trigger(msg(CURRENT_USER, '!START "Anna Bauer"'), CURRENT_USER)
    assert result == ("start", "Anna Bauer")

def test_case_insensitive_stop():
    result = parse_trigger(msg(CURRENT_USER, "!STOP"), CURRENT_USER)
    assert result == ("stop", "")
- Step 2: Run tests to verify they fail
.venv/bin/pytest tests/test_monitor_triggers.py -v
Expected: ImportError — teampulse.monitor does not exist.
- Step 3: Write monitor.py (trigger parsing only)
src/teampulse/monitor.py:
import re
from datetime import datetime

from playwright.sync_api import Page

from teampulse.models import AuditWindow, ChatMessage

# !start with an optional double-quoted presenter name; !stop takes no argument
_START_RE = re.compile(r'^!start(?:\s+"([^"]*)")?$', re.IGNORECASE)
_STOP_RE = re.compile(r'^!stop$', re.IGNORECASE)

def parse_trigger(message: ChatMessage, current_user: str) -> tuple[str, str] | None:
    # Only the current user may open or close a window
    if message.sender != current_user:
        return None
    text = message.text.strip()
    m = _START_RE.match(text)
    if m:
        name = (m.group(1) or "").strip() or "Unbekannter Vortragender"
        return ("start", name)
    if _STOP_RE.match(text):
        return ("stop", "")
    return None

class Monitor:
    """Chat polling loop; browser integration is added in Task 7."""

    def __init__(self, page: Page, current_user: str):
        self._page = page
        self._current_user = current_user
- Step 4: Run tests to verify they pass
.venv/bin/pytest tests/test_monitor_triggers.py -v
Expected: All 10 tests PASS.
- Step 5: Run full test suite
.venv/bin/pytest -v
Expected: All tests PASS.
- Step 6: Commit
git add src/teampulse/monitor.py tests/test_monitor_triggers.py
git commit -m "feat: trigger parsing for !start and !stop commands"
Task 6: DOM Discovery Script
Files:
- Create: scripts/discover_dom.py
This task identifies the exact CSS selectors for Teams Web. Run it once with Teams open, inspect output, then update the constants used in Tasks 7 and 8.
- Step 1: Create discovery script
scripts/discover_dom.py:
"""Run this script once with a Teams meeting chat open.

It prints DOM info to help identify selectors for messages and profile cards.

Usage: .venv/bin/python scripts/discover_dom.py <teams-chat-url>
"""
import sys
from pathlib import Path

from playwright.sync_api import sync_playwright

URL = sys.argv[1] if len(sys.argv) > 1 else "https://teams.microsoft.com"
SESSION_DIR = str(Path.home() / ".teampulse" / "session")

def main():
    with sync_playwright() as p:
        # launch_persistent_context returns a BrowserContext, not a Browser
        context = p.chromium.launch_persistent_context(
            SESSION_DIR,
            headless=False,
            args=["--no-sandbox"],
        )
        page = context.pages[0] if context.pages else context.new_page()
        page.goto(URL)
        print("Navigate to a meeting chat in the browser, then press Enter here...")
        input()
        # Print all elements that look like chat messages
        messages = page.query_selector_all("[data-tid*='message'], [class*='message'], [role='listitem']")
        print(f"\nFound {len(messages)} potential message elements")
        for i, el in enumerate(messages[:5]):
            print(f"\n--- Element {i} ---")
            print(f"  tag: {el.evaluate('e => e.tagName')}")
            print(f"  data-tid: {el.get_attribute('data-tid')}")
            print(f"  class (first 100): {(el.get_attribute('class') or '')[:100]}")
            print(f"  text (first 100): {el.inner_text()[:100]}")
        print("\nPress Enter to close...")
        input()
        context.close()

if __name__ == "__main__":
    main()
- Step 2: Run the discovery script
mkdir -p scripts
.venv/bin/python scripts/discover_dom.py
Navigate to a Teams meeting chat, press Enter. Note the data-tid values and class names that identify:
- Individual message containers
- Sender name elements within messages
- Profile card container (after clicking a sender name)
- Email field within the profile card
- Step 3: Record findings
Open src/teampulse/monitor.py and add the discovered selectors as constants at the top:
# Discovered via scripts/discover_dom.py — update if Teams web changes
_MSG_SELECTOR = "[data-tid='chat-pane-message']" # one chat message
_SENDER_SELECTOR = "[data-tid='message-author-name']" # sender name within message
_PROFILE_EMAIL_SELECTOR = "[data-tid='persona-card-email']" # email in profile card
Note: The selectors above are starting points based on known Teams web patterns. Adjust them based on what discover_dom.py shows. Common alternatives:
- [class*='authorName'] for sender names
- [class*='fui-Persona'] for profile cards
- Step 4: Commit
git add scripts/discover_dom.py src/teampulse/monitor.py
git commit -m "feat: DOM discovery script and selector constants"
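Since the selector note in Step 3 lists alternatives, one option is to probe a list of candidates in order and keep the first that matches. This is a minimal sketch, not part of the plan's modules: first_matching_selector, SENDER_CANDIDATES, and FakePage are hypothetical names, and the only Playwright call assumed is query_selector, which returns None when nothing matches.

```python
from typing import Optional, Protocol, Sequence


class SupportsQuery(Protocol):
    """Anything with a Playwright-style query_selector method."""

    def query_selector(self, selector: str): ...


def first_matching_selector(scope: SupportsQuery, candidates: Sequence[str]) -> Optional[str]:
    """Return the first candidate selector that matches an element, else None."""
    for selector in candidates:
        if scope.query_selector(selector) is not None:
            return selector
    return None


# Candidates taken from the selector note in this task; order expresses preference.
SENDER_CANDIDATES = ["[data-tid='message-author-name']", "[class*='authorName']"]


class FakePage:
    """Stand-in for a Playwright Page so the sketch runs without a browser."""

    def __init__(self, present):
        self._present = set(present)

    def query_selector(self, selector):
        return object() if selector in self._present else None


page = FakePage({"[class*='authorName']"})
print(first_matching_selector(page, SENDER_CANDIDATES))
```

With a real Page in place of FakePage, the returned selector could be stored once at startup instead of hard-coding a single constant.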
Task 7: Monitor — Chat Polling Loop
Files:
- Modify: src/teampulse/monitor.py
- Step 1: Add polling loop to Monitor class
Replace the Monitor class body in src/teampulse/monitor.py with:
class Monitor:
    def __init__(self, page: Page, current_user: str):
        self._page = page
        self._current_user = current_user
        self._seen_message_ids: set[str] = set()

    def wait_for_chat(self) -> None:
        """Block until a Teams chat page is detected."""
        print("Warte auf Teams-Chat-Seite... (bitte in der Browser-URL zur Meeting-Chat-Seite navigieren)")
        self._page.wait_for_selector(_MSG_SELECTOR, timeout=300_000)
        print("Chat erkannt.")

    def get_current_user_display_name(self) -> str:
        """Read the logged-in user's display name from the Teams web UI."""
        el = self._page.query_selector("[data-tid='me-control-display-name']")
        if el:
            return el.inner_text().strip()
        # Fallback: read from the profile picture's aria-label
        el = self._page.query_selector("[aria-label*='Profilbild']")
        if el:
            return (el.get_attribute("aria-label") or "").replace("Profilbild", "").strip()
        raise RuntimeError("Konnte den Namen des eingeloggten Nutzers nicht ermitteln.")

    def poll_new_messages(self) -> list[ChatMessage]:
        """Return messages not yet seen."""
        elements = self._page.query_selector_all(_MSG_SELECTOR)
        new_messages = []
        for el in elements:
            # data-tid is identical for every element matched by _MSG_SELECTOR,
            # so it cannot serve as a unique key. Use the element id and fall
            # back to a prefix of the text.
            msg_id = el.get_attribute("id") or el.inner_text()[:40]
            if msg_id in self._seen_message_ids:
                continue
            self._seen_message_ids.add(msg_id)
            sender_el = el.query_selector(_SENDER_SELECTOR)
            if not sender_el:
                continue
            sender = sender_el.inner_text().strip()
            # NOTE: inner_text() of the whole element may also contain the sender
            # name and a timestamp. parse_trigger expects only the message body,
            # so narrow this to the body element once its selector is known.
            text = el.inner_text().strip()
            new_messages.append(ChatMessage(
                sender=sender,
                text=text,
                timestamp=datetime.now(),
            ))
        return new_messages

    def run(self) -> AuditWindow:
        """Main loop: collect messages until !start/!stop window is complete."""
        import time
        from teampulse.models import AuditEntry

        self.wait_for_chat()
        presenter: str | None = None
        start_time: datetime | None = None
        collected: list[ChatMessage] = []
        print("Monitoring aktiv. Poste '!start \"Name\"' im Chat um ein Zeitfenster zu starten.")
        while True:
            try:
                new_msgs = self.poll_new_messages()
            except Exception:
                print("Verbindung verloren, reconnecting...")
                time.sleep(2)
                continue
            for msg in new_msgs:
                trigger = parse_trigger(msg, self._current_user)
                if trigger is None:
                    # Regular message: only counts while a window is open
                    if start_time is not None:
                        collected.append(msg)
                    continue
                action, value = trigger
                if action == "start":
                    presenter = value
                    start_time = datetime.now()
                    collected = []
                    print(f"Zeitfenster gestartet — Vortragender: {presenter}")
                elif action == "stop":
                    if start_time is None:
                        print("!stop ohne vorheriges !start — ignoriert.")
                        continue
                    end_time = datetime.now()
                    print("Zeitfenster beendet.")
                    # Unique senders in first-seen order; emails are resolved later
                    seen_senders = list(dict.fromkeys(m.sender for m in collected))
                    entries = [AuditEntry(display_name=s, email="") for s in seen_senders]
                    return AuditWindow(
                        presenter=presenter,
                        moderator=self._current_user,
                        start_time=start_time,
                        end_time=end_time,
                        entries=entries,
                    )
            time.sleep(2)
- Step 2: Verify the module imports cleanly
.venv/bin/python -c "from teampulse.monitor import Monitor, parse_trigger; print('OK')"
Expected: OK
- Step 3: Run full test suite
.venv/bin/pytest -v
Expected: All tests PASS (the polling loop has no unit tests; it is exercised by the smoke test in Task 10).
- Step 4: Commit
git add src/teampulse/monitor.py
git commit -m "feat: chat polling loop and window collection"
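Because poll_new_messages reads inner_text() from the whole message element, the text handed to parse_trigger may contain the sender name and a timestamp as well as the body. That is an assumption about the Teams DOM which the discovery script should confirm. One browser-free way to cover this case is a small pure helper; extract_trigger_line is a hypothetical name, and the regex below merely combines the two trigger patterns from Task 5.

```python
import re
from typing import Optional

# Combined form of the !start / !stop grammar used in monitor.py (Task 5).
_TRIGGER_RE = re.compile(r'^!(?:start(?:\s+"[^"]*")?|stop)$', re.IGNORECASE)


def extract_trigger_line(element_text: str) -> Optional[str]:
    """Return the first line that looks like a trigger command, else None."""
    for line in element_text.splitlines():
        candidate = line.strip()
        if _TRIGGER_RE.match(candidate):
            return candidate
    return None


# Simulated inner_text() of one message element: sender, time, then body.
sample = 'Oliver Hofmann\n10:03\n!start "Anna Bauer"'
print(extract_trigger_line(sample))
```

The returned line could then be passed on as ChatMessage.text, which keeps parse_trigger and its anchored patterns unchanged.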
Task 8: Resolver — Browser Email Extraction
Files:
- Modify: src/teampulse/resolver.py
- Step 1: Implement _extract_email_from_profile
Replace the _extract_email_from_profile stub in src/teampulse/resolver.py with:
    def _extract_email_from_profile(self, display_name: str) -> str | None:
        import time

        # Find a message from this sender and click their name
        sender_elements = self._page.query_selector_all(_SENDER_SELECTOR)
        target = None
        for el in sender_elements:
            if el.inner_text().strip() == display_name:
                target = el
                break
        if target is None:
            print(f"  Sender '{display_name}' nicht im Chat gefunden.")
            return None
        try:
            target.click()
            # Wait for profile card to appear
            self._page.wait_for_selector(_PROFILE_EMAIL_SELECTOR, timeout=5000)
            email_el = self._page.query_selector(_PROFILE_EMAIL_SELECTOR)
            email = email_el.inner_text().strip() if email_el else None
            # Close profile card by pressing Escape
            self._page.keyboard.press("Escape")
            time.sleep(0.5)
            return email
        except Exception as e:
            print(f"  Profilkarte für '{display_name}' nicht ladbar: {e}")
            self._page.keyboard.press("Escape")
            return None
Also add the selector constant import at the top of resolver.py:
from teampulse.monitor import _PROFILE_EMAIL_SELECTOR, _SENDER_SELECTOR
- Step 2: Run the cache tests to ensure nothing broke
.venv/bin/pytest tests/test_resolver_cache.py -v
Expected: All 6 tests PASS.
- Step 3: Commit
git add src/teampulse/resolver.py
git commit -m "feat: browser-based email extraction from Teams profile cards"
Task 9: Auth — Session Management
Files:
- Create: src/teampulse/auth.py
- Step 1: Write auth.py
src/teampulse/auth.py:
from pathlib import Path

from playwright.sync_api import BrowserContext, Page, Playwright

SESSION_DIR = Path.home() / ".teampulse" / "session"

def create_context(playwright: Playwright, headless: bool = True) -> BrowserContext:
    SESSION_DIR.mkdir(parents=True, exist_ok=True)
    context = playwright.chromium.launch_persistent_context(
        str(SESSION_DIR),
        headless=headless,
        args=["--no-sandbox"],
        locale="de-DE",
    )
    return context

def ensure_logged_in(context: BrowserContext) -> None:
    """
    Open Teams web. If redirected to a login page, prompt the user to
    authenticate in the browser window and wait until login is complete.
    Interactive login requires a context created with headless=False.
    """
    page = context.pages[0] if context.pages else context.new_page()
    page.goto("https://teams.microsoft.com")
    if _is_login_page(page):
        print("Bitte im Browser-Fenster anmelden (SSO/MFA)...")
        # A headless browser cannot be made visible after launch; this only
        # retitles the tab so the login window is easy to spot.
        page.evaluate("document.title = 'TeamPulse Login'")
        page.wait_for_url("**/teams.microsoft.com/**", timeout=120_000)
        print("Anmeldung erfolgreich.")
    page.close()

def _is_login_page(page: Page) -> bool:
    # Treat a redirect to a URL containing "/login" within 3 seconds as a login page
    try:
        page.wait_for_url("**/login**", timeout=3000)
        return True
    except Exception:
        return False
- Step 2: Verify import
.venv/bin/python -c "from teampulse.auth import create_context, ensure_logged_in; print('OK')"
Expected: OK
- Step 3: Commit
git add src/teampulse/auth.py
git commit -m "feat: Playwright session management with login detection"
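_is_login_page decides by waiting up to 3 seconds for a URL change, which is hard to unit-test. A pure predicate on the URL string is one possible complement. This is a sketch under stated assumptions: is_login_url and LOGIN_HOSTS are hypothetical names, and the host list should be checked against the redirects actually seen in your tenant.

```python
from urllib.parse import urlparse

# Assumed Microsoft sign-in hosts; verify against the redirects you observe.
LOGIN_HOSTS = {"login.microsoftonline.com", "login.live.com"}


def is_login_url(url: str) -> bool:
    """Return True if the URL's host looks like a sign-in host."""
    host = (urlparse(url).hostname or "").lower()
    return host in LOGIN_HOSTS or host.startswith("login.")


print(is_login_url("https://login.microsoftonline.com/common/oauth2/authorize"))
print(is_login_url("https://teams.microsoft.com/v2/"))
```

Such a predicate could be applied to page.url after navigation, and it can be covered by plain pytest cases without a browser.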
Task 10: Main Entry Point
Files:
- Create: main.py
- Step 1: Write main.py
#!/usr/bin/env python3
"""TeamPulse — Teams meeting chat audit tool.

Usage:
    .venv/bin/python main.py

The script opens a background browser, navigates to Teams.
Post !start "Presenter Name" and !stop in the meeting chat to define a time window.
A Markdown memo is saved to the current directory when the window closes.
"""
from pathlib import Path

from playwright.sync_api import sync_playwright

from teampulse.auth import create_context, ensure_logged_in
from teampulse.memo import generate_memo, save_memo
from teampulse.models import AuditEntry
from teampulse.monitor import Monitor
from teampulse.resolver import Resolver

CACHE_PATH = Path.home() / ".teampulse" / "cache.json"

def main():
    with sync_playwright() as playwright:
        print("Starte Browser...")
        context = create_context(playwright, headless=False)
        ensure_logged_in(context)
        page = context.new_page()
        page.goto("https://teams.microsoft.com")
        # The display name is only known after Teams has loaded, so it is set afterwards
        monitor = Monitor(page=page, current_user="")
        print("Lese eingeloggten Nutzer...")
        current_user = monitor.get_current_user_display_name()
        monitor._current_user = current_user
        print(f"Eingeloggt als: {current_user}")
        print("\nNavigiere im Browser zur Meeting-Chat-Seite.")
        print("Poste dann '!start \"Name des Vortragenden\"' im Chat.\n")
        window = monitor.run()
        resolver = Resolver(cache_path=CACHE_PATH, page=page)
        print(f"\nLöse {len(window.entries)} E-Mail-Adressen auf...")
        resolved_entries = []
        for entry in window.entries:
            email = resolver.resolve(entry.display_name)
            print(f"  {entry.display_name} → {email}")
            resolved_entries.append(AuditEntry(display_name=entry.display_name, email=email))
        window.entries = resolved_entries
        memo_content = generate_memo(window)
        path = save_memo(memo_content)
        print(f"\nMemo gespeichert: {path}")
        print("\n" + memo_content)
        context.close()

if __name__ == "__main__":
    main()
- Step 2: Run full test suite one last time
.venv/bin/pytest -v
Expected: All tests PASS.
- Step 3: Smoke test (optional, requires active Teams session)
.venv/bin/python main.py
Expected: Browser opens, navigates to Teams. After login, monitoring starts.
- Step 4: Commit
git add main.py
git commit -m "feat: main entry point wiring all components"
Self-Review Notes
- Spec coverage: Covered spec requirements: Playwright + Teams web (✓), persistent session (✓), !start "Name" / !stop triggers (✓). One gap: the console Enter fallback is mentioned in the spec but not implemented. It can be added later as a threading enhancement to monitor.run() without changing the architecture.
- Selector constants: _PROFILE_EMAIL_SELECTOR and _SENDER_SELECTOR are defined in monitor.py and imported by resolver.py, so monitor.py is the single source of truth for DOM selectors.
- AuditEntry.email: Initialized as "" in monitor.py (Task 7) and filled in main.py after resolution. This two-phase design is intentional.
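The console Enter fallback named as a gap could look roughly like the following. This is only a sketch of the threading idea, assuming the polling loop would check a threading.Event on each iteration; start_enter_listener and its read_line parameter are hypothetical and do not appear in the plan's modules.

```python
import threading
from typing import Callable


def start_enter_listener(read_line: Callable[[], str] = input) -> threading.Event:
    """Start a daemon thread that sets the returned Event once a line is read."""
    pressed = threading.Event()

    def _wait() -> None:
        try:
            read_line()  # blocks until the user presses Enter
        except EOFError:
            return  # no console attached: leave the event unset
        pressed.set()

    threading.Thread(target=_wait, daemon=True).start()
    return pressed


# Demonstration with a simulated key press instead of real console input.
stop_requested = start_enter_listener(read_line=lambda: "")
print(stop_requested.wait(timeout=2))
```

Inside monitor.run(), a check such as `if stop_requested.is_set():` next to the !stop handling would let either signal close the window; the daemon flag keeps the listener from blocking interpreter exit.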