# transcription_chunked.py
import whisper
from pathlib import Path
import os
import json
import shutil
import ffmpeg
import tempfile

# === Settings ===
input_file = Path("input/testVideoShort.mov")
output_dir = Path("transkripte")
output_dir.mkdir(parents=True, exist_ok=True)
output_txt = output_dir / f"{input_file.stem}_timed.txt"
output_json = output_dir / f"{input_file.stem}_segments.json"
suspect_txt = output_dir / f"{input_file.stem}_suspect_lines.txt"

CHUNKS = 4     # number of chunks (adjust as needed!)
OVERLAP = 2.0  # seconds of overlap between adjacent chunks

# Must be set before the first load_model() call so Whisper caches its
# model weights next to this script instead of in the user cache.
os.environ["XDG_CACHE_HOME"] = str(Path(__file__).parent / "whisper-cache")

probe = ffmpeg.probe(str(input_file))
duration = float(probe["format"]["duration"])
print(f"🎥 Video length: {duration:.2f} seconds")


def extract_audio_chunk(start_time, length, output_path):
    """Extract `length` seconds of mono 16 kHz WAV audio starting at `start_time`."""
    ffmpeg.input(str(input_file), ss=start_time, t=length).output(
        str(output_path),
        format="wav",
        acodec="pcm_s16le",
        ac=1,
        ar="16000",
        loglevel="error",
    ).overwrite_output().run()


def is_suspect(text):
    """Flag lines that look like Whisper repetition loops: one word making up
    more than 60% of the line, or any word repeated more than 20 times."""
    words = text.strip().lower().split()
    if not words:
        return True
    most_common = max(words.count(w) for w in set(words))
    return most_common / len(words) > 0.6 or most_common > 20


tmp_dir = Path(tempfile.mkdtemp())
all_segments = []

# Load the model once instead of reloading it for every chunk.
model = whisper.load_model("small")  # switch to "medium" or "large" if desired

print(f"✂️ Splitting audio into {CHUNKS} chunks ...")
for i in range(CHUNKS):
    chunk_start = max(0.0, i * (duration / CHUNKS) - OVERLAP)
    chunk_end = min(duration, (i + 1) * (duration / CHUNKS) + OVERLAP)
    chunk_dur = chunk_end - chunk_start
    chunk_file = tmp_dir / f"chunk_{i}.wav"
    print(f"🔉 Extracting chunk {i+1}/{CHUNKS}: {chunk_start:.2f}s – {chunk_end:.2f}s")
    extract_audio_chunk(chunk_start, chunk_dur, chunk_file)

    print(f"🧠 Transcribing chunk {i+1} ...")
    result = model.transcribe(
        str(chunk_file),
        language="de",
        fp16=False,
        word_timestamps=False,
        condition_on_previous_text=True,
        temperature=0,
        verbose=False,
    )
    segments = result["segments"]

    # Shift timestamps by the chunk's offset into the full recording
    offset = chunk_start
    for seg in segments:
        seg["start"] += offset
        seg["end"] += offset
    all_segments.extend(segments)

# === Sort and drop duplicate/overlapping segments ===
all_segments.sort(key=lambda x: x["start"])


def segment_hash(seg):
    return (round(seg["start"], 2), round(seg["end"], 2), seg["text"].strip().lower())


unique_segments = []
seen = set()
for seg in all_segments:
    h = segment_hash(seg)
    if h not in seen:
        seen.add(h)
        unique_segments.append(seg)

print(f"✅ {len(unique_segments)} unique segments transcribed in total.")

with open(output_txt, "w", encoding="utf-8") as f, open(suspect_txt, "w", encoding="utf-8") as f_sus:
    for seg in unique_segments:
        start = seg["start"]
        end = seg["end"]
        text = seg["text"].strip()
        line = f"[{start:.2f} – {end:.2f}] {text}\n"
        f.write(line)  # ALWAYS goes into the main transcript!
        if is_suspect(text):
            f_sus.write(line)

print(f"📝 Timestamped transcript saved to: {output_txt}")
print(f"⚠️ Suspect lines saved to: {suspect_txt}")

with open(output_json, "w", encoding="utf-8") as f:
    json.dump(unique_segments, f, ensure_ascii=False, indent=2)

print(f"💾 Segment data saved to: {output_json}")

# Remove the temporary chunk files.
shutil.rmtree(tmp_dir, ignore_errors=True)
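
# --- Optional: time-based overlap pruning (a minimal sketch, not part of the
# pipeline above). The exact-match dedup via segment_hash() only removes
# byte-identical repeats; segments re-transcribed inside the OVERLAP window
# usually carry slightly different timestamps and wording per chunk, so they
# slip through. The helper below is one hedged alternative: keep segments in
# time order and drop any whose midpoint falls inside the span already
# covered by the previously kept segment. `prune_overlaps` is an illustrative
# name, not an existing API.

def prune_overlaps(segments):
    """Drop near-duplicate segments whose midpoint lies before the end of the
    previously kept segment (input must be sorted by start time)."""
    kept = []
    last_end = float("-inf")
    for seg in segments:
        midpoint = (seg["start"] + seg["end"]) / 2
        if midpoint >= last_end:
            kept.append(seg)
            last_end = max(last_end, seg["end"])
    return kept

# Hypothetical usage, applied before the transcript files are written:
# unique_segments = prune_overlaps(unique_segments)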