feat: installable package with teampulse entry point

This commit is contained in:
Oliver Hofmann 2026-05-17 14:46:16 +02:00
parent 1cbeb5e41c
commit a492e391ce
3 changed files with 148 additions and 158 deletions

161
main.py
View File

@ -1,166 +1,11 @@
#!/usr/bin/env python3
"""TeamPulse — Teams meeting chat audit tool.
Usage:
.venv/bin/python main.py [--history]
--history Auch bestehende Chat-Nachrichten auswerten (Standard: nur neue ab Scriptstart)
Post !start "Presenter Name" and !stop in the chat to define a time window.
A Markdown memo is saved to the current directory when the window closes.
"""
import argparse
import platform
import re
import subprocess
"""Entry point for ./start.sh — delegates to teampulse.cli."""
import sys
from pathlib import Path
_TEAMS_URL_RE = re.compile(r'https://teams(?:\.live)?\.microsoft\.com/\S+')
sys.path.insert(0, str(Path(__file__).parent / "src"))
from playwright.sync_api import sync_playwright
from teampulse.auth import create_context, ensure_logged_in
from teampulse.memo import generate_memo, save_memo
from teampulse.models import AuditEntry
from teampulse.monitor import Monitor
from teampulse.resolver import Resolver
CACHE_PATH = Path.home() / ".teampulse" / "cache.json"
TEAMS_URL = "https://teams.microsoft.com"
def _read_clipboard() -> str:
try:
system = platform.system()
if system == "Darwin":
return subprocess.check_output(["pbpaste"]).decode("utf-8").strip()
elif system == "Linux":
return subprocess.check_output(["xclip", "-selection", "clipboard", "-o"]).decode("utf-8").strip()
elif system == "Windows":
return subprocess.check_output(
["powershell", "-command", "Get-Clipboard"], text=True
).strip()
except Exception:
pass
return ""
def _extract_teams_url(text: str) -> str | None:
m = _TEAMS_URL_RE.search(text)
return m.group(0) if m else None
def _get_meeting_url() -> str | None:
if len(sys.argv) > 1:
url = _extract_teams_url(sys.argv[1])
if url:
print(f"Meeting-URL aus Argument: {url[:60]}...")
return url
clip = _read_clipboard()
url = _extract_teams_url(clip)
if url:
print(f"Meeting-URL aus Clipboard: {url[:60]}...")
return url
return None
def main():
parser = argparse.ArgumentParser(
prog="teampulse",
description="TeamPulse — Wertet den Chat einer laufenden Teams-Besprechung aus.",
epilog=(
"Ablauf: Browser öffnet sich → zum Meeting-Chat navigieren → "
"!start 'Vortragende/r' im Chat posten → !stop zum Beenden. "
"Das Memo wird automatisch gespeichert und auf den nächsten !start gewartet."
),
)
parser.add_argument(
"--history",
action="store_true",
help="Bestehende Nachrichten auswerten (Standard: nur neue Nachrichten ab Scriptstart)",
)
args = parser.parse_args()
include_history: bool = args.history
print("" * 60)
print(" TeamPulse — Teams Chat Auswertung")
print("" * 60)
if include_history:
print(" Modus: bestehende + neue Nachrichten")
else:
print(" Modus: nur neue Nachrichten ab jetzt")
print(" Trigger: !start Name | !stop")
print(" Beenden: Ctrl+C")
print("" * 60)
meeting_url = _get_meeting_url()
with sync_playwright() as playwright:
print("Starte Browser...")
context = create_context(playwright, headless=False)
ensure_logged_in(context)
page = context.new_page()
# Always load Teams main first so user-name selectors are available
page.goto(TEAMS_URL)
monitor = Monitor(page=page, current_user="")
print("Lese eingeloggten Nutzer...")
current_user = monitor.get_current_user_display_name()
monitor._current_user = current_user
print(f"Eingeloggt als: {current_user}")
if meeting_url:
print("Navigiere zur Meeting-URL...")
page.goto(meeting_url)
try:
page.wait_for_selector(
"[data-tid='channel-pane-message'], [data-tid='chat-pane-message']",
timeout=6000,
)
print("Chat direkt erreicht.")
except Exception:
# Join-Lobby or redirect — navigate back to Teams main.
# User navigates to the meeting chat via the Chat sidebar (no call join needed).
print("Join-Lobby erkannt — bitte im Browser zum Chat navigieren:")
print(" Linke Sidebar → Chat (Icon) → laufendes Meeting anklicken\n")
page.goto(TEAMS_URL)
else:
print("\nBitte im Browser-Fenster zum Meeting-Chat navigieren:")
print(" Linke Sidebar → Chat → laufendes Meeting anklicken\n")
resolver = Resolver(cache_path=CACHE_PATH, page=page)
while True:
window = monitor.run_once(skip_existing=not include_history)
print(f"\nLöse {len(window.entries)} E-Mail-Adresse(n) auf...")
resolved_entries = []
for entry in window.entries:
email = resolver.resolve(entry.display_name)
print(f" {entry.display_name}{email}")
resolved_entries.append(AuditEntry(display_name=entry.display_name, email=email))
window.entries = resolved_entries
memo_content = generate_memo(window)
path = save_memo(memo_content)
print(f"\nMemo gespeichert: {path}\n")
print(memo_content)
print("\n" + "" * 60 + "\n")
context.close()
from teampulse.cli import main
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\nAbgebrochen.")
main()

