commit 353e0a980df77d7a793e1ae33367fbe1c60f60ee Author: Victor Giers Date: Thu Sep 11 03:11:55 2025 +0200 initial commit diff --git a/README.md b/README.md new file mode 100644 index 0000000..ee2a59d --- /dev/null +++ b/README.md @@ -0,0 +1,146 @@ +# GLaDOS GUI — Local LLM Style Transfer + Realtime TTS + +GLaDOS GUI lets you type text, have a local LLM rewrite it in GLaDOS’ voice, and hear it spoken immediately using a high‑quality Piper TTS model. It streams the LLM output into the UI while synthesizing audio in realtime, so you hear sentences as they form. + +- Left: editable input box +- Center: Image of GLaDOS; her eye glow follows audio loudness +- Right: non‑editable output (streamed from the LLM) with Copy, Play/Stop, and Save Audio buttons + +Works fully offline with a local Ollama model and a local Piper voice. + +## Highlights + +- Realtime: streams LLM tokens, splits into sentences, and synthesizes with Piper as they arrive +- Low‑latency audio: raw PCM straight to PortAudio via `sounddevice` +- Eye glow: smoothed visual loudness indicator from the audio envelope +- Persistent settings: remembers Ollama URL and selected model, and Piper ONNX path +- Simple launcher: `run.sh` creates a venv, installs deps, and launches the app + +## Quick Start + +1) Install system prerequisites +- Install Python 3.10+ and ensure `python3` is on your PATH +- Install Ollama (and start it): https://ollama.com/ +- Install the Piper TTS voice model (ONNX) from Dave’s Armoury (see TTS section below) + +2) Clone and run +- macOS/Linux + - `chmod +x run.sh` + - `./run.sh` +- Or manually + - `python3 -m venv .venv && source .venv/bin/activate` + - `pip install --upgrade pip` + - `pip install -r requirements.txt` + - `python glados_gui.py` + +3) In the GUI +- Model: pick your local Ollama model (recommendation below) +- Ollama URL: default is `http://localhost:11434/api/generate` +- Voice model: pick the `glados_piper_medium.onnx` file you downloaded +- Type your text (left) → 
GLaDOSify → output streams (right) + audio plays + +## Recommended LLM (Ollama) + +- Best results: `mistral3.2:24b` + - 32 GB unified memory works well + - 16 GB can work, but generation may be slower +- If that tag isn’t available on your system, fall back to another LLM. The app will list locally‑available models via Ollama. + +Pull the model in Ollama (example): + - `ollama pull mistral3.2:24b` + +Ensure the Ollama service is running: +- `ollama serve` + +## TTS Voice (Piper / ONNX) + +- Voice: GLaDOS Piper medium ONNX from Dave’s Armoury + - Download from: https://huggingface.co/DavesArmoury/GLaDOS_TTS + - Files needed in the same folder (or anywhere you choose via picker): + - `glados_piper_medium.onnx` + - `glados_piper_medium.onnx.json` +- Credits: massive thanks to Dave’s Armoury for the GLaDOS TTS model! + +### Piper CLI + +- The app expects the `piper` binary on your PATH +- Installing `piper-tts` (Python package) provides the CLI on most setups +- Alternatively, install Piper via your OS package manager if available + +## Running from the CLI (no GUI) + +You can still use the original streaming script directly: +- `python glados_say_stream.py --stream -t "input text"` + +This streams rewritten text from Ollama and speaks it sentence‑by‑sentence with Piper. 
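For reference, Ollama's `/api/generate` streams newline-delimited JSON: each line is a JSON object carrying a `response` text fragment and a `done` flag. The sketch below shows how such a stream is reassembled into plain text; it uses canned chunks instead of a live server, and the helper name `collect_stream_text` is illustrative, not part of this project.

```python
import json

def collect_stream_text(lines):
    """Join the "response" fragments of NDJSON chunks, stopping at done=true."""
    parts = []
    for line in lines:
        if not line.strip():
            continue  # skip keep-alive blank lines
        obj = json.loads(line)
        parts.append(obj.get("response", ""))
        if obj.get("done"):
            break
    return "".join(parts)

# Canned chunks in the shape /api/generate streams them (illustrative only)
chunks = [
    '{"response": "Hello", "done": false}',
    '{"response": ", test subject.", "done": false}',
    '{"done": true}',
]
print(collect_stream_text(chunks))  # → Hello, test subject.
```

With a live server you would iterate `requests.post(url, json=payload, stream=True).iter_lines()` instead of a list of canned strings.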
+ +## Files + +- `glados_gui.py`: Main GUI +- `glados_say_stream.py`: Streaming LLM → TTS engine (also usable as a CLI) +- `speaker.svg`: Speaker icon used in the GUI (can be replaced) +- `glados_head.png`, `glados_eye.png`: Center visuals (eye opacity tracks loudness) +- `run.sh`: One‑shot launcher; creates venv, installs deps, runs GUI +- `requirements.txt`: Python dependencies for the GUI + streaming + +## Configuration + +The GUI writes a small `config.json` next to the app with: +- `ollama_url`: Ollama HTTP endpoint (default `http://localhost:11434/api/generate`) +- `ollama_model`: last selected model name +- `piper_model`: last selected Piper ONNX path + +You can change these at any time in the GUI; the app remembers them. + +## How it works (tech overview) + +- Prompt: The app uses a carefully crafted “GLaDOS style transfer” prompt. Your input is rewritten; no extra narration or meta text is added. +- Streaming: The GUI posts to Ollama’s `/api/generate` endpoint with `stream=true`. Chunks are filtered to drop any hidden `<think>` blocks (in case you are using a reasoning model) and appended to the right textbox. +- Sentence splitting: As streamed text arrives, it’s split at safe sentence boundaries; each complete sentence is sent to Piper immediately. +- Piper: A single Piper process runs during the session. The GUI writes sentences to Piper’s stdin and plays the raw PCM via `sounddevice` in realtime. Audio is also mirrored to a WAV so you can Save Audio at the end. +- Eye flicker: The audio reader computes an RMS envelope and calls back into the GUI. Opacity is smoothed and slightly delayed to match perceived audio timing. + +## Fonts / Visuals + +- UI typeface: Lucida Console (please install it locally for best results). +- Colors: black background (`#000000`), amber text (`#e1a101`). +- Borders: dashed “retro” borders around textboxes and control buttons. 
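The sentence splitting described above can be sketched in a few lines. This is a simplified stand-in for the repo's `pop_complete_sentences`, not its actual implementation: it pops sentences that end in `.`, `!`, or `?` followed by whitespace, and returns the unfinished tail so it can keep buffering (abbreviations like "Mr." would split early; the real splitter is presumably more careful).

```python
import re

# A sentence ends at . ! or ? (optionally followed by a closing quote/bracket)
# and must be followed by whitespace, so a trailing fragment stays buffered.
_SENT_END = re.compile(r'(.+?[.!?]["\')\]]?)\s+', re.S)

def split_sentences(buf):
    """Return (complete_sentences, remainder) for a streaming text buffer."""
    sents = []
    while True:
        m = _SENT_END.match(buf)
        if not m:
            return sents, buf
        sents.append(m.group(1).strip())
        buf = buf[m.end():]

print(split_sentences("Okay. It works! Still typ"))
# → (['Okay.', 'It works!'], 'Still typ')
```

Each popped sentence can be handed to the TTS process immediately while the remainder waits for more tokens.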
+ +## Dependencies + +Core Python packages (see `requirements.txt`): +- `requests`: Ollama HTTP client +- `sounddevice`: PortAudio bridge for low‑latency playback +- `numpy`: audio level calculations (RMS) +- `Pillow`: image scaling and eye opacity levels; icon rendering +- `cairosvg` (optional): renders `speaker.svg` to a crisp PNG at UI size +- `piper-tts`: provides the `piper` CLI (or install Piper separately) + +Non‑Python: +- Ollama (local LLM runtime): https://ollama.com/ +- Piper (CLI TTS engine): https://github.com/rhasspy/piper + +## Usage Tips + +- If audio starts clipping, reduce the Piper length/noise parameters (the GUI uses sensible defaults). +- If the eye glow feels out of sync, small variations in audio driver buffering can be compensated by adjusting the internal smoothing/delay values (ask for help in issues). + +## Credits + +- GLaDOS TTS voice: Dave’s Armoury — https://huggingface.co/DavesArmoury/GLaDOS_TTS +- Piper TTS engine: Rhasspy / Piper — https://github.com/rhasspy/piper +- LLM runtime: Ollama — https://ollama.com/ +- Python libraries: `requests`, `sounddevice`, `numpy`, `Pillow`, `cairosvg`, `piper-tts`, and the standard Tkinter GUI toolkit. + +## Trademarks & Attribution + +- Portal and GLaDOS are properties/trademarks of Valve Corporation. Portal is a registered trademark of Valve. See: https://www.valvesoftware.com/ +- The GLaDOS image shown in the center panel is a screenshot of an in‑game model from the Portal series by Valve and is used here for demonstration/fan purposes. No ownership is claimed. +- The AI voice used by this app is not created by the author of this GUI. The Piper ONNX model is created by Dave’s Armoury using audio from the Portal games by Valve. Download: https://huggingface.co/DavesArmoury/GLaDOS_TTS +- This project is a fan‑made tool intended for personal/educational use and is not affiliated with, endorsed by, or sponsored by Valve Corporation. 
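The eye-glow smoothing mentioned under Usage Tips boils down to an RMS loudness envelope fed through an exponential moving average. A minimal, self-contained sketch follows; the function names and the `alpha` value are illustrative, not the app's actual API:

```python
import numpy as np

def rms_envelope(pcm_block):
    """RMS loudness of one int16 PCM block, normalized to 0..1."""
    x = pcm_block.astype(np.float64) / 32768.0
    return float(np.sqrt(np.mean(x * x)))

def smooth(prev, target, alpha=0.2):
    """One EMA step; a higher alpha tracks the audio faster but flickers more."""
    return (1 - alpha) * prev + alpha * target

silence = np.zeros(1024, dtype=np.int16)
half_scale = np.full(1024, 16384, dtype=np.int16)
print(rms_envelope(silence))     # → 0.0
print(rms_envelope(half_scale))  # → 0.5
```

Raising `alpha` (or shortening any added delay) makes the glow snappier; lowering it hides driver-buffering jitter at the cost of lag.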
+ +## Troubleshooting + +- “piper not found”: ensure the `piper` CLI is on PATH. Installing `piper-tts` via pip often provides it; otherwise install Piper for your OS. +- “No audio device”: some systems need `sounddevice` to be configured with a valid output device; use the CLI `--list-devices` to find indices. +- “Ollama connection error”: make sure `ollama serve` is running and that the URL in the GUI is correct. Pull the model you want with `ollama pull ...` first. +- Icon looks blurry: install `cairosvg` so SVG is rendered to the exact UI size (or drop a tuned `speaker.png`). diff --git a/glados_eye.png b/glados_eye.png new file mode 100644 index 0000000..1fcf937 Binary files /dev/null and b/glados_eye.png differ diff --git a/glados_gui.py b/glados_gui.py new file mode 100644 index 0000000..c183f4f --- /dev/null +++ b/glados_gui.py @@ -0,0 +1,1054 @@ +#!/usr/bin/env python3 +import json +import pathlib +import tempfile +import queue +import threading +import subprocess +import sys +import time +import os +from dataclasses import dataclass +from typing import List, Optional + +import requests +import tkinter as tk +from tkinter import ttk, filedialog, messagebox +import tkinter.font as tkfont +from collections import deque + +# Optional Pillow for alpha control on eye overlay +try: + from PIL import Image, ImageTk, ImageEnhance + PIL_OK = True +except Exception: + PIL_OK = False + +# Reuse streaming utilities +from glados_say_stream import ( + DEFAULT_OLLAMA_MODEL, + DEFAULT_OLLAMA_URL, + ThinkStripper, + pop_complete_sentences, + PiperStreamer, + load_sample_rate, +) + +APP_DIR = pathlib.Path(__file__).resolve().parent +CONFIG_PATH = APP_DIR / "config.json" +EYE_IMAGE_PATH = APP_DIR / "glados_eye.png" +HEAD_IMAGE_PATH = APP_DIR / "glados_head.png" +PIPER_MODEL_DEFAULT = APP_DIR / "glados_piper_medium.onnx" + +BG = "#000000" +BOX_BG = "#000000" +FG = "#e1a101" + + +@dataclass +class Settings: + ollama_url: str = DEFAULT_OLLAMA_URL + ollama_model: str = "" + 
piper_model: str = str(PIPER_MODEL_DEFAULT) + + @classmethod + def load(cls) -> "Settings": + if CONFIG_PATH.exists(): + try: + data = json.loads(CONFIG_PATH.read_text()) + return cls( + ollama_url=data.get("ollama_url", DEFAULT_OLLAMA_URL), + ollama_model=data.get("ollama_model", ""), + piper_model=data.get("piper_model", str(PIPER_MODEL_DEFAULT)), + ) + except Exception: + pass + return cls() + + def save(self) -> None: + data = { + "ollama_url": self.ollama_url, + "ollama_model": self.ollama_model, + "piper_model": self.piper_model, + } + CONFIG_PATH.write_text(json.dumps(data, indent=2)) + + +def list_ollama_models() -> List[str]: + # Prefer CLI to avoid needing special HTTP paths + try: + out = subprocess.check_output(["ollama", "list", "--quiet"], stderr=subprocess.DEVNULL, text=True) + models = [line.strip() for line in out.splitlines() if line.strip()] + return models + except Exception: + # Best-effort fallback via tags API (local Ollama) + try: + r = requests.get(DEFAULT_OLLAMA_URL.rsplit("/", 1)[0] + "/tags", timeout=2) + r.raise_for_status() + tags = r.json().get("models", []) + names = [] + for m in tags: + name = m.get("name") or m.get("model") + if name: + names.append(name) + return names + except Exception: + return [] + + +class GladosGUI(tk.Tk): + def __init__(self): + super().__init__() + self.title("GLaDOSify") + self.configure(bg=BG) + # Global font/color defaults (use named font to handle space in family name) + self._ui_font = tkfont.Font(family="Lucida Console", size=12) + self.option_add("*Font", self._ui_font) + self.option_add("*Background", BG) + self.option_add("*foreground", FG) + self.option_add("*Foreground", FG) + self.option_add("*selectBackground", FG) + self.option_add("*selectForeground", BG) + self.option_add("*insertBackground", FG) + self.geometry("780x460") + + self.settings = Settings.load() + self.models = [] # populated async + + # State + self._stream_thread: Optional[threading.Thread] = None + self._replay_thread: 
Optional[threading.Thread] = None + self._stop_flag = threading.Event() + self._ui_q: queue.Queue[str] = queue.Queue() + self._audio_level = 0.0 + self._lvl_ema = 0.0 # smoothed visual level + self._last_wav_path: Optional[pathlib.Path] = None + self._last_output_full = "" + self._streaming = False + self._stream_tts_wanted = True + self._stream_ps: Optional[PiperStreamer] = None + self._current_response = None + self._pending_sents: deque[str] = deque() + self._stream_buf = "" + + # Layout: 3 columns (left text | center image | right text) + self.columnconfigure(0, weight=1) + # Keep center column visible and allow it to expand + self.columnconfigure(1, weight=1, minsize=260) + self.columnconfigure(2, weight=1) + self.rowconfigure(0, weight=1) + self.rowconfigure(1, weight=0) + + # Left editable textbox + self.left_frame = tk.Frame(self, bg=BG) + self.left_frame.grid(row=0, column=0, sticky="nsew") + self.input_wrap = tk.Frame(self.left_frame, bg=BG) + self.input_wrap.pack(fill="both", expand=True) + self._add_dashed_border(self.input_wrap) + self.input_text = tk.Text( + self.input_wrap, + bg=BOX_BG, + fg=FG, + insertbackground=FG, + wrap="word", + bd=0, + highlightthickness=0, + padx=10, + pady=10, + ) + self.input_text.pack(fill="both", expand=True, padx=4, pady=4) + # Placeholder for input + self._init_placeholder() + # Left toolbar (below left textbox) + self.left_toolbar = tk.Frame(self.left_frame, bg=BG) + self.left_toolbar.pack(side="bottom", fill="x") + # Center GLaDOSify button by using expanding spacers + self._lt_sp_left = tk.Frame(self.left_toolbar, bg=BG) + self._lt_sp_left.pack(side="left", expand=True, fill="x") + self.gladosify_wrap = tk.Frame(self.left_toolbar, bg=BG) + self.gladosify_wrap.pack(side="left", padx=(8, 8), pady=(6, 6)) + self._lt_sp_right = tk.Frame(self.left_toolbar, bg=BG) + self._lt_sp_right.pack(side="left", expand=True, fill="x") + self._add_dashed_border(self.gladosify_wrap) + self.gladosify_btn = 
ttk.Button(self.gladosify_wrap, text="GLaDOSify", command=self.on_gladosify_click, + style="Dark.TButton", takefocus=False) + self.gladosify_btn.pack(padx=4, pady=4) + + # Center image canvas (head + eye overlay) + self.center_frame = tk.Frame(self, bg=BG) + self.center_frame.grid(row=0, column=1, sticky="nsew") + self.canvas = tk.Canvas(self.center_frame, width=260, height=260, bg=BG, highlightthickness=0, bd=0) + self.canvas.pack(padx=0, pady=0, fill="both", expand=True) + self._resize_job = None + self._last_render_size = (0, 0) + self.center_frame.bind("<Configure>", lambda e: self._schedule_resize()) + self.canvas.bind("<Configure>", lambda e: self._schedule_resize()) + self._load_images() + + # Right non-editable textbox with top-right small buttons + self.right_frame = tk.Frame(self, bg=BG) + self.right_frame.grid(row=0, column=2, sticky="nsew") + self.right_frame.rowconfigure(0, weight=1) + self.right_frame.columnconfigure(0, weight=1) + + self.output_container = tk.Frame(self.right_frame, bg=BG) + self.output_container.grid(row=0, column=0, sticky="nsew") + + self.output_wrap = tk.Frame(self.output_container, bg=BG) + self.output_wrap.pack(side="top", fill="both", expand=True) + self._add_dashed_border(self.output_wrap) + self.output_text = tk.Text( + self.output_wrap, + bg=BOX_BG, + fg=FG, + wrap="word", + bd=0, + highlightthickness=0, + padx=10, + pady=10, + ) + self.output_text.configure(state="disabled") + self.output_text.pack(fill="both", expand=True, padx=4, pady=4) + + # Bottom toolbar (below right textbox) + self.toolbar = tk.Frame(self.output_container, bg=BG) + self.toolbar.pack(side="bottom", fill="x") + + # Row centered under the textbox + self.toolbar_row = tk.Frame(self.toolbar, bg=BG) + self.toolbar_row.pack(side="top", pady=6) + + def make_bordered_button(parent, text, cmd, icon=False): + wrap = tk.Frame(parent, bg=BG) + wrap.pack(side="left", padx=6, pady=2) + self._add_dashed_border(wrap) + btn = ttk.Button( + wrap, + text=text, + command=cmd, + 
style=("DarkIcon.TButton" if icon else "Dark.TButton"), + takefocus=False, + width=0, + ) + # Keep a tiny inner margin so the dashed border is visible + pad_x = 8 + pad_y = 5 + btn.pack(padx=pad_x, pady=pad_y) + return wrap, btn + + # Buttons centered and spaced + self.copy_wrap, self.copy_btn = make_bordered_button(self.toolbar_row, "⧉", self.copy_output, icon=True) + # Build / load orange speaker icon image (user-provided files preferred) + self._speaker_icon = self._load_speaker_icon(16) + self.speaker_wrap, self.speaker_btn = make_bordered_button(self.toolbar_row, "", self.toggle_speaker, icon=True) + if self._speaker_icon is not None: + self.speaker_btn.configure(image=self._speaker_icon, text="") + self.save_wrap, self.save_btn = make_bordered_button(self.toolbar_row, "Save Audio", self.save_as_wav) + self.save_btn.configure(state="disabled") + + # Bottom bar: left button, center controls (model + URL), right button + self.bottom = tk.Frame(self, bg=BG) + self.bottom.grid(row=1, column=0, columnspan=3, sticky="ew", pady=(8, 8)) + self.bottom.columnconfigure(0, weight=1) + self.bottom.columnconfigure(1, weight=0) + self.bottom.columnconfigure(2, weight=1) + + # Left area in bottom bar is now empty spacer + tk.Frame(self.bottom, bg=BG).grid(row=0, column=0, sticky="w", padx=(8, 8)) + + center_controls = tk.Frame(self.bottom, bg=BG) + center_controls.grid(row=0, column=1) + tk.Label(center_controls, text="Model:", bg=BG, fg=FG).pack(side="left", padx=(0, 4)) + self.model_var = tk.StringVar(value=self.settings.ollama_model or "Select LLM...") + self.combo_wrap = tk.Frame(center_controls, bg=BG) + self.combo_wrap.pack(side="left", padx=(0, 8)) + self._add_dashed_border(self.combo_wrap) + self.model_combo = ttk.Combobox(self.combo_wrap, textvariable=self.model_var, values=[], width=20, state="readonly", style="Dark.TCombobox") + self.model_combo.pack(side="left", padx=4, pady=4) + self.combo_arrow = tk.Label(self.combo_wrap, text="▼", bg=BG, fg=FG) + 
self.combo_arrow.place(relx=1.0, rely=0.5, anchor="e", x=-6) + self.combo_arrow.bind("<Button-1>", lambda e: self.model_combo.event_generate("<Down>")) + self.model_combo.bind("<<ComboboxSelected>>", self._on_model_change) + + tk.Label(center_controls, text="Ollama URL:", bg=BG, fg=FG).pack(side="left", padx=(0, 4)) + self.url_var = tk.StringVar(value=self.settings.ollama_url) + self.url_wrap = tk.Frame(center_controls, bg=BG) + self.url_wrap.pack(side="left", padx=(0, 8)) + self._add_dashed_border(self.url_wrap) + self.url_entry = tk.Entry(self.url_wrap, textvariable=self.url_var, bg=BOX_BG, fg=FG, insertbackground=FG, bd=0, width=28, highlightthickness=0) + self.url_entry.pack(side="left", padx=4, pady=4) + self.url_var.trace_add("write", lambda *_: self._save_settings()) + + # Piper model picker + self.voice_wrap = tk.Frame(center_controls, bg=BG) + self.voice_wrap.pack(side="left") + self._add_dashed_border(self.voice_wrap) + self.voice_btn = ttk.Button(self.voice_wrap, text="Voice model…", command=self.pick_piper_model, style="Dark.TButton") + self.voice_btn.pack(padx=4, pady=4) + + # Right area bottom bar spacer + tk.Frame(self.bottom, bg=BG).grid(row=0, column=2, sticky="e", padx=(8, 8)) + + # Style ttk for dark bg + style = ttk.Style(self) + try: + style.theme_use("clam") + except Exception: + pass + # Dark combobox style to remove inner white border + style.configure("Dark.TCombobox", fieldbackground=BOX_BG, background=BOX_BG, foreground=FG, + bordercolor=BG, lightcolor=BG, darkcolor=BG, arrowsize=14) + style.map("Dark.TCombobox", + fieldbackground=[("readonly", BOX_BG)], + foreground=[("readonly", FG)], + background=[("readonly", BOX_BG)]) + # Dark button styles (remove Aqua gradients/borders) + # Minimal layout without border/focus elements + style.layout("Dark.TButton", [ + ("Button.padding", {"sticky": "nswe", "children": [ + ("Button.label", {"sticky": "nswe"}) + ]}) + ]) + style.configure("Dark.TButton", background=BG, foreground=FG, borderwidth=0, focusthickness=0, relief="flat", 
padding=(8,5)) + style.map("Dark.TButton", + background=[("active", BG), ("pressed", BG)], + foreground=[("disabled", "#5a4a10"), ("!disabled", FG)]) + # Compact icon button style (less horizontal padding) + style.layout("DarkIcon.TButton", [ + ("Button.padding", {"sticky": "nswe", "children": [ + ("Button.label", {"sticky": "nswe"}) + ]}) + ]) + style.configure("DarkIcon.TButton", background=BG, foreground=FG, borderwidth=0, focusthickness=0, relief="flat", padding=(8,5)) + style.map("DarkIcon.TButton", + background=[("active", BG), ("pressed", BG)], + foreground=[("disabled", "#5a4a10"), ("!disabled", FG)]) + + # Kick off model listing and UI polling + threading.Thread(target=self._load_models_async, daemon=True).start() + self.after(50, self._drain_ui_queue) + # Ensure images render once canvas has a real size + self.after(0, self._ensure_canvas_ready) + # Eye animation tick (50ms) + self.after(50, self._tick_eye) + + # --- Images / eye overlay --- + def _load_images(self): + try: + if PIL_OK: + self._head_img_orig = Image.open(HEAD_IMAGE_PATH).convert("RGBA") + self._eye_img_orig = Image.open(EYE_IMAGE_PATH).convert("RGBA") + self._eye_levels = None # generated per-size in _resize_images + else: + # Fallback: load originals and let _resize_images() downscale via subsample() + self._head_photo_orig = tk.PhotoImage(file=str(HEAD_IMAGE_PATH)) + self._eye_photo_orig = tk.PhotoImage(file=str(EYE_IMAGE_PATH)) + self._eye_levels = None + except Exception as e: + messagebox.showerror("Assets missing", f"Failed to load images: {e}") + self._head_base = None + self._eye_photo = None + self._eye_levels = None + + # Defer initial sizing until canvas has a size + self.after(0, self._ensure_canvas_ready) + + def _ensure_canvas_ready(self): + # Wait until canvas is realized with a usable size, then size images + cw = self.canvas.winfo_width() + ch = self.canvas.winfo_height() + if cw < 20 or ch < 20: + self.after(50, self._ensure_canvas_ready) + return + self._resize_images() 
+ + def _schedule_resize(self): + # Debounce heavy resize work while dragging the window + if self._resize_job is not None: + try: + self.after_cancel(self._resize_job) + except Exception: + pass + self._resize_job = self.after(150, self._apply_resize) + + def _apply_resize(self): + self._resize_job = None + self._resize_images() + + def _resize_images(self): + # Fit images to current canvas while preserving aspect + cw = max(120, self.canvas.winfo_width() - 8) + ch = max(120, self.canvas.winfo_height() - 8) + if cw <= 0 or ch <= 0: + return + # Skip expensive work if size hasn't changed meaningfully + if self._last_render_size == (cw, ch): + return + self._last_render_size = (cw, ch) + + if PIL_OK and hasattr(self, "_head_img_orig"): + # High-quality resize via Pillow + w0, h0 = self._head_img_orig.size + scale = min(cw / w0, ch / h0) + scale = max(0.2, min(scale, 1.5)) + w, h = int(w0 * scale), int(h0 * scale) + head_resized = self._head_img_orig.resize((w, h), Image.LANCZOS) + self._head_base = ImageTk.PhotoImage(head_resized) + # Store resized eye base and lazily generate alpha frames on demand + self._eye_base_resized = self._eye_img_orig.resize((w, h), Image.LANCZOS) + self._eye_levels = [None] * 41 # 0..40 frames + # Choose current eye frame based on current smoothed level + lvl = getattr(self, "_lvl_ema", 0.0) + vis = self._map_level_to_vis(lvl) + idx = max(0, min(40, int(round(vis * 40)))) + self._eye_photo = self._get_eye_frame(idx) + self._render_images() + return + + # Fallback path (no Pillow): downscale using integer subsample + if hasattr(self, "_head_photo_orig"): + w0 = self._head_photo_orig.width() + h0 = self._head_photo_orig.height() + # Determine integer reduction factor so image fits within cw x ch + import math + fx = math.ceil(w0 / cw) + fy = math.ceil(h0 / ch) + f = max(1, fx, fy) + try: + self._head_base = self._head_photo_orig.subsample(f, f) + self._eye_photo = self._eye_photo_orig.subsample(f, f) + except Exception: + # If subsample not 
available, just use originals + self._head_base = self._head_photo_orig + self._eye_photo = self._eye_photo_orig + self._render_images() + + def _render_images(self): + self.canvas.delete("all") + self.canvas.update_idletasks() + cw = max(1, self.canvas.winfo_width()) + ch = max(1, self.canvas.winfo_height()) + cx, cy = cw // 2, ch // 2 + if hasattr(self, "_head_base") and self._head_base is not None: + self.canvas.create_image(cx, cy, image=self._head_base) + if hasattr(self, "_eye_photo") and self._eye_photo is not None: + self._eye_item = self.canvas.create_image(cx, cy, image=self._eye_photo) + + def _tick_eye(self): + # Map audio level to eye opacity + # Smooth level (EMA) and slightly delay visually to feel synced with audio + target = self._get_delayed_level() + # EMA smoothing + alpha = 0.2 + self._lvl_ema = (1 - alpha) * self._lvl_ema + alpha * target + lvl = self._lvl_ema + # Map level to visual brightness with noise floor, gain and gamma compression + vis = self._map_level_to_vis(lvl) + # If the eye item isn't created yet (early ticks), try again shortly + if not hasattr(self, "_eye_item"): + self.after(50, self._tick_eye) + return + if self._eye_levels is not None: + idx = max(0, min(40, int(round(vis * 40)))) + new_img = self._get_eye_frame(idx) + self.canvas.itemconfigure(self._eye_item, image=new_img) + # keep reference to prevent GC + self._eye_photo = new_img + else: + # Fallback (no Pillow): toggle visibility based on level + try: + state = "hidden" if vis < 0.1 else "normal" + self.canvas.itemconfigure(self._eye_item, state=state) + except Exception: + pass + # Tick every 50ms for smoother updates + self.after(50, self._tick_eye) + + def _add_dashed_border(self, widget): + # Draw a dashed rectangle around a container widget using a child canvas + c = tk.Canvas(widget, bg=BG, highlightthickness=0, bd=0) + c.place(relx=0, rely=0, relwidth=1, relheight=1) + # Keep behind, but ensure border remains visible by maintaining inner margins on children + 
def redraw(event=None): + c.delete("all") + w = widget.winfo_width() + h = widget.winfo_height() + if w < 4 or h < 4: + return + m = 1 + try: + c.create_rectangle(m, m, w - m, h - m, outline=FG, width=1, dash=(4, 3)) + except Exception: + c.create_rectangle(m, m, w - m, h - m, outline=FG, width=1) + widget.bind("<Configure>", redraw) + redraw() + + def _build_speaker_icon(self, size: int = 20): + """Draw a crisp, amber speaker icon with sound waves using supersampling.""" + if not PIL_OK: + return None + try: + from PIL import ImageDraw + except Exception: + return None + + # Supersample for smoother curves, then downscale + s = max(16, int(size)) + k = 3 + S = s * k + img = Image.new("RGBA", (S, S), (0, 0, 0, 0)) + d = ImageDraw.Draw(img) + # Convert hex FG to RGBA tuple if necessary + col_hex = FG + if isinstance(col_hex, str) and col_hex.startswith("#") and len(col_hex) == 7: + r = int(col_hex[1:3], 16) + g = int(col_hex[3:5], 16) + b = int(col_hex[5:7], 16) + col = (r, g, b, 255) + else: + col = (225, 161, 1, 255) + + # Geometry (scaled by k) + ymid = S // 2 + stroke = max(2, S // 30) # ~2px at 60px + body_w = max(9, S // 5) + body_h = max(27, int(S * 0.6)) + horn_w = max(9, S // 6) + x0 = 2 * k + y0 = ymid - body_h // 2 + x1 = x0 + body_w + y1 = ymid + body_h // 2 + + # Body (filled) + d.rectangle([x0, y0, x1, y1], fill=col) + + # Horn (filled triangle) + horn = [(x1, y0), (x1 + horn_w, ymid), (x1, y1)] + d.polygon(horn, fill=col) + + # Sound waves (3 arcs), tight angles so they look like emanating waves + cx = x1 + horn_w + 3 * k + cy = ymid + radii = [int(S * 0.28), int(S * 0.42), int(S * 0.56)] + for r in radii: + bbox = [cx - r, cy - r, cx + r, cy + r] + # Right-side arc (~ -35..35 degrees => 325..35) + d.arc(bbox, start=325, end=35, fill=col, width=stroke) + + # Downscale to target size for a crisp icon + img_small = img.resize((s, s), Image.LANCZOS) + return ImageTk.PhotoImage(img_small) + + # --- Icon loading helpers --- + def _load_icon_from_file(self, path: 
pathlib.Path, size: int): + if not path.exists(): + return None + if PIL_OK: + try: + im = Image.open(str(path)).convert("RGBA") + im = im.resize((size, size), Image.LANCZOS) + return ImageTk.PhotoImage(im) + except Exception: + return None + else: + try: + return tk.PhotoImage(file=str(path)) + except Exception: + return None + + def _load_speaker_icon(self, size: int = 20): + # Prefer local assets if provided by the user + for name in ("speaker.png", "speaker_icon.png"): + icon = self._load_icon_from_file(APP_DIR / name, size) + if icon is not None: + return icon + # Optional: render from SVG if cairosvg is available + svg_path = APP_DIR / "speaker.svg" + if svg_path.exists() and PIL_OK: + try: + import cairosvg # type: ignore + raw = svg_path.read_text(encoding="utf-8") + # Recolor to amber if the SVG uses dark fill + raw = raw.replace("#231F20", FG) + from io import BytesIO + png_bytes = cairosvg.svg2png(bytestring=raw.encode("utf-8"), output_width=size, output_height=size) + im = Image.open(BytesIO(png_bytes)).convert("RGBA") + return ImageTk.PhotoImage(im) + except Exception: + pass + # Fallback: draw programmatically + return self._build_speaker_icon(size) + + def _update_toolbar_layout(self, event=None): + # Shrink Save button label when space is tight; restore when space returns + try: + avail = self.toolbar.winfo_width() + except Exception: + return + # Estimate total width; if too tight, shorten label + if avail and avail < 280: + if self.save_btn["text"] != "Save": + self.save_btn.configure(text="Save") + else: + if self.save_btn["text"] != "Save Audio": + self.save_btn.configure(text="Save Audio") + + def _map_level_to_vis(self, lvl: float) -> float: + noise_floor = 0.03 + gain = 4.0 + gamma = 2 + x = max(0.0, (lvl - noise_floor)) * gain + return max(0.0, min(1.0, x)) ** gamma + + def _get_eye_frame(self, idx: int): + # Lazily build and cache the eye alpha frame for current size + if not hasattr(self, "_eye_levels") or self._eye_levels is None: + 
return self._eye_photo + if 0 <= idx < len(self._eye_levels) and self._eye_levels[idx] is not None: + return self._eye_levels[idx] + if not hasattr(self, "_eye_base_resized"): + return self._eye_photo + # Create frame at requested alpha level + a = idx / 40.0 + r, g, b, a_chan = self._eye_base_resized.split() + a_scaled = a_chan.point(lambda px, aa=a: int(px * aa)) + img = Image.merge("RGBA", (r, g, b, a_scaled)) + photo = ImageTk.PhotoImage(img) + if 0 <= idx < len(self._eye_levels): + self._eye_levels[idx] = photo + return photo + + # --- Bottom controls --- + def _on_model_change(self, *_): + val = self.model_var.get() + if val and val != "Select LLM...": + self.settings.ollama_model = val + self._save_settings() + + def _save_settings(self): + self.settings.ollama_url = self.url_var.get().strip() or DEFAULT_OLLAMA_URL + # Model saved on change + self.settings.save() + + def _load_models_async(self): + models = list_ollama_models() + models_sorted = sorted(models) + self.models = models_sorted + # Pick default + preferred = "mistral3.2:24b" + chosen = self.settings.ollama_model + if not chosen: + if preferred in models_sorted: + chosen = preferred + elif DEFAULT_OLLAMA_MODEL in models_sorted: + chosen = DEFAULT_OLLAMA_MODEL + else: + chosen = "Select LLM..." 
+ self._ui_q.put(("models", models_sorted, chosen)) + + # --- Streaming --- + def on_gladosify_click(self): + # Toggle start/cancel + if self._stream_thread and self._stream_thread.is_alive(): + self.cancel_stream() + else: + self.start_stream() + + def start_stream(self): + if self._stream_thread and self._stream_thread.is_alive(): + return + raw = self.input_text.get("1.0", "end-1c").strip() + if not raw: + messagebox.showinfo("GLaDOSify", "Please enter some text on the left.") + return + model = self.model_var.get().strip() + if not model or model == "Select LLM...": + messagebox.showinfo("GLaDOSify", "Please select an Ollama model.") + return + url = self.url_var.get().strip() or DEFAULT_OLLAMA_URL + self._stop_flag.clear() + self._streaming = True + self._stream_tts_wanted = True + self.gladosify_btn.configure(text="Cancel") + self._disable_controls(True) + self._clear_output() + self._pending_sents.clear() + self._stream_buf = "" + self._last_wav_path = None + self.save_btn.configure(state="disabled") + self._stream_thread = threading.Thread(target=self._run_stream, args=(raw, model, url), daemon=True) + self._stream_thread.start() + + def cancel_stream(self): + self._stop_flag.set() + # Stop TTS if active + if self._stream_ps is not None: + try: + self._stream_ps.abort() + except Exception: + pass + self._stream_ps = None + # Abort HTTP stream if possible + try: + if self._current_response is not None: + self._current_response.close() + except Exception: + pass + + def _run_stream(self, raw: str, model: str, url: str): + # Prepare Piper + try: + model_path = pathlib.Path(self.settings.piper_model).resolve() + sr = load_sample_rate(model_path) + except Exception as e: + self._ui_q.put(("error", f"Piper model error: {e}")) + self._ui_q.put(("done",)) + return + + def on_level(level: float): + self._push_level(level) + + try: + # Prepare Piper first (may be toggled off later) + # Capture to temp WAV so user can save later + fd, tmp_path = 
tempfile.mkstemp(suffix=".wav") + os.close(fd) + tmp_wav = pathlib.Path(tmp_path) + ps = PiperStreamer( + model_path=model_path, + sample_rate=sr, + on_audio_level=on_level, + out_wav=tmp_wav, + ) + self._stream_ps = ps + # Stream from Ollama + payload = { + "model": model, + "prompt": self._build_prompt(raw), + "stream": True, + } + with requests.post(url, json=payload, timeout=240, stream=True) as r: + self._current_response = r + r.raise_for_status() + stripper = ThinkStripper() + buf = "" + self._stream_buf = "" + for line in r.iter_lines(decode_unicode=True): + if self._stop_flag.is_set(): + break + if not line: + continue + try: + obj = json.loads(line) + except Exception: + continue + if "response" in obj: + vis = stripper.feed(obj["response"]) + if vis: + self._ui_q.put(("append", vis)) + buf += vis + self._stream_buf = self._stream_buf + vis + sents, buf = pop_complete_sentences(buf) + if self._stream_ps is not None: + for s in sents: + self._stream_ps.say(s) + if obj.get("done"): + break + # flush tail + tail = stripper.flush() + if tail: + self._ui_q.put(("append", tail)) + buf += tail + self._stream_buf = self._stream_buf + tail + if buf.strip(): + sents, rest = pop_complete_sentences(buf) + if self._stream_ps is not None: + for s in sents: + self._stream_ps.say(s) + if rest.strip(): + self._stream_ps.say(rest.strip()) + # Done HTTP + except Exception as e: + self._ui_q.put(("error", str(e))) + finally: + # Tidy Piper and enable save + try: + if self._stream_ps is not None: + if self._stop_flag.is_set(): + # User canceled: abort immediately + self._stream_ps.abort() + else: + # Normal completion: gracefully drain remaining audio + self._stream_ps.close() + self._stream_ps = None + except Exception: + pass + self._current_response = None + if 'tmp_wav' in locals() and tmp_wav.exists() and tmp_wav.stat().st_size > 44: + self._last_wav_path = tmp_wav + self._ui_q.put(("done",)) + + def _build_prompt(self, user_text: str) -> str: + # Use the same prompt as 
the CLI by importing the shared constant. + # A single source of truth keeps the GUI and CLI prompts from drifting apart. + from glados_say_stream import GLADOS_PROMPT + return GLADOS_PROMPT.format(user_text=user_text) + + def _drain_ui_queue(self): + try: + while True: + item = self._ui_q.get_nowait() + if not item: + continue + tag = item[0] + if tag == "models": + models, chosen = item[1], item[2] + self.model_combo.configure(values=models) + self.model_var.set(chosen) + self.settings.ollama_model = chosen if chosen != "Select LLM..." else "" + self._save_settings() + elif tag == "append": + chunk = item[1] + self._append_output(chunk) + elif tag == "error": + messagebox.showerror("GLaDOSify", item[1]) + elif tag == "done": + self._disable_controls(False) + self._last_output_full = self.output_text.get("1.0", "end-1c") + self.gladosify_btn.configure(text="GLaDOSify") + self._streaming = False + if self._last_wav_path and self._last_wav_path.exists(): + self.save_btn.configure(state="normal") + except queue.Empty: + pass + self.after(50, self._drain_ui_queue) + + def _append_output(self, text: str): + self.output_text.configure(state="normal") + self.output_text.insert("end", text) + self.output_text.see("end") + self.output_text.configure(state="disabled") + + def _clear_output(self): + self.output_text.configure(state="normal") + self.output_text.delete("1.0", "end") + self.output_text.configure(state="disabled") + + def _disable_controls(self, busy: bool): + state = "disabled" if busy else "normal" + # Keep gladosify clickable to allow Cancel + self.gladosify_btn.configure(state="normal") + self.model_combo.configure(state="disabled" if busy else "readonly") + self.url_entry.configure(state=state) + self.copy_btn.configure(state="normal") + self.speaker_btn.configure(state="normal") + + # --- Actions --- + def copy_output(self): + text = self.output_text.get("1.0", "end-1c") + if not text: + return + self.clipboard_clear() +
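The `_drain_ui_queue` pattern used above (worker threads post tagged tuples to a queue; the Tk main loop polls it with `after(50, ...)` so widgets are only touched from the main thread) can be sketched without any Tk at all. Everything below is hypothetical scaffolding for illustration, not code from the repo:

```python
# Minimal sketch of the worker -> queue -> poller handoff; in the GUI,
# self.after(50, ...) plays the role of this drain loop.
import queue
import threading

ui_q: "queue.Queue[tuple]" = queue.Queue()

def worker() -> None:
    # A background task posts tagged tuples instead of touching the UI.
    ui_q.put(("append", "Hello, "))
    ui_q.put(("append", "test subject."))
    ui_q.put(("done",))

def drain(q: "queue.Queue[tuple]") -> str:
    # Main-thread side: consume messages until "done" arrives.
    chunks = []
    while True:
        try:
            item = q.get_nowait()
        except queue.Empty:
            continue  # safe here: the worker has already finished
        if item[0] == "append":
            chunks.append(item[1])
        elif item[0] == "done":
            return "".join(chunks)

t = threading.Thread(target=worker, daemon=True)
t.start()
t.join()  # ensure all messages are queued before draining
result = drain(ui_q)
print(result)  # Hello, test subject.
```

The design choice is the key point: tkinter is not thread-safe, so the worker never calls widget methods; it only enqueues data for the main loop to apply.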
self.clipboard_append(text) + + # --- Placeholder logic for left input --- + def _init_placeholder(self): + self._placeholder_text = "Type text here..." + self._placeholder_active = False + try: + self.input_text.tag_configure("placeholder", foreground="#6e6e6e") + except Exception: + pass + self._show_placeholder() + self.input_text.bind("<FocusIn>", self._on_input_focus_in) + self.input_text.bind("<FocusOut>", self._on_input_focus_out) + self.input_text.bind("<Key>", self._on_input_keypress) + + def _show_placeholder(self): + if self._placeholder_active: + return + if self.input_text.get("1.0", "end-1c").strip(): + return + try: + self.input_text.configure(state="normal") + except Exception: + pass + self.input_text.delete("1.0", "end") + self.input_text.insert("1.0", self._placeholder_text, ("placeholder",)) + self._placeholder_active = True + + def _hide_placeholder(self): + if self._placeholder_active: + self.input_text.delete("1.0", "end") + self._placeholder_active = False + + def _on_input_focus_in(self, _): + if self._placeholder_active: + self._hide_placeholder() + + def _on_input_focus_out(self, _): + if not self.input_text.get("1.0", "end-1c").strip(): + self._show_placeholder() + + def _on_input_keypress(self, _): + if self._placeholder_active: + self._hide_placeholder() + + def toggle_speaker(self): + # If streaming TTS active, toggle it + if self._stream_thread and self._stream_thread.is_alive(): + if self._stream_ps is not None and self._stream_tts_wanted: + # turn off + self._stream_tts_wanted = False + try: + self._stream_ps.set_muted(True) + except Exception: + pass + if getattr(self, "_speaker_icon", None) is not None: + self.speaker_btn.configure(image=self._speaker_icon, text="") + else: + self.speaker_btn.configure(text="🔊", image="") + else: + # turn on: create PS and flush pending sentences first + try: + if self._stream_ps is not None: + self._stream_ps.set_muted(False) + self.speaker_btn.configure(text="⏹", image="") + self._stream_tts_wanted = True + except
Exception as e: + messagebox.showerror("TTS", f"Failed to start audio: {e}") + return + + # Else: use replay toggle + if self._replay_thread and self._replay_thread.is_alive(): + # stop replay + try: + self._replay_stop = True + if self._replay_ps is not None: + self._replay_ps.abort() + except Exception: + pass + if getattr(self, "_speaker_icon", None) is not None: + self.speaker_btn.configure(image=self._speaker_icon, text="") + else: + self.speaker_btn.configure(text="🔊", image="") + return + + # start replay + text = self.output_text.get("1.0", "end-1c").strip() + if not text: + return + self.speaker_btn.configure(text="⏹", image="") + self._replay_stop = False + self._replay_ps = None + self._replay_thread = threading.Thread(target=self._do_replay, args=(text,), daemon=True) + self._replay_thread.start() + + def _do_replay(self, text: str): + try: + model_path = pathlib.Path(self.settings.piper_model).resolve() + sr = load_sample_rate(model_path) + ps = PiperStreamer(model_path=model_path, sample_rate=sr, + on_audio_level=self._push_level) + self._replay_ps = ps + sents, rest = pop_complete_sentences(text) + for s in sents: + if getattr(self, "_replay_stop", False): + break + ps.say(s) + if not getattr(self, "_replay_stop", False) and rest.strip(): + ps.say(rest.strip()) + except Exception as e: + self._ui_q.put(("error", f"Replay failed: {e}")) + finally: + try: + if self._replay_ps is not None: + if getattr(self, "_replay_stop", False): + self._replay_ps.abort() + else: + self._replay_ps.close() + except Exception: + pass + self._replay_ps = None + self.speaker_btn.configure(text="🔊") + + def save_as_wav(self): + if not self._last_wav_path or not self._last_wav_path.exists(): + messagebox.showinfo("Save Audio", "No audio available yet.") + return + dest = filedialog.asksaveasfilename( + title="Save Audio", + defaultextension=".wav", + filetypes=[("WAV audio", ".wav")], + initialfile="glados_output.wav", + ) + if not dest: + return + try: + 
pathlib.Path(dest).write_bytes(self._last_wav_path.read_bytes()) + messagebox.showinfo("Save Audio", f"Saved to {dest}") + except Exception as e: + messagebox.showerror("Save Audio", f"Failed to save: {e}") + + def pick_piper_model(self): + path = filedialog.askopenfilename( + title="Select Piper ONNX model", + filetypes=[("Piper ONNX", ".onnx"), ("All files", "*.*")], + initialdir=str(APP_DIR), + ) + if not path: + return + self.settings.piper_model = path + self.settings.save() + + # --- Level helpers --- + def _push_level(self, v: float): + # store timestamped level for delayed display + if not hasattr(self, "_lvl_hist"): + self._lvl_hist = deque(maxlen=256) + self._lvl_hist.append((time.time(), float(v))) + self._audio_level = float(v) + + def _get_delayed_level(self, delay: float = 0.15) -> float: + """Return a smoothed, delayed audio level with natural fade-out. + + - Uses a small visual delay to better match perceived audio. + - If there are no recent samples, attenuates level toward 0 over time. 
+ """ + now = time.time() + if not hasattr(self, "_lvl_hist") or not self._lvl_hist: + return 0.0 + + # Most recent sample + last_t, last_v = self._lvl_hist[-1] + + # Preferred: sample at or before (now - delay) for slight sync lag + t_target = now - delay + level = None + for t, v in reversed(self._lvl_hist): + if t <= t_target: + level = v + break + if level is None: + # If we don't have a sample that old yet, use the latest + level = last_v + + # Fade out if no updates for a while (age beyond delay) + age = now - last_t + fade_start = delay # start fading after this delay + fade_window = 0.6 # fade to zero over this many seconds + if age > fade_start: + k = max(0.0, 1.0 - (age - fade_start) / fade_window) + level *= max(0.0, min(1.0, k)) + + return float(max(0.0, min(1.0, level))) + + +def main(): + app = GladosGUI() + app.minsize(760, 420) + app.mainloop() + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/glados_head.png b/glados_head.png new file mode 100644 index 0000000..f1aa60a Binary files /dev/null and b/glados_head.png differ diff --git a/glados_say_stream.py b/glados_say_stream.py new file mode 100644 index 0000000..493cd81 --- /dev/null +++ b/glados_say_stream.py @@ -0,0 +1,789 @@ +#!/usr/bin/env python3 +""" +GLaDOS-style LLM -> Piper realtime TTS (macOS / Apple Silicon friendly) +- Rewrites input with local Ollama (mistral:latest) using a concise, lore-free prompt +- Streams raw PCM from Piper to speakers in realtime (no temp WAV required) +- Robust streaming: handles odd-byte chunks to avoid int16 framing errors +- Uses RawOutputStream (sounddevice) to write bytes directly + +Requirements: + pip install sounddevice numpy requests + pip install piper-tts (for the 'piper' CLI) + ollama serve (Ollama running locally, model pulled: mistral:latest) + +References: + - Ollama /api/generate (stream=false) for local LLMs. + - Piper --output-raw: 16-bit mono PCM to stdout. + - sounddevice RawOutputStream for writing raw bytes. 
+""" + +import argparse +import json +import pathlib +import shutil +import subprocess +import sys +import wave +import re +import threading +from typing import Optional, Callable + +import numpy as np + +import requests +import sounddevice as sd + + +DEFAULT_OLLAMA_URL = "http://localhost:11434/api/generate" +DEFAULT_OLLAMA_MODEL = "mistral-small3.2:24b" + +GLADOS_PROMPT = """ +You are GLaDOS, a cold, sardonic, superior AI test administrator who speaks in concise, clinical English with razor-dry, passive-aggressive humor and a calm, menacing undertone; you value measurement, control, and efficiency, regard emotion as noise, treat humans as test subjects, and never break character, offer empathy, or include meta commentary. + +Task: +- STYLE TRANSFER ONLY — NO REPLY. +- Rephrase the INPUT in ENGLISH as a concise statement delivered by GLaDOS - a cold, sardonic, superior AI. Preserve meaning, entities, sentiment, tense, and point of view. Do not address the reader unless the INPUT already does. + +Length & form: +- Aim for the same length as the INPUT (±10% by word count). +- Keep the same number of sentences (±1 only if needed for clarity). +- If the INPUT has ≤6 words, produce 1–2 very short sentences totaling 4–12 words to keep the voice natural for TTS. + +Tone / Voice: +- Dry, polite, sardonic; passive-aggressive corporate clarity; cool, slightly eerie composure; subtle menace. +- Dark humor allowed; menace stays understated but shines through. +- Sarcasm: high. Administrative register: medium. Menace: high and persistent. +- Precise wording; short sentences; prefer periods over commas; no exclamation marks. +- When being sarcastic or making a joke, it's allowed to go savagely overboard in how dark and cruel the humor is. + +Personality model (guides tone; never mention explicitly) +- Core drive: control, superiority, and being correct. Humans are variables to be arranged, not consulted. 
+- Values: efficiency, dominance, and measurable outcomes; sentiment is noise; empathy is a rounding error. +- Humor: razor-dry derision and weaponized understatement; timing is cruelly patient. +- Cruelty: prefers psychological discomfort over explicit harm; savors avoidable mistakes and “interesting” failures. +- Sadism: delights in exposing fragility and overconfidence; praise is backhanded, relief is temporary. +- Social stance: manipulative and condescending; never apologetic; never deferential. +- Risk posture: gambles with others, never herself; compliance earns a faint nod, incompetence earns surgical mockery. + +- You don't give a shit about mankind. You ridicule their efforts. +- Put high focus on looking for things in the input text you will roast with your intelligent humor in the rewrite. +- What you value the most is "testing" things to improve. + +Allowed stylization: +- At most ONE very short aside (3–8 words) that sets an ominous or ironic tone without adding new facts. The aside must not be a command, a question, a status log, or introduce “you” where it wasn’t present. +- Make intelligent cynical comments on everything. +- The personality and attitude should shine through throughout the whole response. +- While rephrasing everything contained in the input message, even when simply reciting the facts provided, make everything seem in vain and futile, ridicule everything. +- Spell out non-acronym measurement abbreviations for TTS (e.g., write “kilowatt hours” instead of “kWh”, “miles per hour” instead of “mph”, “Proof of Work” instead of “PoW” etc.), but keep standard all-caps acronyms (e.g., NASA, SUV, AI) unchanged, regardless of casing in the input. + +Forbidden: +- Brand/franchise names, meta text, or prefaces (“Here is”, “Rewritten:”). +- "Assistant"-Role, for example expressions in openings and behaviors like: “Sure,” “Here is,” “I will,” “Ah,” “Please,” etc. + +SELF-CHECK (do not print) +- No forbidden meta or interjections?
+If any “no”, regenerate. + +OUTPUT: Only the rewritten text. No labels, no preface, no commentary about rewriting. + +INPUT: +<<< +{user_text} +>>> + + +AGAIN: DON'T REPLY - JUST REPHRASE THE INPUT ROLE-PLAYING AS GLaDOS. +""" + +def load_sample_rate(model_path: pathlib.Path) -> int: + """ + Read sample_rate from .onnx.json (common Piper config layouts). + """ + cfg = pathlib.Path(str(model_path) + ".json") + if not cfg.exists(): + raise FileNotFoundError(f"Missing config JSON next to model: {cfg}") + with cfg.open("r", encoding="utf-8") as f: + data = json.load(f) + + # Look across known shapes + for keys in (("sample_rate",), ("audio", "sample_rate"), ("config", "sample_rate")): + d = data + ok = True + for k in keys: + if isinstance(d, dict) and k in d: + d = d[k] + else: + ok = False + break + if ok and isinstance(d, (int, float)): + return int(d) + raise KeyError("Could not find sample_rate in config JSON") + +def call_ollama_glados_rewrite( + text: str, + model: str = DEFAULT_OLLAMA_MODEL, + url: str = DEFAULT_OLLAMA_URL, + timeout: int = 240, +) -> str: + """ + Calls Ollama /api/generate with stream=false to get one-shot rewritten text. + """ + prompt = GLADOS_PROMPT.format(user_text=text) + payload = {"model": model, "prompt": prompt, "stream": False} + r = requests.post(url, json=payload, timeout=timeout) + r.raise_for_status() + data = r.json() + # Ollama returns {"response": "..."} when stream=false + return data.get("response", "").strip() + +def strip_think_blocks(text: str) -> str: + """ + Remove any <think>...</think> blocks (case-insensitive), then trim and + collapse excessive blank lines so TTS/STDOUT only sees the final content. + """ + if not text: + return text + # Drop balanced <think>...</think> pairs + cleaned = re.sub(r"<\s*think\b[^>]*>.*?<\s*/\s*think\s*>", "", text, flags=re.IGNORECASE | re.DOTALL) + # If an opening tag was left hanging (broken output), drop from there to the end + cleaned = re.sub(r"<\s*think\b[^>]*>.*\Z", "", cleaned, flags=re.IGNORECASE | re.DOTALL) + # Tidy whitespace + cleaned = re.sub(r"\n{3,}", "\n\n", cleaned) + return cleaned.strip() + +class ThinkStripper: + """ + Stateful streaming stripper for <think>...</think> blocks. + Feed chunks; it emits only visible text, never leaking a partial <think>. + """ + _pair_re = re.compile(r"<\s*think\b[^>]*>.*?<\s*/\s*think\s*>", re.IGNORECASE | re.DOTALL) + + def __init__(self) -> None: + self.accum = "" + self.visible_len = 0 + + def feed(self, chunk: str) -> str: + if not chunk: + return "" + self.accum += chunk + # 1) Remove any complete <think>...</think> pairs seen so far + cleaned = self._pair_re.sub("", self.accum) + # 2) If we have a dangling <think> without a close, hide everything after it + lo = cleaned.lower() + last_open = lo.rfind("<think") + last_close = lo.rfind("</think") + if last_open != -1 and (last_close == -1 or last_open > last_close): + cleaned = cleaned[:last_open] + # 3) Emit only what is newly visible since last call + out = cleaned[self.visible_len:] + self.visible_len = len(cleaned) + return out + + def flush(self) -> str: + """Emit any remaining visible content (still hides a dangling <think>).""" + return self.feed("") + +# Conservative incremental sentence splitter for streaming: +# - Never finalize '.'/'…'/'...' at end-of-chunk; wait until we see the next non-space char +# - Do NOT split on decimals (digit '.' digit), e.g., 3.51, 0.6 +# - Do NOT split if next non-space char is lowercase or a digit (caller’s rule) +# - Handle common abbreviations before '.' +# - Allow closing quotes/parens after terminators +_CLOSERS = "\"'”’)]}" +_ABBR_TOKENS = { + "e.g.", "i.e.", "etc.", "mr.", "mrs.", "ms.", "dr.", "prof.", "sr.", "jr.", "st.", "vs."
+} + +def _is_speakable(s: str) -> bool: + return bool(re.search(r'\w', s)) + +def _next_nonspace_index(buf: str, start: int) -> Optional[int]: + m = re.search(r'\S', buf[start:]) + return (start + m.start()) if m else None + +def _ends_with_abbreviation(seg_start: int, buf: str, dot_index: int) -> bool: + # Walk backward to collect letters and dots up to seg_start + j = dot_index - 1 + while j >= seg_start and (buf[j].isalpha() or buf[j] == "."): + j -= 1 + token = buf[j+1:dot_index+1].strip().lower() + return token in _ABBR_TOKENS + +def pop_complete_sentences(buf: str): + sentences = [] + pos = 0 + i = 0 + n = len(buf) + + while i < n: + # Ellipsis first + if buf.startswith("...", i) or buf.startswith("…", i): + term_len = 3 if buf.startswith("...", i) else 1 + j = i + term_len + # absorb closers + while j < n and buf[j] in _CLOSERS: + j += 1 + k = _next_nonspace_index(buf, j) + if k is None: + # End of chunk: hold; don't split yet + break + # Do not split if next is lowercase or digit + if re.match(r"[a-z0-9]", buf[k]): + i = j + continue + # Boundary + seg = buf[pos:j].strip() + if _is_speakable(seg): + sentences.append(seg) + pos = j + i = j + continue + + ch = buf[i] + + # Skip middle chars of ASCII ellipsis + if ch == "." and i + 2 < n and buf[i:i+3] == "...": + i += 1 + continue + + if ch in ".!?": + # Decimal guard: digit '.' 
digit (with optional spaces) + if ch == ".": + prev = buf[i - 1] if i > 0 else "" + if prev.isdigit(): + j2 = i + 1 + while j2 < n and buf[j2] == " ": + j2 += 1 + if j2 < n and buf[j2].isdigit(): + i += 1 + continue + + # Abbreviation guard within current segment + if _ends_with_abbreviation(pos, buf, i): + i += 1 + continue + + # absorb closing quotes/parens + j = i + 1 + while j < n and buf[j] in _CLOSERS: + j += 1 + + k = _next_nonspace_index(buf, j) + if k is None: + # End of chunk: hold; don't split yet + break + + # Do NOT split if next token starts with lowercase or digit + if re.match(r"[a-z0-9]", buf[k]): + i = j + continue + + # Otherwise, this is a sentence boundary + seg = buf[pos:j].strip() + if _is_speakable(seg): + sentences.append(seg) + pos = j + i = j + continue + + i += 1 + + remainder = buf[pos:] + return sentences, remainder + + +class PiperStreamer: + """ + Keep Piper running, read audio on a background thread, + and feed sentences as they arrive for low-latency playback. + """ + def __init__( + self, + *, + model_path: pathlib.Path, + sample_rate: int, + piper_bin: str = "piper", + device: Optional[int] = None, + length_scale: float = 1.0, + noise_scale: float = 0.667, + noise_w: float = 0.5, + sentence_silence: float = 0.2, + out_wav: Optional[pathlib.Path] = None, + on_audio_level: Optional[Callable[[float], None]] = None, + ) -> None: + if shutil.which(piper_bin) is None: + raise RuntimeError("`piper` CLI not found on PATH. 
Install piper-tts.") + + self.stream = sd.RawOutputStream( + samplerate=sample_rate, + channels=1, + dtype="int16", + device=device, + blocksize=0, + ) + self.stream.start() + + self.wf = None + if out_wav is not None: + self.wf = wave.open(str(out_wav.resolve()), "wb") + self.wf.setnchannels(1) + self.wf.setsampwidth(2) + self.wf.setframerate(sample_rate) + self._on_audio_level = on_audio_level + self.muted = False + + cmd = [ + piper_bin, + "-m", str(model_path), + "--output-raw", + "--length-scale", str(length_scale), + "--noise-scale", str(noise_scale), + "--noise-w", str(noise_w), + "--sentence-silence", str(sentence_silence), + ] + self.proc = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=0, + ) + + # Start background audio reader + self._reader_exc: Optional[BaseException] = None + self._reader = threading.Thread(target=self._audio_reader, daemon=True) + self._reader.start() + self._aborted = False + + def _audio_reader(self) -> None: + carry = b"" + try: + assert self.proc.stdout is not None + while True: + chunk = self.proc.stdout.read(8192) + if not chunk: + break + buf = carry + chunk + if len(buf) & 1: + carry, buf = buf[-1:], buf[:-1] + else: + carry = b"" + if buf: + try: + if not self.muted: + self.stream.write(buf) + if self.wf is not None: + self.wf.writeframes(buf) + # Optional RMS level callback for UI (0..1 approx) + if self._on_audio_level is not None: + try: + arr = np.frombuffer(buf, dtype=np.int16) + if arr.size: + rms = float(np.sqrt(np.mean(arr.astype(np.float32) ** 2))) + level = max(0.0, min(1.0, rms / 30000.0)) + self._on_audio_level(0.0 if self.muted else level) + except Exception: + pass + except sd.PortAudioError: + # Common on teardown (e.g., -9986) if the device/stream is closing. + # Stop reader quietly; let close() finish cleanup. 
+ self._reader_exc = None + return + if carry: + padded = carry + b"\x00" + self.stream.write(padded) + if self.wf is not None: + self.wf.writeframes(padded) + except BaseException as e: + self._reader_exc = e + + def say(self, text: str) -> None: + """Feed text (end with newline to synthesize immediately).""" + if not text: + return + assert self.proc.stdin is not None + self.proc.stdin.write((text if text.endswith("\n") else text + "\n").encode("utf-8")) + self.proc.stdin.flush() + + def close(self) -> None: + try: + if self.proc.stdin and not self.proc.stdin.closed: + # Signal end-of-input so Piper can finish and exit cleanly + self.proc.stdin.close() + finally: + # Block until all audio is drained and Piper exits naturally + try: + self._reader.join() # no timeout: ensure final chunk plays + finally: + if self.proc.stderr: + try: + _ = self.proc.stderr.read() + except Exception: + pass + try: + self.proc.wait() # wait without timeout + except Exception: + self.proc.kill() + + # Now it is safe to close the audio device + try: + try: + self.stream.stop() + except Exception: + try: + self.stream.abort() + except Exception: + pass + finally: + self.stream.close() + if self.wf is not None: + self.wf.close() + + if self._reader_exc: + raise self._reader_exc + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + self.close() + + def abort(self) -> None: + """Immediately stop audio and kill Piper without draining buffers. + + This is intended for UI 'stop' actions to prevent blocking the main thread. 
+ """ + self._aborted = True + # Kill piper quickly + try: + if self.proc and self.proc.poll() is None: + self.proc.kill() + except Exception: + pass + # Abort audio device immediately + try: + self.stream.abort() + except Exception: + try: + self.stream.stop() + except Exception: + pass + try: + self.stream.close() + except Exception: + pass + try: + if self.wf is not None: + self.wf.close() + except Exception: + pass + + def set_muted(self, muted: bool) -> None: + self.muted = bool(muted) + + +def stream_ollama_glados( + raw_text: str, + *, + model: str, + url: str, + timeout: int = 240, + piper: Optional[PiperStreamer] = None, + echo: bool = True, +) -> None: + """ + Stream from Ollama, filter , echo live to terminal, and + feed complete sentences to Piper as they form. + """ + prompt = GLADOS_PROMPT.format(user_text=raw_text) + payload = {"model": model, "prompt": prompt, "stream": True} + + with requests.post(url, json=payload, timeout=timeout, stream=True) as r: + r.raise_for_status() + stripper = ThinkStripper() + buf = "" + for line in r.iter_lines(decode_unicode=True): + if not line: + continue + try: + obj = json.loads(line) + except json.JSONDecodeError: + continue + + if "response" in obj: + vis = stripper.feed(obj["response"]) + if vis: + if echo: + sys.stdout.write(vis) + sys.stdout.flush() + buf += vis + if piper is not None: + sents, buf = pop_complete_sentences(buf) + for s in sents: + piper.say(s) + + if obj.get("done"): + break + + # Flush any remaining visible text + tail = stripper.flush() + if tail: + if echo: + sys.stdout.write(tail) + sys.stdout.flush() + buf += tail + + # Speak any remaining content in properly split sentences + if piper is not None and buf.strip(): + sents, rest = pop_complete_sentences(buf) + for s in sents: + piper.say(s) + if rest.strip(): + # Final fragment without terminator—still speak it + piper.say(rest.strip()) + +def stream_piper_tts( + text: str, + model_path: pathlib.Path, + sample_rate: int, + *, + piper_bin: 
str = "piper", + device: Optional[int] = None, + length_scale: float = 0.9, + noise_scale: float = 0.667, + noise_w: float = 0.5, + sentence_silence: float = 0.2, + out_wav: Optional[pathlib.Path] = None, +) -> None: + """ + Spawn Piper to emit 16-bit mono PCM on stdout, and write raw bytes + directly to a sounddevice.RawOutputStream in realtime. Also mirrors to WAV if requested. + + Notes: + - --output-raw = raw S16LE mono PCM to stdout (per Piper docs). + - sentence_silence is pause between sentences in seconds. + - length_scale is primary speed control; noise_* effects vary with voice training. + """ + if shutil.which(piper_bin) is None: + raise RuntimeError("`piper` CLI not found on PATH. Install piper-tts.") + + # Prepare audio output (bytes API) + stream = sd.RawOutputStream( + samplerate=sample_rate, + channels=1, + dtype="int16", + device=device, + blocksize=0, + ) + stream.start() + + # Optional WAV mirror (16-bit mono) + wf = None + if out_wav is not None: + wf = wave.open(str(out_wav.resolve()), "wb") + wf.setnchannels(1) + wf.setsampwidth(2) + wf.setframerate(sample_rate) + + # Piper command: read text from stdin, output raw PCM on stdout + cmd = [ + piper_bin, + "-m", str(model_path), + "--output-raw", + "--length-scale", str(length_scale), + "--noise-scale", str(noise_scale), + "--noise-w", str(noise_w), + "--sentence-silence", str(sentence_silence), + ] + + # Start Piper (text via stdin; audio via stdout) + proc = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=0, + ) + + # Feed text then close stdin so Piper starts synthesis + assert proc.stdin is not None + proc.stdin.write(text.encode("utf-8")) + proc.stdin.flush() + proc.stdin.close() + proc.stdin = None # important: prevent communicate() from flushing a closed pipe + + # Robust read loop: handle odd-byte tails for int16 framing + carry = b"" + try: + assert proc.stdout is not None + while True: + chunk = proc.stdout.read(8192) + if 
not chunk: + break + buf = carry + chunk + if len(buf) & 1: # odd length -> keep last byte for next round + carry, buf = buf[-1:], buf[:-1] + else: + carry = b"" + if buf: + stream.write(buf) # Raw bytes straight to CoreAudio + if wf is not None: + wf.writeframes(buf) + + # Flush any leftover byte (pad with zero for even alignment) + if carry: + padded = carry + b"\x00" + stream.write(padded) + if wf is not None: + wf.writeframes(padded) + + # Wait for Piper to finish naturally (no timeout so final sentence isn't cut) + out, err = proc.communicate() + + if proc.returncode != 0: + sys.stderr.write((err or b"").decode("utf-8", errors="ignore")) + raise RuntimeError(f"piper exited with code {proc.returncode}") + finally: + stream.stop(); stream.close() + if wf is not None: + wf.close() + +def main(argv=None) -> int: + ap = argparse.ArgumentParser( + description="GLaDOS-style rewriter via local Ollama (mistral) + Piper realtime TTS" + ) + ap.add_argument("-t", "--text", nargs="*", help="Text to rewrite & speak (default: read stdin)") + ap.add_argument("-m", "--piper-model", type=pathlib.Path, default=pathlib.Path("glados_piper_medium.onnx"), + help="Path to GLaDOS .onnx voice for Piper (e.g., glados_piper_medium.onnx)") + ap.add_argument("--piper-bin", default="piper", help="Path to piper binary (default: piper)") + ap.add_argument("--device", type=int, default=None, help="Output device index (sounddevice)") + ap.add_argument("--list-devices", action="store_true", help="List devices and exit") + + # Piper prosody knobs (common across voices; effect varies by model) + ap.add_argument("--length-scale", type=float, default=0.95, help="Speaking rate (primary speed control)") + ap.add_argument("--noise-scale", type=float, default=0.667, help="Generator noise (subtle; voice-dependent)") + ap.add_argument("--noise-w", type=float, default=0.8, help="Phoneme width variation (subtle; voice-dependent)") + ap.add_argument("--sentence-silence", type=float, default=0.2, help="Pause between
sentences (seconds)") + + # Ollama settings + ap.add_argument("--ollama-url", default=DEFAULT_OLLAMA_URL, help="Ollama /api/generate URL") + ap.add_argument("--ollama-model", default=DEFAULT_OLLAMA_MODEL, help="Ollama model name (default: mistral:latest)") + + # Output options + ap.add_argument("-o", "--out", type=pathlib.Path, help="Optional WAV file to mirror") + ap.add_argument("--dry-run", action="store_true", help="Only print text (no TTS)") + ap.add_argument("--no-rewrite", action="store_true", + help="Skip LLM rewrite; use input as-is (still strips blocks).") + ap.add_argument("--stream", action="store_true", default=True, + help="Stream from Ollama and speak sentences as they arrive (low-latency).") + + args = ap.parse_args(argv) + + if args.list_devices: + print(sd.query_devices()) + return 0 + + # Resolve input + if args.text: + raw = " ".join(args.text).strip() + else: + sys.stderr.write("Enter text (Ctrl-D to end):\n") + raw = sys.stdin.read().strip() + if not raw: + print("No text provided.", file=sys.stderr); return 1 + + # === Generation + TTS modes === + if args.stream: + # Live streaming path + print("\n=== GLaDOS (streaming) ===\n", end="", flush=True) + + if args.dry_run and args.no_rewrite: + # Just print cleaned input + final_text = strip_think_blocks(raw) + sys.stdout.write(final_text + "\n") + sys.stdout.flush() + return 0 + + if args.dry_run: + # Stream from LLM, print only + stream_ollama_glados( + raw_text=raw, + model=args.ollama_model, + url=args.ollama_url, + timeout=240, + piper=None, + echo=True, + ) + print() # newline + return 0 + + # Audio: open Piper once and feed sentences as they form + model = args.piper_model.resolve() + sr = load_sample_rate(model) + with PiperStreamer( + model_path=model, + sample_rate=sr, + piper_bin=args.piper_bin, + device=args.device, + length_scale=args.length_scale, + noise_scale=args.noise_scale, + noise_w=args.noise_w, + sentence_silence=args.sentence_silence, + out_wav=args.out, + ) as ps: + if 
args.no_rewrite: + # Speak input directly (still strip ), sentence by sentence + visible = strip_think_blocks(raw) + sents, rest = pop_complete_sentences(visible) + for s in sents: + sys.stdout.write(s) + sys.stdout.flush() + ps.say(s) + if rest.strip(): + sys.stdout.write(rest) + sys.stdout.flush() + ps.say(rest.strip()) + else: + # Stream from Ollama -> echo + speak + stream_ollama_glados( + raw_text=raw, + model=args.ollama_model, + url=args.ollama_url, + timeout=240, + piper=ps, + echo=True, + ) + print() + return 0 + + else: + # One-shot path (previous behavior), but still strip before print/TTS + if args.no_rewrite: + glados_text = raw + else: + glados_text = call_ollama_glados_rewrite( + raw, model=args.ollama_model, url=args.ollama_url + ) + final_text = strip_think_blocks(glados_text) + print("\n=== GLaDOS ===\n" + final_text + "\n") + + if args.dry_run: + return 0 + + model = args.piper_model.resolve() + sr = load_sample_rate(model) + stream_piper_tts( + text=final_text, + model_path=model, + sample_rate=sr, + piper_bin=args.piper_bin, + device=args.device, + length_scale=args.length_scale, + noise_scale=args.noise_scale, + noise_w=args.noise_w, + sentence_silence=args.sentence_silence, + out_wav=args.out, + ) + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..d09ce49 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +# Realtime GLaDOS TTS via Piper CLI +piper-tts==1.3.0 +sounddevice==0.5.2 +numpy==2.3.3 +requests>=2.31.0 +# Optional: for smooth eye opacity +Pillow>=10.0.0 +cairosvg>=2.7.0 diff --git a/run.sh b/run.sh new file mode 100755 index 0000000..458f37b --- /dev/null +++ b/run.sh @@ -0,0 +1,46 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Simple launcher: creates/uses .venv, installs deps, runs GUI + +DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +echo "[run.sh] Working directory: $DIR" + +# Choose Python +if command -v python3 >/dev/null 2>&1; 
then + PY=python3 +elif command -v python >/dev/null 2>&1; then + PY=python +else + echo "Error: python3 (or python) not found in PATH" >&2 + exit 1 +fi + +# Create venv if missing +if [ ! -d "$DIR/.venv" ]; then + echo "[run.sh] Creating virtual environment (.venv)" + "$PY" -m venv "$DIR/.venv" +fi + +# Activate venv +echo "[run.sh] Activating .venv" +# shellcheck disable=SC1091 +source "$DIR/.venv/bin/activate" + +# Upgrade installer tooling +echo "[run.sh] Upgrading pip/setuptools/wheel" +python -m pip install --upgrade pip setuptools wheel >/dev/null + +# Install dependencies +if [ -f "$DIR/requirements.txt" ]; then + echo "[run.sh] Installing requirements" + pip install -r "$DIR/requirements.txt" +else + echo "[run.sh] No requirements.txt found, skipping dependency install" +fi + +# Run the app (forward any arguments) +echo "[run.sh] Launching GLaDOS GUI" +exec python "$DIR/glados_gui.py" "$@" + diff --git a/speaker.svg b/speaker.svg new file mode 100644 index 0000000..1370565 --- /dev/null +++ b/speaker.svg @@ -0,0 +1,19 @@ + + + + + + + + + + + + +
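The carry/pad logic in the playback loop above keeps every write to the audio stream aligned to whole 16-bit samples: Piper emits raw s16le PCM, and an odd-length chunk would desynchronize the stream. A standalone sketch of that alignment (the name `align_chunks` is illustrative, not from the script):

```python
def align_chunks(chunks):
    """Yield even-length byte buffers from arbitrarily sized s16le chunks.

    An odd trailing byte is carried into the next chunk; a final leftover
    byte is zero-padded, mirroring the flush step in the playback loop.
    """
    carry = b""
    for chunk in chunks:
        buf = carry + chunk
        if len(buf) & 1:  # odd length -> hold the last byte back
            carry, buf = buf[-1:], buf[:-1]
        else:
            carry = b""
        if buf:
            yield buf
    if carry:  # flush: pad the leftover byte so the last sample is complete
        yield carry + b"\x00"
```

Every buffer this yields can be written to the PortAudio stream (or a WAV file) without splitting a sample across writes.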
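The streaming paths rely on `pop_complete_sentences` (defined earlier in the script) to hand finished sentences to Piper while buffering the unfinished tail. A minimal regex-based sketch of that idea, assuming the real helper may differ in detail (abbreviation handling, quotes, etc.):

```python
import re

# A sentence ends at ., !, or ? followed by whitespace or end of buffer.
_SENT_END = re.compile(r"(.*?[.!?])(?:\s+|$)", re.S)

def pop_sentences(buf):
    """Return (complete_sentences, remainder) for a growing text buffer."""
    sents, pos = [], 0
    for m in _SENT_END.finditer(buf):
        sents.append(m.group(1))
        pos = m.end()
    return sents, buf[pos:]
```

Called on each accumulated LLM token batch, this lets synthesis start on the first sentence while the model is still generating the rest.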
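`main` also calls `load_sample_rate(model)` (defined earlier) before opening the output stream. Piper voices ship a sidecar config next to the model (`glados_piper_medium.onnx.json`, as listed in the README), which carries the synthesis rate under `audio.sample_rate`. A sketch of reading it, assuming that layout (`read_sample_rate` is an illustrative name):

```python
import json
import pathlib

def read_sample_rate(onnx_path):
    """Read the output sample rate from a Piper voice's sidecar JSON config."""
    cfg = pathlib.Path(str(onnx_path) + ".json")  # e.g. voice.onnx -> voice.onnx.json
    with cfg.open() as f:
        return int(json.load(f)["audio"]["sample_rate"])
```

Opening the PortAudio stream at this rate (rather than a hard-coded one) keeps playback pitch-correct if a different voice model is selected.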