# transcription_chunked.py
import whisper
from pathlib import Path
import os
import json
import ffmpeg
import tempfile

# === Settings ===
input_file = Path("input/testVideoShort.mov")
output_dir = Path("transkripte")
output_dir.mkdir(parents=True, exist_ok=True)

output_txt = output_dir / f"{input_file.stem}_timed.txt"
output_json = output_dir / f"{input_file.stem}_segments.json"
suspect_txt = output_dir / f"{input_file.stem}_suspect_lines.txt"

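# Each chunk is widened by OVERLAP seconds at interior boundaries so that
# words cut at a chunk edge still appear complete in one of the chunks; the
# duplicate segments this creates are filtered out further down.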
CHUNKS = 4        # number of chunks (adjust!)
OVERLAP = 2.0     # seconds of overlap

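# Whisper resolves its model cache via XDG_CACHE_HOME, so this keeps the
# downloaded model files in a "whisper-cache" folder next to the script
# instead of the user-wide cache directory.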
os.environ["XDG_CACHE_HOME"] = str(Path(__file__).parent / "whisper-cache")

probe = ffmpeg.probe(str(input_file))
duration = float(probe["format"]["duration"])
print(f"🎥 Video length: {duration:.2f} seconds")

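# Extract one slice of the input as mono 16 kHz PCM WAV, the sample format
# the Whisper models expect.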
def extract_audio_chunk(start_time, duration, output_path):
    ffmpeg.input(str(input_file), ss=start_time, t=duration).output(
        str(output_path),
        format="wav",
        acodec="pcm_s16le",
        ac=1,
        ar="16000",
        loglevel="error"
    ).overwrite_output().run()

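# Heuristic for probable hallucinations: flag a segment when a single word
# makes up more than 60% of its words or repeats more than 20 times, a
# common Whisper failure mode on silence or background noise.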
def is_suspect(text):
    words = text.strip().lower().split()
    if not words:
        return True
    most_common = max(words.count(w) for w in set(words))
    return most_common / len(words) > 0.6 or most_common > 20

tmp_dir = Path(tempfile.mkdtemp())
all_segments = []

# Load the model once for all chunks; switch to "medium" or "large" if desired.
model = whisper.load_model("small")

print(f"✂️ Splitting audio into {CHUNKS} chunks ...")
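# Each iteration extracts one slice, transcribes it, shifts the segment
# timestamps back to full-video time, and collects them in all_segments.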
for i in range(CHUNKS):
    chunk_start = max(0, i * (duration / CHUNKS) - OVERLAP)
    chunk_end = min(duration, (i + 1) * (duration / CHUNKS) + OVERLAP)
    chunk_dur = chunk_end - chunk_start
    chunk_file = tmp_dir / f"chunk_{i}.wav"
    print(f"🔉 Extracting chunk {i+1}/{CHUNKS}: {chunk_start:.2f}s – {chunk_end:.2f}s")
    extract_audio_chunk(chunk_start, chunk_dur, chunk_file)

    print(f"🧠 Transcribing chunk {i+1} ...")
    result = model.transcribe(
        str(chunk_file),
        language="de",
        fp16=False,
        word_timestamps=False,
        condition_on_previous_text=True,
        temperature=0,
        verbose=False
    )

    segments = result["segments"]
    # Add the current chunk's time offset
    offset = chunk_start
    for seg in segments:
        seg["start"] += offset
        seg["end"] += offset
    all_segments.extend(segments)

# === Sort and filter duplicate/overlapping segments
all_segments.sort(key=lambda x: x["start"])

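# Segments coming from the overlap regions can appear twice; deduplicate on
# (start, end, text) rounded to 10 ms.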
def segment_hash(seg):
    return (round(seg["start"], 2), round(seg["end"], 2), seg["text"].strip().lower())

unique_segments = []
seen = set()
for seg in all_segments:
    h = segment_hash(seg)
    if h not in seen:
        seen.add(h)
        unique_segments.append(seg)

print(f"✅ Transcribed {len(unique_segments)} unique segments in total.")

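# Every segment goes into the main transcript; lines flagged by is_suspect()
# are additionally copied to a separate file for manual review.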
with open(output_txt, "w", encoding="utf-8") as f, open(suspect_txt, "w", encoding="utf-8") as f_sus:
    for seg in unique_segments:
        start = seg["start"]
        end = seg["end"]
        text = seg["text"].strip()
        line = f"[{start:.2f} – {end:.2f}] {text}\n"
        f.write(line)  # ALWAYS goes into the main transcript!
        if is_suspect(text):
            f_sus.write(line)

print(f"📝 Zeitmarkiertes Transkript gespeichert unter: {output_txt}")
|
||
print(f"⚠️ Verdächtige Zeilen gespeichert unter: {suspect_txt}")
|
||
|
||
with open(output_json, "w", encoding="utf-8") as f:
|
||
json.dump(unique_segments, f, ensure_ascii=False, indent=2)
|
||
print(f"💾 Segmentdaten gespeichert unter: {output_json}")
|