15
pyproject.toml Normal file
View File

@ -0,0 +1,15 @@
[build-system]
requires = ["setuptools>=61"]
build-backend = "setuptools.build_meta"
[project]
name = "teampulse"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = ["playwright>=1.44"]
[project.scripts]
teampulse = "teampulse.cli:main"
[tool.setuptools.packages.find]
where = ["src"]

130
src/teampulse/cli.py Normal file
View File

@ -0,0 +1,130 @@
#!/usr/bin/env python3
"""TeamPulse — Teams meeting chat audit tool."""
import argparse
import platform
import re
import subprocess
import sys
from pathlib import Path
from playwright.sync_api import sync_playwright
from teampulse.auth import create_context, ensure_logged_in
from teampulse.memo import generate_memo, save_memo
from teampulse.models import AuditEntry
from teampulse.monitor import Monitor
from teampulse.resolver import Resolver
CACHE_PATH = Path.home() / ".teampulse" / "cache.json"
TEAMS_URL = "https://teams.microsoft.com"
_TEAMS_URL_RE = re.compile(r'https://teams(?:\.live)?\.microsoft\.com/\S+')
def _read_clipboard() -> str:
try:
system = platform.system()
if system == "Darwin":
return subprocess.check_output(["pbpaste"]).decode("utf-8").strip()
elif system == "Linux":
return subprocess.check_output(["xclip", "-selection", "clipboard", "-o"]).decode("utf-8").strip()
elif system == "Windows":
return subprocess.check_output(
["powershell", "-command", "Get-Clipboard"], text=True
).strip()
except Exception:
pass
return ""
def _get_meeting_url() -> str | None:
clip = _read_clipboard()
m = _TEAMS_URL_RE.search(clip)
if m:
url = m.group(0)
print(f"Meeting-URL aus Clipboard: {url[:60]}...")
return url
return None
def main() -> None:
parser = argparse.ArgumentParser(
prog="teampulse",
description="TeamPulse — Wertet den Chat einer laufenden Teams-Besprechung aus.",
epilog=(
"Ablauf: Browser öffnet sich → zum Meeting-Chat navigieren → "
"!start Vortragende/r im Chat posten → !stop zum Beenden. "
"Das Memo wird automatisch gespeichert und auf den nächsten !start gewartet."
),
)
parser.add_argument(
"--history",
action="store_true",
help="Bestehende Nachrichten auswerten (Standard: nur neue Nachrichten ab Scriptstart)",
)
args = parser.parse_args()
include_history: bool = args.history
print("" * 60)
print(" TeamPulse — Teams Chat Auswertung")
print("" * 60)
print(f" Modus: {'bestehende + neue' if include_history else 'nur neue'} Nachrichten")
print(" Trigger: !start Name | !stop")
print(" Beenden: Ctrl+C")
print("" * 60)
meeting_url = _get_meeting_url()
try:
with sync_playwright() as playwright:
print("Starte Browser...")
context = create_context(playwright, headless=False)
ensure_logged_in(context)
page = context.new_page()
page.goto(TEAMS_URL)
monitor = Monitor(page=page, current_user="")
print("Lese eingeloggten Nutzer...")
current_user = monitor.get_current_user_display_name()
monitor._current_user = current_user
print(f"Eingeloggt als: {current_user}")
if meeting_url:
print("Navigiere zur Meeting-URL...")
page.goto(meeting_url)
try:
page.wait_for_selector(
"[data-tid='channel-pane-message'], [data-tid='chat-pane-message']",
timeout=6000,
)
print("Chat direkt erreicht.")
except Exception:
print("Join-Lobby erkannt — bitte im Browser zum Chat navigieren:")
print(" Linke Sidebar → Chat (Icon) → laufendes Meeting anklicken\n")
page.goto(TEAMS_URL)
else:
print("\nBitte im Browser-Fenster zum Meeting-Chat navigieren:")
print(" Linke Sidebar → Chat → laufendes Meeting anklicken\n")
resolver = Resolver(cache_path=CACHE_PATH, page=page)
while True:
window = monitor.run_once(skip_existing=not include_history)
print(f"\nLöse {len(window.entries)} E-Mail-Adresse(n) auf...")
resolved_entries = []
for entry in window.entries:
email = resolver.resolve(entry.display_name)
print(f" {entry.display_name}{email}")
resolved_entries.append(AuditEntry(display_name=entry.display_name, email=email))
window.entries = resolved_entries
memo_content = generate_memo(window)
path = save_memo(memo_content)
print(f"\nMemo gespeichert: {path}\n")
print(memo_content)
print("\n" + "" * 60 + "\n")
except KeyboardInterrupt:
print("\nAbgebrochen.")