#!/usr/bin/env python3
# murmur.py — Whisper-live (ASR) + NLLB-200 distilled 600M (Translation)
# - Two-pane UI (Original + Translation) with dynamic show/hide
# - Source language dropdown (Auto → whisper-live auto-detect)
# - Transcription via whisper-live (FasterWhisper backend)
# - Translation via NLLB-200 distilled 600M (no SeamlessM4T / MMS-LID)
# - Square Record button (grey ↔ red) that records input to MP3 (_record.mp3)
# - Virtual loopback Gain (dB) before the output sink (e.g., BlackHole)
# - English UI; window auto-resizes (no manual resize, no page scrollbar)
#
# NOTE(review): in this file the "translation" pane is fed by a second
# whisper-live client running Whisper's translate task (→ English). The NLLB
# mapping tables below are defined but not used by the STT worker — confirm
# whether the NLLB path lives elsewhere or was removed.

import atexit
import json
import locale
import logging
import os
import queue
import shutil
import socket
import subprocess
import sys
import threading
import time
import wave
from multiprocessing import Process, Manager
from typing import Optional

import numpy as np
import sounddevice as sd
import webview

# Whisper-live
from whisper_live.server import TranscriptionServer
from whisper_live.client import TranscriptionClient, TranscriptionTeeClient  # noqa: F401

# Translation (NLLB)
import torch  # noqa: F401
from huggingface_hub import snapshot_download, HfApi

# Tame noisy websocket logs caused by readiness probing/forced disconnects
logging.getLogger("websockets.server").setLevel(logging.CRITICAL)
logging.getLogger("websockets.sync.server").setLevel(logging.CRITICAL)
logging.getLogger("websockets.client").setLevel(logging.CRITICAL)
logging.getLogger("websocket").setLevel(logging.ERROR)

CONFIG_PATH = os.path.join(os.path.dirname(__file__), "config.json")

# -----------------------------------------------------------------------------
# Language options / mappings
# -----------------------------------------------------------------------------
LANG_TABLE = [
    {"id": "eng", "label": "English", "whisper": "en", "nllb": "eng_Latn"},
    {"id": "deu", "label": "Deutsch", "whisper": "de", "nllb": "deu_Latn"},
    {"id": "spa", "label": "Español", "whisper": "es", "nllb": "spa_Latn"},
    {"id": "fra", "label": "Français", "whisper": "fr", "nllb": "fra_Latn"},
    {"id": "zho", "label": "中文", "whisper": "zh", "nllb": "zho_Hans"},
    {"id": "jpn", "label": "日本語", "whisper": "ja", "nllb": "jpn_Jpan"},
    {"id": "por", "label": "Português", "whisper": "pt", "nllb": "por_Latn"},
    {"id": "ind", "label": "Bahasa Indonesia", "whisper": "id", "nllb": "ind_Latn"},
    {"id": "hin", "label": "हिन्दी", "whisper": "hi", "nllb": "hin_Deva"},
    {"id": "arb", "label": "العربية", "whisper": "ar", "nllb": "arb_Arab"},
]
LANG_BY_ID = {x["id"]: x for x in LANG_TABLE}
LANG_CHOICES_TGT = [(x["id"], x["label"]) for x in LANG_TABLE]
LANG_CHOICES_SRC = [("auto", "Auto")] + LANG_CHOICES_TGT
WL_TO_NLLB = {x["whisper"]: x["nllb"] for x in LANG_TABLE}
SYS2ID = {
    "en": "eng", "de": "deu", "es": "spa", "fr": "fra", "zh": "zho",
    "ja": "jpn", "pt": "por", "id": "ind", "hi": "hin", "ar": "arb",
}


def detect_system_lang_code():
    """Return the LANG_TABLE id matching the system locale, or ``"eng"``.

    Only the language part of the locale name (before ``_`` or ``-``) is
    looked up in SYS2ID; anything unknown, empty or failing maps to "eng".
    """
    try:
        # Avoid deprecated getdefaultlocale: prefer getlocale()
        loc = locale.getlocale()[0] or ""
        if not loc:
            # Fallback for older Pythons; the function may be absent on newer ones.
            legacy = getattr(locale, "getdefaultlocale", None)
            if legacy is not None:
                loc = legacy()[0] or ""
        pref = loc.replace("-", "_").split("_")[0].lower()
        return SYS2ID.get(pref, "eng")
    except Exception:
        return "eng"


# -----------------------------------------------------------------------------
# macOS system output switcher (optional)
# -----------------------------------------------------------------------------
class SystemAudioManager:
    """Wrapper around the ``SwitchAudioSource`` command-line tool.

    Every method degrades to a no-op (None / [] / False) when the executable
    is not found on PATH.
    """

    def __init__(self):
        self.exe = shutil.which("SwitchAudioSource")
        self.original = None  # output that was active before we switched

    def is_available(self):
        """Return True when the SwitchAudioSource executable was found."""
        return bool(self.exe)

    def _run(self, args):
        """Run the tool with *args*; never raises on a non-zero exit code."""
        return subprocess.run([self.exe] + args, capture_output=True, text=True, check=False)

    def get_current_output(self):
        """Return the name of the current output device, or None."""
        if not self.is_available():
            return None
        res = self._run(["-t", "output", "-c"])
        return (res.stdout or "").strip() or None

    def list_outputs(self):
        """Return the non-empty output device names reported by the tool."""
        if not self.is_available():
            return []
        res = self._run(["-a", "-t", "output"])
        return [ln.strip() for ln in (res.stdout or "").splitlines() if ln.strip()]

    def set_output(self, name):
        """Ask the tool to select *name*; returns False if unavailable/empty."""
        if not (self.is_available() and name):
            return False
        self._run(["-t", "output", "-s", name])
        return True

    def maybe_switch_to(self, preferred_name):
        """Switch to the output matching *preferred_name* if one exists.

        An exact name match wins; otherwise the first output whose name starts
        with *preferred_name* (case-insensitive) is used. The previously
        active output is remembered in ``self.original`` for ``restore()``.
        """
        if not (self.is_available() and preferred_name):
            return False
        outs = self.list_outputs()
        target = next((n for n in outs if n == preferred_name), None)
        if target is None:
            low = preferred_name.lower()
            target = next((n for n in outs if n.lower().startswith(low)), None)
        if target is None:
            return False
        cur = self.get_current_output()
        self.original = cur or self.original
        if cur != target:
            self.set_output(target)
        return True

    def restore(self):
        """Switch back to the remembered original output, if it differs."""
        if (
            self.is_available()
            and self.original
            and self.get_current_output() != self.original
        ):
            self.set_output(self.original)


# -----------------------------------------------------------------------------
# Audio loopback (monitoring) with virtual gain
# -----------------------------------------------------------------------------
class AudioRouter:
    """Copies audio from an input device to an output device with a gain."""

    def __init__(self):
        sd.default.samplerate = 44100
        sd.default.channels = 2
        sd.default.latency = 'high'
        sd.default.blocksize = 512
        self.thread = None
        self.running = False
        # gain (in dB and linear)
        self.gain_db = 0.0
        self.gain = 1.0

    def set_gain_db(self, db):
        """Set loopback gain in dB (applied to input before sending to output).

        Unparseable values become 0 dB; the value is clamped to [-60, 30].
        """
        try:
            db = float(db)
        except Exception:
            db = 0.0
        db = max(-60.0, min(30.0, db))  # clamp
        self.gain_db = db
        self.gain = 10.0 ** (db / 20.0)
        print(f"[AudioRouter] Gain set to {self.gain_db:.1f} dB (x{self.gain:.2f})", file=sys.stderr)

    def _cb(self, indata, outdata, frames, t, status):
        """Stream callback: copy input to output, applying gain if not unity."""
        if status:
            print(f"[Stream-Status] {status}", file=sys.stderr)
        if self.gain != 1.0:
            out = indata * self.gain
            np.clip(out, -1.0, 1.0, out=out)  # hard-clip for safety
            outdata[:] = out
        else:
            outdata[:] = indata

    def _loop(self, inp, outp, channels):
        """Thread body: keep a duplex stream open while ``self.running``."""
        try:
            with sd.Stream(
                device=(inp, outp),
                samplerate=sd.default.samplerate,
                channels=channels,
                latency=sd.default.latency,
                blocksize=sd.default.blocksize,
                callback=self._cb,
            ):
                while self.running:
                    time.sleep(0.1)
        except Exception as e:
            print(f"[AudioRouter] {e}", file=sys.stderr)

    def start(self, inp, outp):
        """(Re)start the loopback from device index *inp* to *outp*.

        Uses the largest channel count both devices support. If either device
        has no channels in the required direction nothing is started.
        """
        devs = sd.query_devices()
        in_ch = int(devs[inp]['max_input_channels'])
        out_ch = int(devs[outp]['max_output_channels'])
        # No max(1, ...) here: that would hide a 0-channel device and make
        # the guard below unreachable.
        common = min(in_ch, out_ch)
        if common <= 0:
            print(f"[AudioRouter Error] no common channels (in={in_ch}, out={out_ch})", file=sys.stderr)
            return
        self.stop()
        self.running = True
        self.thread = threading.Thread(target=self._loop, args=(inp, outp, common), daemon=True)
        self.thread.start()
        print(f"[AudioRouter] Loopback: {inp} → {outp} with {common} channel(s)", file=sys.stderr)

    def stop(self):
        """Stop the loopback thread if it is running (waits up to 1 s)."""
        if self.running:
            self.running = False
            if self.thread is not None:
                self.thread.join(timeout=1.0)
            print("[AudioRouter] stopped", file=sys.stderr)


# -----------------------------------------------------------------------------
# Simple input recorder → WAV (stream) → MP3 via ffmpeg
# -----------------------------------------------------------------------------
class InputRecorder:
    """Records an input device to a temporary WAV and transcodes it to MP3."""

    def __init__(self):
        self._stream = None
        self._writer_thread = None
        self._q = queue.Queue(maxsize=64)
        self._running = False
        self._wav = None
        self._wav_path = None
        self._mp3_path = None
        self._channels = 1
        self._rate = 44100
        self._start_ts = None
        self._input_index = None

    def is_recording(self):
        """Return True while a recording is in progress."""
        return self._running

    def _writer_loop(self):
        """Drain the chunk queue into the WAV file until stopped.

        A ``None`` item is the stop sentinel. The WAV file is always closed
        when the loop ends.
        """
        try:
            while self._running or not self._q.empty():
                try:
                    chunk = self._q.get(timeout=0.25)
                except queue.Empty:
                    continue
                if chunk is None:
                    break
                self._wav.writeframes(chunk)
        finally:
            try:
                self._wav.close()
            except Exception:
                pass

    def _abort_start(self):
        """Roll back a half-started recording (writer thread + temp WAV)."""
        self._running = False
        self._stream = None
        try:
            self._q.put(None, timeout=1.0)
        except queue.Full:
            pass
        if self._writer_thread:
            self._writer_thread.join(timeout=2.0)
        try:
            os.remove(self._wav_path)
        except OSError:
            pass

    def start(self, input_index: int):
        """Start recording from *input_index*.

        Returns True when recording is (already) running, False when the
        device index is invalid or the WAV file / input stream cannot be
        opened.
        """
        if self._running:
            return True
        devs = sd.query_devices()
        if input_index is None or input_index < 0 or input_index >= len(devs):
            print("[Recorder] invalid input device", file=sys.stderr)
            return False
        self._input_index = input_index
        self._channels = max(1, min(2, int(devs[input_index].get("max_input_channels", 1))))
        self._rate = int(sd.default.samplerate or 44100)
        self._start_ts = time.strftime("%Y-%m-%d_%H-%M-%S")
        base = os.path.dirname(__file__)
        # temporary WAV, will convert to MP3 on stop
        self._wav_path = os.path.join(base, f"{self._start_ts}_record_temp.wav")
        self._mp3_path = None

        # Fresh queue per recording: a stop sentinel left over from a previous
        # run must never be the first thing the new writer thread reads.
        self._q = queue.Queue(maxsize=64)

        # open WAV sink
        try:
            self._wav = wave.open(self._wav_path, "wb")
        except OSError as e:
            print(f"[Recorder] cannot open WAV file: {e}", file=sys.stderr)
            return False
        self._wav.setnchannels(self._channels)
        self._wav.setsampwidth(2)  # int16
        self._wav.setframerate(self._rate)
        self._running = True

        def cb(indata, frames, time_info, status):
            if status:
                print(f"[Recorder] Status: {status}", file=sys.stderr)
            pcm16 = np.clip(indata, -1.0, 1.0)
            pcm16 = (pcm16 * 32767.0).astype(np.int16).tobytes()
            try:
                self._q.put_nowait(pcm16)
            except queue.Full:
                pass  # drop if writer is briefly behind

        self._writer_thread = threading.Thread(target=self._writer_loop, daemon=True)
        self._writer_thread.start()
        try:
            self._stream = sd.InputStream(
                device=input_index,
                channels=self._channels,
                samplerate=self._rate,
                dtype="float32",
                blocksize=sd.default.blocksize or 512,
                latency=sd.default.latency or 'high',
                callback=cb,
            )
            self._stream.start()
        except Exception as e:
            # Do not stay "recording" with no stream behind it.
            print(f"[Recorder] could not open input stream: {e}", file=sys.stderr)
            self._abort_start()
            return False
        print(f"[Recorder] started (dev #{input_index}, {self._channels}ch @ {self._rate} Hz)", file=sys.stderr)
        return True

    def stop_and_save(self):
        """Stop recording and return the path of the saved file.

        Returns the MP3 path when ffmpeg is available and succeeds, otherwise
        the temporary WAV path; None when no recording was running.
        """
        if not self._running:
            return None
        self._running = False
        try:
            if self._stream:
                self._stream.stop()
                self._stream.close()
        except Exception:
            pass
        try:
            # Bounded wait: never hang shutdown on a full queue.
            self._q.put(None, timeout=1.0)
        except queue.Full:
            pass
        try:
            if self._writer_thread:
                self._writer_thread.join(timeout=2.0)
        except Exception:
            pass

        # Transcode to MP3 via ffmpeg (if present), else keep WAV
        mp3_name = f"{time.strftime('%Y-%m-%d_%H-%M-%S')}_record.mp3"
        base = os.path.dirname(__file__)
        mp3_path = os.path.join(base, mp3_name)
        ffmpeg = shutil.which("ffmpeg")
        if ffmpeg:
            cmd = [ffmpeg, "-y", "-i", self._wav_path, "-vn", "-acodec", "libmp3lame", "-b:a", "192k", mp3_path]
            try:
                subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                try:
                    os.remove(self._wav_path)
                except Exception:
                    pass
                self._mp3_path = mp3_path
                print(f"[Recorder] saved: {mp3_path}", file=sys.stderr)
                return mp3_path
            except Exception as e:
                print(f"[Recorder] ffmpeg failed ({e}), keeping WAV", file=sys.stderr)
        self._mp3_path = None
        print(f"[Recorder] WAV saved (no ffmpeg): {self._wav_path}", file=sys.stderr)
        return self._wav_path


# -----------------------------------------------------------------------------
# Whisper-live server (daemon)
# -----------------------------------------------------------------------------
def _run_wl_server(host="127.0.0.1", port=9090):
    """Run the whisper-live transcription server (blocking).

    Binds to loopback by default: every client in this file connects to
    localhost, so there is no reason to expose the server to the network.
    """
    srv = TranscriptionServer()
    srv.run(host, port, backend="faster_whisper")


def _wait_for_port(host="127.0.0.1", port=9090, timeout=15.0) -> bool:
    """Poll until a TCP connect to host:port succeeds or *timeout* elapses."""
    t0 = time.time()
    delay = 0.2
    while time.time() - t0 < timeout:
        try:
            with socket.create_connection((host, port), timeout=0.5):
                return True
        except OSError:
            time.sleep(delay)
            delay = min(1.0, delay * 1.5)  # gentle back-off, capped at 1 s
    return False


# -----------------------------------------------------------------------------
# STT worker: whisper-live client + optional English translation (no NLLB)
# -----------------------------------------------------------------------------
def _stt_worker(input_index, queue_to_main, translate_flag, src_lang_id):
    """Child-process entry point: stream mic audio to whisper-live.

    Posts dicts to *queue_to_main*: ``{"ready": True}`` once the client
    threads are launched, ``{"asr": text, "trans": None}`` for transcripts
    and ``{"asr": None, "trans": text}`` for Whisper's English translation
    (only when *translate_flag* is true). Runs until the process is killed.
    """
    # Force TranscriptionClient to use the chosen input device
    _orig_init = TranscriptionClient.__init__

    def _patched_init(self, *args, **kwargs):
        _orig_init(self, *args, **kwargs)
        # Replace the stream the client opened with one bound to input_index.
        try:
            self.stream.stop_stream()
            self.stream.close()
        except Exception:
            pass
        self.stream = self.p.open(
            format=self.format,
            channels=self.channels,
            rate=self.rate,
            input=True,
            output=False,
            frames_per_buffer=self.chunk,
            input_device_index=input_index,
        )

    TranscriptionClient.__init__ = _patched_init

    wl_lang = None if (src_lang_id == "auto") else LANG_BY_ID.get(src_lang_id, {}).get("whisper")
    devs = sd.query_devices()
    print(f"[Whisper-live] listening on input #{input_index} ({devs[input_index]['name']})", file=sys.stderr)

    last_asr = ""

    def _cb_asr(text, segments):
        nonlocal last_asr
        asr_text = text or ""
        if asr_text == last_asr:  # skip unchanged repeats
            return
        last_asr = asr_text
        queue_to_main.put({"asr": asr_text, "trans": None})

    def _cb_eng(text, segments):
        trans_text = text or ""
        queue_to_main.put({"asr": None, "trans": trans_text})

    client_asr = TranscriptionClient(
        host="localhost",
        port=9090,
        lang=wl_lang,
        translate=False,
        model="small",
        use_vad=True,
        transcription_callback=_cb_asr,
        max_connection_time=86400 * 30,
    )
    client_eng = None
    if translate_flag:
        client_eng = TranscriptionClient(
            host="localhost",
            port=9090,
            lang=wl_lang,
            translate=True,  # Whisper translate → English
            model="small",
            use_vad=True,
            transcription_callback=_cb_eng,
            max_connection_time=86400 * 30,
        )

    th1 = threading.Thread(target=client_asr, daemon=True)
    th1.start()
    th2 = None
    if client_eng is not None:
        th2 = threading.Thread(target=client_eng, daemon=True)
        th2.start()

    # Notify parent: STT clients launched (server reachable / sockets starting)
    try:
        queue_to_main.put({"ready": True})
    except Exception:
        pass

    try:
        while True:
            time.sleep(0.25)
    except KeyboardInterrupt:
        pass


# -----------------------------------------------------------------------------
# Backend API for the GUI
# -----------------------------------------------------------------------------
class DeviceAPI:
    """Object exposed to the web UI (``js_api``) — devices, STT, recording."""

    gui_window = None

    def __init__(self):
        self.router = AudioRouter()
        self.recorder = InputRecorder()
        # Keep an explicit reference to the manager alongside its queue proxy.
        self._manager = Manager()
        self.queue = self._manager.Queue()
        self.transcribing = False
        self.client_proc = None
        self.input_index = None
        self.output_index = None
        self.translate_enabled = True
        self.tgt_lang_id = detect_system_lang_code()
        self.src_lang_id = "auto"
        self.models_ready = False
        self.waiting_first_result = False
        self._cleaned_up = False

        cfg = self._load_config()
        self.input_index = cfg.get("input_index")
        self.output_index = cfg.get("output_index")
        self.translate_enabled = cfg.get("translate", True)
        self.tgt_lang_id = cfg.get("tgt_lang", self.tgt_lang_id)
        self.src_lang_id = cfg.get("src_lang", self.src_lang_id)
        try:
            self.gain_db = float(cfg.get("gain_db", 0.0))
        except (TypeError, ValueError):
            # A hand-edited / corrupt config must not prevent start-up.
            self.gain_db = 0.0

        self.sys_audio = SystemAudioManager()
        try:
            if isinstance(self.input_index, int):
                devs = sd.query_devices()
                if 0 <= self.input_index < len(devs):
                    preferred = devs[self.input_index]["name"]
                    if self.sys_audio.is_available():
                        self.sys_audio.maybe_switch_to(preferred)
        except Exception as e:
            print(f"[SystemAudio] init error: {e}", file=sys.stderr)

        if (
            isinstance(self.input_index, int)
            and isinstance(self.output_index, int)
            and self.output_index != -1
        ):
            try:
                self.router.start(self.input_index, self.output_index)
                self.router.set_gain_db(self.gain_db)
            except Exception as e:
                print(f"[AudioRouter init] {e}", file=sys.stderr)
        else:
            self.router.set_gain_db(self.gain_db)

        threading.Thread(target=self._poll, daemon=True).start()

    def cleanup(self):
        """Stop STT, loopback and recording, then restore the system output.

        Registered both on window close and via atexit, so it is idempotent.
        Uses getattr() because it may run after a partially failed __init__.
        """
        if getattr(self, "_cleaned_up", False):
            return
        self._cleaned_up = True
        try:
            self._stop_stt()
        except Exception:
            pass
        try:
            if getattr(self, "router", None):
                self.router.stop()
        except Exception:
            pass
        try:
            rec = getattr(self, "recorder", None)
            if rec and rec.is_recording():
                rec.stop_and_save()
        except Exception as e:
            print(f"[Recorder] save-on-exit error: {e}", file=sys.stderr)
        try:
            if getattr(self, "sys_audio", None):
                self.sys_audio.restore()
                print("[SystemAudio] restored original default output", file=sys.stderr)
        except Exception as e:
            print(f"[SystemAudio] restore error: {e}", file=sys.stderr)

    def _stop_stt(self):
        """Terminate the STT child process, if any (waits up to 1 s)."""
        proc = getattr(self, "client_proc", None)
        if proc:
            proc.terminate()
            proc.join(timeout=1.0)

    def _poll(self):
        """Background thread: forward worker messages to the web UI."""
        while True:
            try:
                payload = self.queue.get(timeout=0.1)
            except queue.Empty:
                continue
            except (EOFError, OSError) as e:
                # Manager connection is gone; looping would only spin the CPU.
                print(f"[Poll] queue closed: {e}", file=sys.stderr)
                return
            except Exception:
                time.sleep(0.1)
                continue
            try:
                # Hide spinner as soon as STT clients report readiness
                if payload.get("ready"):
                    self.waiting_first_result = False
                    self._overlay("hide")
                    continue
                # Back-compat: if we were waiting and first text arrives, also hide
                if self.waiting_first_result and (payload.get("asr") or payload.get("trans")):
                    self.waiting_first_result = False
                    self._overlay("hide")
                js = f"appendTranscripts({json.dumps(payload.get('asr', ''))}, {json.dumps(payload.get('trans', ''))});"
                if DeviceAPI.gui_window:
                    DeviceAPI.gui_window.evaluate_js(js)
            except Exception as e:
                print(f"[JS Eval Error] {e}", file=sys.stderr)

    def app_ready(self):
        """Called by the UI once loaded: start model prefetch, apply layout."""
        self._overlay("progress", "Initializing…", 1)
        threading.Thread(target=self._prefetch_models, daemon=True).start()
        # Toggle visibility & resize native window
        self._apply_layout()
        return True

    def _overlay(self, action: str, message: Optional[str] = None, progress: Optional[float] = None):
        """Call the page's ``overlayUpdate(action, message, progress)``.

        Does nothing without a window; evaluation errors are ignored.
        """
        if not DeviceAPI.gui_window:
            return
        msg_js = json.dumps(message) if message is not None else "null"
        prog_js = "null" if progress is None else str(int(progress))
        js = f"overlayUpdate('{action}', {msg_js}, {prog_js});"
        try:
            DeviceAPI.gui_window.evaluate_js(js)
        except Exception:
            pass

    def _overlay_progress_bytes(self, label: str, current: int, total: int):
        """Show a byte-based progress line (percent, sizes, speed, ETA)."""
        now = time.time()
        st = getattr(self, "_dl_state", None)
        if not st or st.get("label") != label:
            st = self._dl_state = {"label": label, "t0": now, "last_t": now, "last_b": current, "total": total}
        dt = max(1e-3, now - st["last_t"])
        db = max(0, current - st["last_b"])
        speed = db / dt  # instantaneous, since the previous call
        st["last_t"], st["last_b"] = now, current
        pct = int(100 * current / max(1, total))
        avg_speed = max(1e-3, current / max(1e-3, now - st["t0"]))
        remaining = max(0, total - current)
        eta_s = int(remaining / avg_speed)

        def _fmt_bytes(b):
            for unit in ("B", "KB", "MB", "GB", "TB"):
                if b < 1024 or unit == "TB":
                    return f"{b:.1f} {unit}"
                b /= 1024

        def _fmt_time(s):
            if s < 60:
                return f"{s}s"
            m, s = divmod(s, 60)
            if m < 60:
                return f"{m}m {s}s"
            h, m = divmod(m, 60)
            return f"{h}h {m}m"

        msg = f"{label} – {pct}% • {_fmt_bytes(current)} / {_fmt_bytes(total)} • {(_fmt_bytes(speed)+'/s') if speed else ''} • ETA {_fmt_time(eta_s)}"
        self._overlay("progress", msg, pct)

    def _make_tqdm_class(self, label: str, total_bytes: int):
        """Build a minimal tqdm-like class that reports into the overlay.

        NOTE(review): this only implements update/close/context-manager;
        confirm it satisfies what ``snapshot_download(tqdm_class=...)``
        requires. Failures are caught by the caller, which falls back to the
        pulse progress.
        """
        outer = self

        class OverlayTqdm:
            def __init__(self, *args, **kwargs):
                self.total = kwargs.get("total") or 0

            def update(self, n=1):
                st = getattr(outer, "_agg", None)
                if st is None or st.get("label") != label:
                    outer._agg = st = {"label": label, "cur": 0}
                st["cur"] += int(n or 0)
                outer._overlay_progress_bytes(label, min(st["cur"], total_bytes), total_bytes)

            def close(self):
                pass

            def __enter__(self):
                return self

            def __exit__(self, exc_type, exc, tb):
                pass

        return OverlayTqdm

    def _download_with_pulse(self, desc: str, start: int, end: int, func):
        """Run *func* in a thread while creeping the bar from start to end-1.

        Returns func's result (None when it raised; the error is logged).
        """
        self._overlay("progress", desc, start)
        done = False
        err = None
        ret = None

        def runner():
            nonlocal done, err, ret
            try:
                ret = func()
            except Exception as e:
                err = e
            finally:
                done = True

        t = threading.Thread(target=runner, daemon=True)
        t.start()
        val = start
        while not done:
            val = min(end - 1, val + 1)
            self._overlay("progress", desc, val)
            time.sleep(0.3)
        self._overlay("progress", desc + " ✓", end)
        if err:
            print(f"[Prefetch] {desc} failed: {err}", file=sys.stderr)
        return ret

    def _prefetch_models(self):
        """Download the Whisper model, then start the transcription server."""
        api = HfApi()
        fw_total = 0
        try:
            info = api.repo_info(repo_id="Systran/faster-whisper-small", files_metadata=True)
            if getattr(info, "siblings", None):
                fw_total = sum(getattr(s, "size", 0) or 0 for s in info.siblings)
        except Exception as e:
            print(f"[Prefetch] repo_info FW failed: {e}", file=sys.stderr)

        used_byte_progress = False
        label = "Downloading Whisper model (small)"
        try:
            if fw_total > 0:
                self._overlay_progress_bytes(label, 0, fw_total)
                tqdm_cls = self._make_tqdm_class(label, fw_total)
                snapshot_download(repo_id="Systran/faster-whisper-small", allow_patterns=None, tqdm_class=tqdm_cls)
                self._overlay_progress_bytes(label, fw_total, fw_total)
                used_byte_progress = True
        except TypeError:
            pass  # e.g. tqdm_class not accepted → use the pulse fallback
        except Exception as e:
            print(f"[Prefetch] FW tqdm download failed: {e}", file=sys.stderr)

        if not used_byte_progress:
            def _dl_fw():
                return snapshot_download(repo_id="Systran/faster-whisper-small", allow_patterns=None)

            self._download_with_pulse(f"{label}…", 10, 95, _dl_fw)

        self._overlay("progress", "Starting transcription server…", 95)
        threading.Thread(target=_run_wl_server, daemon=True).start()
        if _wait_for_port("127.0.0.1", 9090, timeout=20.0):
            self._overlay("progress", "Server ready", 100)
        else:
            self._overlay("progress", "Server pending…", 98)
        self.models_ready = True
        self._overlay("hide")

    # ---- UI API ----
    def get_config(self):
        """Return the current settings and choice lists for the UI."""
        return {
            "input_index": self.input_index,
            "output_index": self.output_index,
            "translate": self.translate_enabled,
            "translate_lang": self.tgt_lang_id,
            "src_lang": self.src_lang_id,
            "sys_lang": detect_system_lang_code(),
            "lang_choices": [(x["id"], x["label"]) for x in LANG_TABLE],
            "src_choices": LANG_CHOICES_SRC,
            "is_recording": self.recorder.is_recording(),
            "gain_db": self.gain_db,
        }

    @staticmethod
    def _unique_devices(channel_key):
        """List devices with >0 *channel_key* channels, first per name."""
        seen, out = set(), []
        for i, d in enumerate(sd.query_devices()):
            if d.get(channel_key, 0) > 0 and d["name"] not in seen:
                seen.add(d["name"])
                out.append({"name": d["name"], "index": i})
        return out

    def get_input_devices(self):
        """Return ``[{"name", "index"}]`` for input-capable devices."""
        return self._unique_devices("max_input_channels")

    def get_output_devices(self):
        """Return output-capable devices, preceded by a "No output" entry."""
        out = self._unique_devices("max_output_channels")
        out.insert(0, {"name": "No output", "index": -1})
        return out

    def set_devices(self, inp, outp):
        """Select input/output devices; restarts loopback and running STT.

        An output index of -1 disables the loopback. A running recording is
        stopped and saved first.
        """
        inp, outp = int(inp), int(outp)
        if self.recorder.is_recording():
            self.recorder.stop_and_save()
        self.input_index, self.output_index = inp, outp
        if outp != -1:
            self.router.start(inp, outp)
            self.router.set_gain_db(self.gain_db)
        else:
            self.router.stop()
        self._persist()
        if self.transcribing and self.client_proc:
            self._overlay("show", "Restarting…", None)
            self.waiting_first_result = True
            self._stop_stt()
            time.sleep(0.3)
            self._start_stt()
        return True

    def set_translate(self, v):
        """Enable/disable the translation pane (restarts running STT)."""
        self.translate_enabled = bool(v)
        self._persist()
        if self.transcribing:
            self._restart_stt()
        self._apply_layout()  # toggle visibility + resize
        return True

    def set_translate_lang(self, code):
        """Store the target language id (restarts running STT)."""
        self.tgt_lang_id = str(code or detect_system_lang_code())
        self._persist()
        if self.transcribing:
            self._restart_stt()
        return True

    def set_src_lang(self, code):
        """Store the source language id, "auto" if empty (restarts STT)."""
        self.src_lang_id = str(code or "auto")
        self._persist()
        if self.transcribing:
            self._restart_stt()
        return True

    def set_gain_db(self, db):
        """Set and persist the loopback gain; invalid input becomes 0 dB."""
        try:
            self.gain_db = float(db)
        except Exception:
            self.gain_db = 0.0
        self.router.set_gain_db(self.gain_db)
        self._persist()
        return True

    def toggle_transcription(self):
        """Start or stop transcription; returns the new transcribing state.

        Returns False without starting when no input device is selected.
        """
        if not self.transcribing:
            if self.input_index is None:
                return False
            self._overlay("show", "Starting…", None)
            self.waiting_first_result = True
            self._start_stt()
            self.transcribing = True
        else:
            self._stop_stt()
            self._overlay("hide")
            self.transcribing = False
        self._apply_layout()  # toggle visibility + resize
        return self.transcribing

    # Recording control
    def toggle_recording(self):
        """Start or stop recording; returns True while recording."""
        if not self.recorder.is_recording():
            if self.input_index is None:
                return False
            ok = self.recorder.start(self.input_index)
            return bool(ok)
        self.recorder.stop_and_save()
        return False  # now "not recording"

    def _find_matching_input_for_output(self) -> Optional[int]:
        """Find an input device named like the selected output device.

        This is typical for loopback drivers, which expose the same name as
        both input and output. Match order: exact name, case-insensitive
        name, then case-insensitive prefix. Returns the input device index,
        or None when nothing matches or on any error.
        """
        try:
            if self.output_index is None or self.output_index < 0:
                return None
            devs = sd.query_devices()
            out_name = devs[self.output_index]["name"]
            low = out_name.lower()
            inputs = [
                (i, d["name"])
                for i, d in enumerate(devs)
                if d.get("max_input_channels", 0) > 0
            ]
            matchers = (
                lambda name: name == out_name,             # 1) exact name
                lambda name: name.lower() == low,          # 2) case-insensitive
                lambda name: name.lower().startswith(low), # 3) prefix match
            )
            for matches in matchers:
                for i, name in inputs:
                    if matches(name):
                        return i
            return None
        except Exception:
            return None

    def _start_stt(self):
        """Start the STT client process.

        Prefers the post-gain loopback as the STT source when an input device
        matching the selected output device exists; otherwise falls back to
        the raw input device.
        """
        loop_idx = self._find_matching_input_for_output()
        if loop_idx is not None:
            capture_index = loop_idx
            what = f"POST-GAIN from loopback input #{loop_idx}"
        else:
            capture_index = self.input_index
            what = f"RAW from input #{self.input_index}"
        try:
            name = sd.query_devices()[capture_index]['name']
            print(f"[ASR] capturing {what} ({name})", file=sys.stderr)
        except Exception:
            print(f"[ASR] capturing {what}", file=sys.stderr)

        self.client_proc = Process(
            target=_stt_worker,
            args=(capture_index, self.queue, self.translate_enabled, self.src_lang_id),
            daemon=True,
        )
        self.client_proc.start()

    def _restart_stt(self):
        """Stop and restart the STT process; failures are logged, not raised."""
        try:
            self._overlay("show", "Restarting…", None)
            self.waiting_first_result = True
            self._stop_stt()
            time.sleep(0.2)
            self._start_stt()
        except Exception as e:
            print(f"[ASR] restart failed: {e}", file=sys.stderr)

    def _persist(self):
        """Write the current settings to CONFIG_PATH (errors are logged)."""
        data = {
            "input_index": self.input_index,
            "output_index": self.output_index,
            "translate": self.translate_enabled,
            "tgt_lang": self.tgt_lang_id,
            "src_lang": self.src_lang_id,
            "gain_db": self.gain_db,
        }
        try:
            with open(CONFIG_PATH, "w") as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            print(f"[Config Error] {e}", file=sys.stderr)

    def _load_config(self):
        """Return the saved settings dict, or {} if missing/unreadable."""
        if os.path.isfile(CONFIG_PATH):
            try:
                with open(CONFIG_PATH, "r") as f:
                    data = json.load(f)
                if isinstance(data, dict):
                    return data
            except Exception:
                pass
        return {}

    # ---- layout helper: toggle visibility AND resize the native window ----
    def _apply_layout(self):
        """Show/hide the transcript panes and resize the native window."""
        show_orig = self.transcribing
        show_trans = self.transcribing and self.translate_enabled
        try:
            if DeviceAPI.gui_window:
                # Toggle visibility in the DOM
                js = f"updateLayout({str(show_orig).lower()}, {str(show_trans).lower()});"
                DeviceAPI.gui_window.evaluate_js(js)
                # Resize native window (no manual resize; no page scrollbar)
                # Tuned heights for this layout:
                # - compact (controls only) : ~333
                # - one pane (original) : ~600
                # - two panes (orig+trans) : ~860
                height = 865 if show_trans else (595 if show_orig else 333)
                DeviceAPI.gui_window.resize(730, height)
        except Exception as e:
            print(f"[Layout] update failed: {e}", file=sys.stderr)


# -----------------------------------------------------------------------------
# HTML UI (English, compact device rows, top-aligned buttons, fixed button width)
# -----------------------------------------------------------------------------
# NOTE(review): only the visible text of the page survives in this copy — the
# markup, styles and the JS functions called above (overlayUpdate,
# appendTranscripts, updateLayout) appear to have been stripped. Restore the
# original document before shipping.
HTML = """ MurMur - Audio Bridge / Transcribe / Translate
Devices
Input device
Output device
Controls
Gain 0 dB
Loading…
"""


# -----------------------------------------------------------------------------
# App bootstrap
# -----------------------------------------------------------------------------
def start_gui():
    """Create the backend API and window, then run the webview event loop."""
    api = DeviceAPI()
    w = webview.create_window(
        "MurMur - Audio Bridge / Transcribe / Translate",
        html=HTML,
        js_api=api,
        width=730,
        height=333,  # will be resized programmatically; no manual resize
        resizable=False,
    )
    DeviceAPI.gui_window = w
    try:
        w.events.closed += api.cleanup
    except Exception:
        pass
    atexit.register(api.cleanup)
    webview.start()


if __name__ == "__main__":
    start_gui()