Files
GLaDOSify/glados_gui.py

1089 lines
43 KiB
Python
Raw Normal View History

2025-09-11 03:11:55 +02:00
#!/usr/bin/env python3
import json
import pathlib
import tempfile
import queue
import threading
import subprocess
import sys
import time
import os
from dataclasses import dataclass
from typing import List, Optional
import requests
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import tkinter.font as tkfont
from collections import deque
# Optional Pillow for alpha control on eye overlay
try:
from PIL import Image, ImageTk, ImageEnhance
PIL_OK = True
except Exception:
PIL_OK = False
# Reuse streaming utilities
from glados_say_stream import (
DEFAULT_OLLAMA_MODEL,
DEFAULT_OLLAMA_URL,
ThinkStripper,
pop_complete_sentences,
PiperStreamer,
load_sample_rate,
)
# --- Paths and theme constants ---
APP_DIR = pathlib.Path(__file__).resolve().parent
CONFIG_PATH = APP_DIR / "config.json"           # persisted Settings JSON
EYE_IMAGE_PATH = APP_DIR / "glados_eye.png"     # eye overlay (alpha-animated)
HEAD_IMAGE_PATH = APP_DIR / "glados_head.png"   # static head image
ICON_IMAGE_PATH = APP_DIR / "icon.png"          # window / Dock icon
# Backward-compatible alias: the constant was originally misspelled and is
# referenced elsewhere under this name.
ICON_IMAEG_PATH = ICON_IMAGE_PATH
PIPER_MODEL_DEFAULT = APP_DIR / "glados_piper_medium.onnx"
BG = "#000000"       # window background
BOX_BG = "#000000"   # text-box background
FG = "#e1a101"       # GLaDOS amber foreground
@dataclass
class Settings:
    """User-tunable configuration persisted to CONFIG_PATH as JSON.

    Fields fall back to module defaults when the config file is missing
    or unreadable.
    """
    ollama_url: str = DEFAULT_OLLAMA_URL
    ollama_model: str = ""
    piper_model: str = str(PIPER_MODEL_DEFAULT)

    @classmethod
    def load(cls) -> "Settings":
        """Read settings from CONFIG_PATH; return defaults on any error."""
        if CONFIG_PATH.exists():
            try:
                # Explicit UTF-8: default platform encoding (e.g. cp1252 on
                # Windows) can corrupt non-ASCII model paths.
                data = json.loads(CONFIG_PATH.read_text(encoding="utf-8"))
                return cls(
                    ollama_url=data.get("ollama_url", DEFAULT_OLLAMA_URL),
                    ollama_model=data.get("ollama_model", ""),
                    piper_model=data.get("piper_model", str(PIPER_MODEL_DEFAULT)),
                )
            except Exception:
                # Corrupt/unreadable config is non-fatal; fall through to defaults.
                pass
        return cls()

    def save(self) -> None:
        """Write the current settings to CONFIG_PATH as indented JSON."""
        data = {
            "ollama_url": self.ollama_url,
            "ollama_model": self.ollama_model,
            "piper_model": self.piper_model,
        }
        CONFIG_PATH.write_text(json.dumps(data, indent=2), encoding="utf-8")
def list_ollama_models() -> List[str]:
    """Return the names of locally available Ollama models (best effort)."""
    # First choice: the CLI, which needs no knowledge of HTTP endpoints.
    raw = None
    try:
        raw = subprocess.check_output(
            ["ollama", "list", "--quiet"],
            stderr=subprocess.DEVNULL,
            text=True,
        )
    except Exception:
        raw = None
    if raw is not None:
        return [entry.strip() for entry in raw.splitlines() if entry.strip()]
    # Fallback: query the local tags API derived from the default URL.
    try:
        tags_url = DEFAULT_OLLAMA_URL.rsplit("/", 1)[0] + "/tags"
        resp = requests.get(tags_url, timeout=2)
        resp.raise_for_status()
        names: List[str] = []
        for entry in resp.json().get("models", []):
            label = entry.get("name") or entry.get("model")
            if label:
                names.append(label)
        return names
    except Exception:
        return []
# --- App icon helpers (Tk + macOS Dock) ---
def _set_app_icon(root: tk.Tk, png_path: pathlib.Path):
    """Set the Tk window icon and, on macOS, the Dock icon.

    Missing assets or optional modules are ignored silently.
    """
    # Window / taskbar icon via Tk.
    try:
        if png_path.exists():
            try:
                if PIL_OK:
                    root._app_icon = ImageTk.PhotoImage(Image.open(str(png_path)))  # keep ref!
                else:
                    root._app_icon = tk.PhotoImage(file=str(png_path))  # keep ref!
                root.iconphoto(True, root._app_icon)
            except Exception:
                pass
    except Exception:
        pass
    # Dock icon via PyObjC (macOS only, optional dependency).
    if sys.platform == "darwin":
        try:
            from Cocoa import NSImage, NSApplication  # type: ignore
            shared_app = NSApplication.sharedApplication()
            dock_image = NSImage.alloc().initWithContentsOfFile_(str(png_path))
            if dock_image is not None:
                shared_app.setApplicationIconImage_(dock_image)
        except Exception:
            # PyObjC not available or other issue; ignore silently.
            pass
class GladosGUI(tk.Tk):
    """Main GLaDOSify window: input box, animated GLaDOS head, streamed output.

    Layout is three columns (input text | head canvas | output text) with a
    bottom bar holding model/URL/voice controls. Worker threads communicate
    with the UI exclusively through ``self._ui_q``.
    """

    def __init__(self):
        super().__init__()
        self.title("GLaDOSify")
        _set_app_icon(self, ICON_IMAEG_PATH)
        self.configure(bg=BG)
        # Global font/color defaults (named font handles the space in the family name)
        self._ui_font = tkfont.Font(family="Lucida Console", size=12)
        self.option_add("*Font", self._ui_font)
        self.option_add("*Background", BG)
        self.option_add("*foreground", FG)
        self.option_add("*Foreground", FG)
        self.option_add("*selectBackground", FG)
        self.option_add("*selectForeground", BG)
        self.option_add("*insertBackground", FG)
        self.geometry("780x460")
        self.settings = Settings.load()
        self.models = []  # populated asynchronously by _load_models_async
        # --- Worker/stream state ---
        self._stream_thread: Optional[threading.Thread] = None
        self._replay_thread: Optional[threading.Thread] = None
        self._stop_flag = threading.Event()
        self._ui_q: queue.Queue[str] = queue.Queue()
        self._audio_level = 0.0
        self._lvl_ema = 0.0  # smoothed visual level
        self._last_wav_path: Optional[pathlib.Path] = None
        self._last_output_full = ""
        self._streaming = False
        self._stream_tts_wanted = True
        self._stream_ps: Optional[PiperStreamer] = None
        self._current_response = None
        self._pending_sents: deque[str] = deque()
        self._stream_buf = ""
        # --- Layout: 3 columns (left text | center image | right text) ---
        self.columnconfigure(0, weight=1)
        # Keep center column visible and allow it to expand
        self.columnconfigure(1, weight=1, minsize=260)
        self.columnconfigure(2, weight=1)
        self.rowconfigure(0, weight=1)
        self.rowconfigure(1, weight=0)
        # Left editable textbox
        self.left_frame = tk.Frame(self, bg=BG)
        self.left_frame.grid(row=0, column=0, sticky="nsew")
        self.input_wrap = tk.Frame(self.left_frame, bg=BG)
        self.input_wrap.pack(fill="both", expand=True)
        self._add_dashed_border(self.input_wrap)
        self.input_text = tk.Text(
            self.input_wrap,
            bg=BOX_BG,
            fg=FG,
            insertbackground=FG,
            wrap="word",
            bd=0,
            highlightthickness=0,
            padx=10,
            pady=10,
        )
        self.input_text.pack(fill="both", expand=True, padx=4, pady=4)
        # Placeholder for input
        self._init_placeholder()
        # Left toolbar (below left textbox); spacers center the GLaDOSify button
        self.left_toolbar = tk.Frame(self.left_frame, bg=BG)
        self.left_toolbar.pack(side="bottom", fill="x")
        self._lt_sp_left = tk.Frame(self.left_toolbar, bg=BG)
        self._lt_sp_left.pack(side="left", expand=True, fill="x")
        self.gladosify_wrap = tk.Frame(self.left_toolbar, bg=BG)
        self.gladosify_wrap.pack(side="left", padx=(8, 8), pady=(6, 6))
        self._lt_sp_right = tk.Frame(self.left_toolbar, bg=BG)
        self._lt_sp_right.pack(side="left", expand=True, fill="x")
        self._add_dashed_border(self.gladosify_wrap)
        self.gladosify_btn = ttk.Button(self.gladosify_wrap, text="GLaDOSify", command=self.on_gladosify_click,
                                        style="Dark.TButton", takefocus=False)
        self.gladosify_btn.pack(padx=4, pady=4)
        # Center image canvas (head + eye overlay)
        self.center_frame = tk.Frame(self, bg=BG)
        self.center_frame.grid(row=0, column=1, sticky="nsew")
        self.canvas = tk.Canvas(self.center_frame, width=260, height=260, bg=BG, highlightthickness=0, bd=0)
        self.canvas.pack(padx=0, pady=0, fill="both", expand=True)
        self._resize_job = None
        self._last_render_size = (0, 0)
        self.center_frame.bind("<Configure>", lambda e: self._schedule_resize())
        self.canvas.bind("<Configure>", lambda e: self._schedule_resize())
        self._load_images()
        # Right non-editable textbox with toolbar below it
        self.right_frame = tk.Frame(self, bg=BG)
        self.right_frame.grid(row=0, column=2, sticky="nsew")
        self.right_frame.rowconfigure(0, weight=1)
        self.right_frame.columnconfigure(0, weight=1)
        self.output_container = tk.Frame(self.right_frame, bg=BG)
        self.output_container.grid(row=0, column=0, sticky="nsew")
        self.output_wrap = tk.Frame(self.output_container, bg=BG)
        self.output_wrap.pack(side="top", fill="both", expand=True)
        self._add_dashed_border(self.output_wrap)
        self.output_text = tk.Text(
            self.output_wrap,
            bg=BOX_BG,
            fg=FG,
            wrap="word",
            bd=0,
            highlightthickness=0,
            padx=10,
            pady=10,
        )
        self.output_text.configure(state="disabled")
        self.output_text.pack(fill="both", expand=True, padx=4, pady=4)
        # Bottom toolbar (below right textbox)
        self.toolbar = tk.Frame(self.output_container, bg=BG)
        self.toolbar.pack(side="bottom", fill="x")
        # Row centered under the textbox
        self.toolbar_row = tk.Frame(self.toolbar, bg=BG)
        self.toolbar_row.pack(side="top", pady=6)

        def make_bordered_button(parent, text, cmd, icon=False):
            # Flat ttk button inside a small dashed-border wrapper frame.
            wrap = tk.Frame(parent, bg=BG)
            wrap.pack(side="left", padx=6, pady=2)
            self._add_dashed_border(wrap)
            btn = ttk.Button(
                wrap,
                text=text,
                command=cmd,
                style=("DarkIcon.TButton" if icon else "Dark.TButton"),
                takefocus=False,
                width=0,
            )
            # Keep a tiny inner margin so the dashed border is visible
            btn.pack(padx=8, pady=5)
            return wrap, btn

        # Buttons centered and spaced
        self.copy_wrap, self.copy_btn = make_bordered_button(self.toolbar_row, "", self.copy_output, icon=True)
        # Build / load orange speaker icon image (user-provided files preferred)
        self._speaker_icon = self._load_speaker_icon(16)
        self.speaker_wrap, self.speaker_btn = make_bordered_button(self.toolbar_row, "", self.toggle_speaker, icon=True)
        if self._speaker_icon is not None:
            self.speaker_btn.configure(image=self._speaker_icon, text="")
        self.save_wrap, self.save_btn = make_bordered_button(self.toolbar_row, "Save Audio", self.save_as_wav)
        self.save_btn.configure(state="disabled")
        # Bottom bar: spacers left/right, centered controls (model + URL + voice)
        self.bottom = tk.Frame(self, bg=BG)
        self.bottom.grid(row=1, column=0, columnspan=3, sticky="ew", pady=(8, 8))
        self.bottom.columnconfigure(0, weight=1)
        self.bottom.columnconfigure(1, weight=0)
        self.bottom.columnconfigure(2, weight=1)
        tk.Frame(self.bottom, bg=BG).grid(row=0, column=0, sticky="w", padx=(8, 8))
        center_controls = tk.Frame(self.bottom, bg=BG)
        center_controls.grid(row=0, column=1)
        tk.Label(center_controls, text="Model:", bg=BG, fg=FG).pack(side="left", padx=(0, 4))
        self.model_var = tk.StringVar(value=self.settings.ollama_model or "Select LLM...")
        self.combo_wrap = tk.Frame(center_controls, bg=BG)
        self.combo_wrap.pack(side="left", padx=(0, 8))
        self._add_dashed_border(self.combo_wrap)
        self.model_combo = ttk.Combobox(self.combo_wrap, textvariable=self.model_var, values=[], width=20, state="readonly", style="Dark.TCombobox")
        self.model_combo.pack(side="left", padx=4, pady=4)
        self.combo_arrow = tk.Label(self.combo_wrap, text="", bg=BG, fg=FG)
        self.combo_arrow.place(relx=1.0, rely=0.5, anchor="e", x=-6)
        self.combo_arrow.bind("<Button-1>", lambda e: self.model_combo.event_generate("<Button-1>"))
        self.model_combo.bind("<<ComboboxSelected>>", self._on_model_change)
        tk.Label(center_controls, text="Ollama URL:", bg=BG, fg=FG).pack(side="left", padx=(0, 4))
        self.url_var = tk.StringVar(value=self.settings.ollama_url)
        self.url_wrap = tk.Frame(center_controls, bg=BG)
        self.url_wrap.pack(side="left", padx=(0, 8))
        self._add_dashed_border(self.url_wrap)
        self.url_entry = tk.Entry(self.url_wrap, textvariable=self.url_var, bg=BOX_BG, fg=FG, insertbackground=FG, bd=0, width=28, highlightthickness=0)
        self.url_entry.pack(side="left", padx=4, pady=4)
        self.url_var.trace_add("write", lambda *_: self._save_settings())
        # Piper model picker
        self.voice_wrap = tk.Frame(center_controls, bg=BG)
        self.voice_wrap.pack(side="left")
        self._add_dashed_border(self.voice_wrap)
        self.voice_btn = ttk.Button(self.voice_wrap, text="Voice model…", command=self.pick_piper_model, style="Dark.TButton")
        self.voice_btn.pack(padx=4, pady=4)
        tk.Frame(self.bottom, bg=BG).grid(row=0, column=2, sticky="e", padx=(8, 8))
        # --- ttk styling for the dark theme ---
        style = ttk.Style(self)
        try:
            style.theme_use("clam")
        except Exception:
            pass
        # Dark combobox style to remove inner white border
        style.configure("Dark.TCombobox", fieldbackground=BOX_BG, background=BOX_BG, foreground=FG,
                        bordercolor=BG, lightcolor=BG, darkcolor=BG, arrowsize=14)
        style.map("Dark.TCombobox",
                  fieldbackground=[("readonly", BOX_BG)],
                  foreground=[("readonly", FG)],
                  background=[("readonly", BOX_BG)])
        # Minimal button layouts drop Aqua gradients/borders and the focus ring
        style.layout("Dark.TButton", [
            ("Button.padding", {"sticky": "nswe", "children": [
                ("Button.label", {"sticky": "nswe"})
            ]})
        ])
        style.configure("Dark.TButton", background=BG, foreground=FG, borderwidth=0, focusthickness=0, relief="flat", padding=(8, 5))
        style.map("Dark.TButton",
                  background=[("active", BG), ("pressed", BG)],
                  foreground=[("disabled", "#5a4a10"), ("!disabled", FG)])
        # Compact icon button style (less horizontal padding)
        style.layout("DarkIcon.TButton", [
            ("Button.padding", {"sticky": "nswe", "children": [
                ("Button.label", {"sticky": "nswe"})
            ]})
        ])
        style.configure("DarkIcon.TButton", background=BG, foreground=FG, borderwidth=0, focusthickness=0, relief="flat", padding=(8, 5))
        style.map("DarkIcon.TButton",
                  background=[("active", BG), ("pressed", BG)],
                  foreground=[("disabled", "#5a4a10"), ("!disabled", FG)])
        # Kick off model listing, UI queue polling and eye animation
        threading.Thread(target=self._load_models_async, daemon=True).start()
        self.after(50, self._drain_ui_queue)
        # Ensure images render once the canvas has a real size
        self.after(0, self._ensure_canvas_ready)
        # Eye animation tick (50ms)
        self.after(50, self._tick_eye)
    # --- Images / eye overlay ---
    def _load_images(self):
        """Load the head and eye images; actual sizing happens in _resize_images()."""
        try:
            if PIL_OK:
                self._head_img_orig = Image.open(HEAD_IMAGE_PATH).convert("RGBA")
                self._eye_img_orig = Image.open(EYE_IMAGE_PATH).convert("RGBA")
                self._eye_levels = None  # generated per-size in _resize_images
            else:
                # Fallback: load originals and let _resize_images() downscale via subsample()
                self._head_photo_orig = tk.PhotoImage(file=str(HEAD_IMAGE_PATH))
                self._eye_photo_orig = tk.PhotoImage(file=str(EYE_IMAGE_PATH))
                self._eye_levels = None
        except Exception as e:
            messagebox.showerror("Assets missing", f"Failed to load images: {e}")
            # Leave render attributes defined so later code can null-check them.
            self._head_base = None
            self._eye_photo = None
            self._eye_levels = None
        # Defer initial sizing until the canvas has a real size
        self.after(0, self._ensure_canvas_ready)
def _ensure_canvas_ready(self):
# Wait until canvas is realized with a usable size, then size images
cw = self.canvas.winfo_width()
ch = self.canvas.winfo_height()
if cw < 20 or ch < 20:
self.after(50, self._ensure_canvas_ready)
return
self._resize_images()
def _schedule_resize(self):
# Debounce heavy resize work while dragging the window
if self._resize_job is not None:
try:
self.after_cancel(self._resize_job)
except Exception:
pass
self._resize_job = self.after(150, self._apply_resize)
def _apply_resize(self):
self._resize_job = None
self._resize_images()
    def _resize_images(self):
        """Fit head/eye images to the current canvas while preserving aspect ratio.

        With Pillow: high-quality LANCZOS resize plus a reset of the per-size
        eye alpha-frame cache. Without Pillow: integer subsample() downscale.
        """
        cw = max(120, self.canvas.winfo_width() - 8)
        ch = max(120, self.canvas.winfo_height() - 8)
        if cw <= 0 or ch <= 0:
            return
        # Skip expensive work if size hasn't changed meaningfully
        if self._last_render_size == (cw, ch):
            return
        self._last_render_size = (cw, ch)
        if PIL_OK and hasattr(self, "_head_img_orig"):
            # High-quality resize via Pillow
            w0, h0 = self._head_img_orig.size
            scale = min(cw / w0, ch / h0)
            scale = max(0.2, min(scale, 1.5))  # clamp zoom to a sane range
            w, h = int(w0 * scale), int(h0 * scale)
            head_resized = self._head_img_orig.resize((w, h), Image.LANCZOS)
            self._head_base = ImageTk.PhotoImage(head_resized)
            # Store resized eye base and lazily generate alpha frames on demand
            self._eye_base_resized = self._eye_img_orig.resize((w, h), Image.LANCZOS)
            self._eye_levels = [None] * 41  # 0..40 frames
            # Choose current eye frame based on current smoothed level
            lvl = getattr(self, "_lvl_ema", 0.0)
            vis = self._map_level_to_vis(lvl)
            idx = max(0, min(40, int(round(vis * 40))))
            self._eye_photo = self._get_eye_frame(idx)
            self._render_images()
            return
        # Fallback path (no Pillow): downscale using integer subsample
        if hasattr(self, "_head_photo_orig"):
            w0 = self._head_photo_orig.width()
            h0 = self._head_photo_orig.height()
            # Determine integer reduction factor so image fits within cw x ch
            import math
            fx = math.ceil(w0 / cw)
            fy = math.ceil(h0 / ch)
            f = max(1, fx, fy)
            try:
                self._head_base = self._head_photo_orig.subsample(f, f)
                self._eye_photo = self._eye_photo_orig.subsample(f, f)
            except Exception:
                # If subsample not available, just use originals
                self._head_base = self._head_photo_orig
                self._eye_photo = self._eye_photo_orig
        self._render_images()
def _render_images(self):
self.canvas.delete("all")
self.canvas.update_idletasks()
cw = max(1, self.canvas.winfo_width())
ch = max(1, self.canvas.winfo_height())
cx, cy = cw // 2, ch // 2
if hasattr(self, "_head_base") and self._head_base is not None:
self.canvas.create_image(cx, cy, image=self._head_base)
if hasattr(self, "_eye_photo") and self._eye_photo is not None:
self._eye_item = self.canvas.create_image(cx, cy, image=self._eye_photo)
    def _tick_eye(self):
        """Animation tick (every 50 ms): map audio level to eye opacity/visibility."""
        # Smooth level (EMA) and slightly delay visually to feel synced with audio
        target = self._get_delayed_level()
        # EMA smoothing
        alpha = 0.2
        self._lvl_ema = (1 - alpha) * self._lvl_ema + alpha * target
        lvl = self._lvl_ema
        # Map level to visual brightness with noise floor, gain and gamma compression
        vis = self._map_level_to_vis(lvl)
        # If the eye item isn't created yet (early ticks), try again shortly
        if not hasattr(self, "_eye_item"):
            self.after(50, self._tick_eye)
            return
        if self._eye_levels is not None:
            # Pillow path: pick one of 41 pre-scaled alpha frames.
            idx = max(0, min(40, int(round(vis * 40))))
            new_img = self._get_eye_frame(idx)
            self.canvas.itemconfigure(self._eye_item, image=new_img)
            # keep reference to prevent GC
            self._eye_photo = new_img
        else:
            # Fallback (no Pillow): toggle visibility based on level
            try:
                state = "hidden" if vis < 0.1 else "normal"
                self.canvas.itemconfigure(self._eye_item, state=state)
            except Exception:
                pass
        # Tick every 50ms for smoother updates
        self.after(50, self._tick_eye)
def _add_dashed_border(self, widget):
# Draw a dashed rectangle around a container widget using a child canvas
c = tk.Canvas(widget, bg=BG, highlightthickness=0, bd=0)
c.place(relx=0, rely=0, relwidth=1, relheight=1)
# Keep behind, but ensure border remains visible by maintaining inner margins on children
def redraw(event=None):
c.delete("all")
w = widget.winfo_width()
h = widget.winfo_height()
if w < 4 or h < 4:
return
m = 1
try:
c.create_rectangle(m, m, w - m, h - m, outline=FG, width=1, dash=(4, 3))
except Exception:
c.create_rectangle(m, m, w - m, h - m, outline=FG, width=1)
widget.bind("<Configure>", redraw)
redraw()
    def _build_speaker_icon(self, size: int = 20):
        """Draw a crisp, amber speaker icon with sound waves using supersampling.

        Returns an ImageTk.PhotoImage, or None when Pillow/ImageDraw is missing.
        """
        if not PIL_OK:
            return None
        try:
            from PIL import ImageDraw
        except Exception:
            return None
        # Supersample for smoother curves, then downscale
        s = max(16, int(size))
        k = 3  # supersampling factor
        S = s * k
        img = Image.new("RGBA", (S, S), (0, 0, 0, 0))
        d = ImageDraw.Draw(img)
        # Convert hex FG to RGBA tuple if necessary
        col_hex = FG
        if isinstance(col_hex, str) and col_hex.startswith("#") and len(col_hex) == 7:
            r = int(col_hex[1:3], 16)
            g = int(col_hex[3:5], 16)
            b = int(col_hex[5:7], 16)
            col = (r, g, b, 255)
        else:
            col = (225, 161, 1, 255)  # amber fallback
        # Geometry (scaled by k)
        ymid = S // 2
        stroke = max(2, S // 30)  # ~2px at 60px
        body_w = max(9, S // 5)
        body_h = max(27, int(S * 0.6))
        horn_w = max(9, S // 6)
        x0 = 2 * k
        y0 = ymid - body_h // 2
        x1 = x0 + body_w
        y1 = ymid + body_h // 2
        # Body (filled)
        d.rectangle([x0, y0, x1, y1], fill=col)
        # Horn (filled triangle)
        horn = [(x1, y0), (x1 + horn_w, ymid), (x1, y1)]
        d.polygon(horn, fill=col)
        # Sound waves (3 arcs), tight angles so they look like emanating waves
        cx = x1 + horn_w + 3 * k
        cy = ymid
        radii = [int(S * 0.28), int(S * 0.42), int(S * 0.56)]
        for r in radii:
            bbox = [cx - r, cy - r, cx + r, cy + r]
            # Right-side arc (~ -35..35 degrees => 325..35)
            d.arc(bbox, start=325, end=35, fill=col, width=stroke)
        # Downscale to target size for a crisp icon
        img_small = img.resize((s, s), Image.LANCZOS)
        return ImageTk.PhotoImage(img_small)
# --- Icon loading helpers ---
def _load_icon_from_file(self, path: pathlib.Path, size: int):
if not path.exists():
return None
if PIL_OK:
try:
im = Image.open(str(path)).convert("RGBA")
im = im.resize((size, size), Image.LANCZOS)
return ImageTk.PhotoImage(im)
except Exception:
return None
else:
try:
return tk.PhotoImage(file=str(path))
except Exception:
return None
    def _load_speaker_icon(self, size: int = 20):
        """Return the speaker icon, preferring user assets over generated art.

        Search order: local PNGs, optional SVG via cairosvg, then the
        programmatically drawn fallback.
        """
        # Prefer local assets if provided by the user
        for name in ("speaker.png", "speaker_icon.png"):
            icon = self._load_icon_from_file(APP_DIR / name, size)
            if icon is not None:
                return icon
        # Optional: render from SVG if cairosvg is available
        svg_path = APP_DIR / "speaker.svg"
        if svg_path.exists() and PIL_OK:
            try:
                import cairosvg  # type: ignore
                raw = svg_path.read_text(encoding="utf-8")
                # Recolor to amber if the SVG uses dark fill
                raw = raw.replace("#231F20", FG)
                from io import BytesIO
                png_bytes = cairosvg.svg2png(bytestring=raw.encode("utf-8"), output_width=size, output_height=size)
                im = Image.open(BytesIO(png_bytes)).convert("RGBA")
                return ImageTk.PhotoImage(im)
            except Exception:
                pass
        # Fallback: draw programmatically
        return self._build_speaker_icon(size)
def _update_toolbar_layout(self, event=None):
# Shrink Save button label when space is tight; restore when space returns
try:
avail = self.toolbar.winfo_width()
except Exception:
return
# Estimate total width; if too tight, shorten label
if avail and avail < 280:
if self.save_btn["text"] != "Save":
self.save_btn.configure(text="Save")
else:
if self.save_btn["text"] != "Save Audio":
self.save_btn.configure(text="Save Audio")
def _map_level_to_vis(self, lvl: float) -> float:
noise_floor = 0.03
gain = 4.0
gamma = 2
x = max(0.0, (lvl - noise_floor)) * gain
return max(0.0, min(1.0, x)) ** gamma
    def _get_eye_frame(self, idx: int):
        """Return the eye image at alpha index ``idx`` (0..40), caching per size.

        Frames are built lazily from the resized eye base; falls back to the
        current photo when the cache or base image is unavailable.
        """
        # Lazily build and cache the eye alpha frame for current size
        if not hasattr(self, "_eye_levels") or self._eye_levels is None:
            return self._eye_photo
        if 0 <= idx < len(self._eye_levels) and self._eye_levels[idx] is not None:
            return self._eye_levels[idx]
        if not hasattr(self, "_eye_base_resized"):
            return self._eye_photo
        # Create frame at requested alpha level
        a = idx / 40.0
        r, g, b, a_chan = self._eye_base_resized.split()
        a_scaled = a_chan.point(lambda px, aa=a: int(px * aa))
        img = Image.merge("RGBA", (r, g, b, a_scaled))
        photo = ImageTk.PhotoImage(img)
        if 0 <= idx < len(self._eye_levels):
            self._eye_levels[idx] = photo
        return photo
# --- Bottom controls ---
def _on_model_change(self, *_):
val = self.model_var.get()
if val and val != "Select LLM...":
self.settings.ollama_model = val
self._save_settings()
def _save_settings(self):
self.settings.ollama_url = self.url_var.get().strip() or DEFAULT_OLLAMA_URL
# Model saved on change
self.settings.save()
def _load_models_async(self):
models = list_ollama_models()
models_sorted = sorted(models)
self.models = models_sorted
# Pick default
preferred = "mistral3.2:24b"
chosen = self.settings.ollama_model
if not chosen:
if preferred in models_sorted:
chosen = preferred
elif DEFAULT_OLLAMA_MODEL in models_sorted:
chosen = DEFAULT_OLLAMA_MODEL
else:
chosen = "Select LLM..."
self._ui_q.put(("models", models_sorted, chosen))
# --- Streaming ---
def on_gladosify_click(self):
# Toggle start/cancel
if self._stream_thread and self._stream_thread.is_alive():
self.cancel_stream()
else:
self.start_stream()
    def start_stream(self):
        """Validate inputs, reset per-run state, and launch the streaming worker."""
        if self._stream_thread and self._stream_thread.is_alive():
            return  # already running
        raw = self.input_text.get("1.0", "end-1c").strip()
        if not raw:
            messagebox.showinfo("GLaDOSify", "Please enter some text on the left.")
            return
        model = self.model_var.get().strip()
        if not model or model == "Select LLM...":
            messagebox.showinfo("GLaDOSify", "Please select an Ollama model.")
            return
        url = self.url_var.get().strip() or DEFAULT_OLLAMA_URL
        # Reset run state before the worker starts.
        self._stop_flag.clear()
        self._streaming = True
        self._stream_tts_wanted = True
        self.gladosify_btn.configure(text="Cancel")
        self._disable_controls(True)
        self._clear_output()
        self._pending_sents.clear()
        self._stream_buf = ""
        self._last_wav_path = None
        self.save_btn.configure(state="disabled")
        self._stream_thread = threading.Thread(target=self._run_stream, args=(raw, model, url), daemon=True)
        self._stream_thread.start()
def cancel_stream(self):
self._stop_flag.set()
# Stop TTS if active
if self._stream_ps is not None:
try:
self._stream_ps.abort()
except Exception:
pass
self._stream_ps = None
# Abort HTTP stream if possible
try:
if self._current_response is not None:
self._current_response.close()
except Exception:
pass
def _run_stream(self, raw: str, model: str, url: str):
# Prepare Piper
try:
model_path = pathlib.Path(self.settings.piper_model).resolve()
sr = load_sample_rate(model_path)
except Exception as e:
self._ui_q.put(("error", f"Piper model error: {e}"))
self._ui_q.put(("done",))
return
def on_level(level: float):
self._push_level(level)
try:
# Prepare Piper first (may be toggled off later)
# Capture to temp WAV so user can save later
fd, tmp_path = tempfile.mkstemp(suffix=".wav")
os.close(fd)
tmp_wav = pathlib.Path(tmp_path)
ps = PiperStreamer(
model_path=model_path,
sample_rate=sr,
on_audio_level=on_level,
out_wav=tmp_wav,
)
self._stream_ps = ps
# Stream from Ollama
payload = {
"model": model,
"prompt": self._build_prompt(raw),
"stream": True,
}
with requests.post(url, json=payload, timeout=240, stream=True) as r:
self._current_response = r
r.raise_for_status()
stripper = ThinkStripper()
buf = ""
self._stream_buf = ""
for line in r.iter_lines(decode_unicode=True):
if self._stop_flag.is_set():
break
if not line:
continue
try:
obj = json.loads(line)
except Exception:
continue
if "response" in obj:
vis = stripper.feed(obj["response"])
if vis:
self._ui_q.put(("append", vis))
buf += vis
self._stream_buf = self._stream_buf + vis
sents, buf = pop_complete_sentences(buf)
if self._stream_ps is not None:
for s in sents:
self._stream_ps.say(s)
if obj.get("done"):
break
# flush tail
tail = stripper.flush()
if tail:
self._ui_q.put(("append", tail))
buf += tail
self._stream_buf = self._stream_buf + tail
if buf.strip():
sents, rest = pop_complete_sentences(buf)
if self._stream_ps is not None:
for s in sents:
self._stream_ps.say(s)
if rest.strip():
self._stream_ps.say(rest.strip())
# Done HTTP
except Exception as e:
self._ui_q.put(("error", str(e)))
finally:
# Tidy Piper and enable save
try:
if self._stream_ps is not None:
if self._stop_flag.is_set():
# User canceled: abort immediately
self._stream_ps.abort()
else:
# Normal completion: gracefully drain remaining audio
self._stream_ps.close()
self._stream_ps = None
except Exception:
pass
self._current_response = None
if 'tmp_wav' in locals() and tmp_wav.exists() and tmp_wav.stat().st_size > 44:
self._last_wav_path = tmp_wav
self._ui_q.put(("done",))
def _build_prompt(self, user_text: str) -> str:
# Use the same prompt as the CLI via format, without importing the string directly.
# Importing the constant would be okay, but keeping duplicate format avoids accidental edits.
from glados_say_stream import GLADOS_PROMPT
return GLADOS_PROMPT.format(user_text=user_text)
    def _drain_ui_queue(self):
        """Poll worker-thread messages and apply them on the Tk main thread.

        Reschedules itself every 50 ms. Message tags: "models", "append",
        "error", "done".
        """
        try:
            while True:
                item = self._ui_q.get_nowait()
                if not item:
                    continue
                tag = item[0]
                if tag == "models":
                    models, chosen = item[1], item[2]
                    self.model_combo.configure(values=models)
                    self.model_var.set(chosen)
                    self.settings.ollama_model = chosen if chosen != "Select LLM..." else ""
                    self._save_settings()
                elif tag == "append":
                    chunk = item[1]
                    self._append_output(chunk)
                elif tag == "error":
                    messagebox.showerror("GLaDOSify", item[1])
                elif tag == "done":
                    # Stream finished (or failed): restore controls and snapshot output.
                    self._disable_controls(False)
                    self._last_output_full = self.output_text.get("1.0", "end-1c")
                    self.gladosify_btn.configure(text="GLaDOSify")
                    self._streaming = False
                    if self._last_wav_path and self._last_wav_path.exists():
                        self.save_btn.configure(state="normal")
        except queue.Empty:
            pass
        self.after(50, self._drain_ui_queue)
def _append_output(self, text: str):
self.output_text.configure(state="normal")
self.output_text.insert("end", text)
self.output_text.see("end")
self.output_text.configure(state="disabled")
def _clear_output(self):
self.output_text.configure(state="normal")
self.output_text.delete("1.0", "end")
self.output_text.configure(state="disabled")
def _disable_controls(self, busy: bool):
state = "disabled" if busy else "normal"
# Keep gladosify clickable to allow Cancel
self.gladosify_btn.configure(state="normal")
self.model_combo.configure(state="disabled" if busy else "readonly")
self.url_entry.configure(state=state)
self.copy_btn.configure(state="normal")
self.speaker_btn.configure(state="normal")
# --- Actions ---
def copy_output(self):
text = self.output_text.get("1.0", "end-1c")
if not text:
return
self.clipboard_clear()
self.clipboard_append(text)
    # --- Placeholder logic for left input ---
    def _init_placeholder(self):
        """Install grey placeholder text in the input box plus focus/key handlers."""
        self._placeholder_text = "Type text here..."
        self._placeholder_active = False
        try:
            # Grey tag so placeholder text is visually distinct from input.
            self.input_text.tag_configure("placeholder", foreground="#6e6e6e")
        except Exception:
            pass
        self._show_placeholder()
        self.input_text.bind("<FocusIn>", self._on_input_focus_in)
        self.input_text.bind("<FocusOut>", self._on_input_focus_out)
        self.input_text.bind("<KeyPress>", self._on_input_keypress)
    def _show_placeholder(self):
        """Show the placeholder when the input box is empty and it isn't already shown."""
        if self._placeholder_active:
            return
        if self.input_text.get("1.0", "end-1c").strip():
            return  # real text present; leave it alone
        try:
            self.input_text.configure(state="normal")
        except Exception:
            pass
        self.input_text.delete("1.0", "end")
        self.input_text.insert("1.0", self._placeholder_text, ("placeholder",))
        self._placeholder_active = True
    def _hide_placeholder(self):
        """Remove the placeholder text so real input can be typed."""
        if self._placeholder_active:
            self.input_text.delete("1.0", "end")
            self._placeholder_active = False
    def _on_input_focus_in(self, _):
        # Clear the placeholder as soon as the input box gains focus.
        if self._placeholder_active:
            self._hide_placeholder()
    def _on_input_focus_out(self, _):
        # Restore the placeholder when focus leaves an empty input box.
        if not self.input_text.get("1.0", "end-1c").strip():
            self._show_placeholder()
    def _on_input_keypress(self, _):
        # The first keystroke replaces the placeholder.
        if self._placeholder_active:
            self._hide_placeholder()
    def toggle_speaker(self):
        """Speaker button: mute/unmute live TTS while streaming, else toggle replay."""
        # If streaming TTS active, toggle it
        if self._stream_thread and self._stream_thread.is_alive():
            if self._stream_ps is not None and self._stream_tts_wanted:
                # turn off
                self._stream_tts_wanted = False
                try:
                    self._stream_ps.set_muted(True)
                except Exception:
                    pass
                if getattr(self, "_speaker_icon", None) is not None:
                    self.speaker_btn.configure(image=self._speaker_icon, text="")
                else:
                    self.speaker_btn.configure(text="🔊", image="")
            else:
                # turn on: create PS and flush pending sentences first
                try:
                    if self._stream_ps is not None:
                        self._stream_ps.set_muted(False)
                    self.speaker_btn.configure(text="", image="")
                    self._stream_tts_wanted = True
                except Exception as e:
                    messagebox.showerror("TTS", f"Failed to start audio: {e}")
            return
        # Else: use replay toggle
        if self._replay_thread and self._replay_thread.is_alive():
            # stop replay
            try:
                self._replay_stop = True
                if self._replay_ps is not None:
                    self._replay_ps.abort()
            except Exception:
                pass
            if getattr(self, "_speaker_icon", None) is not None:
                self.speaker_btn.configure(image=self._speaker_icon, text="")
            else:
                self.speaker_btn.configure(text="🔊", image="")
            return
        # start replay
        text = self.output_text.get("1.0", "end-1c").strip()
        if not text:
            return
        self.speaker_btn.configure(text="", image="")
        self._replay_stop = False
        self._replay_ps = None
        self._replay_thread = threading.Thread(target=self._do_replay, args=(text,), daemon=True)
        self._replay_thread.start()
def _do_replay(self, text: str):
try:
model_path = pathlib.Path(self.settings.piper_model).resolve()
sr = load_sample_rate(model_path)
ps = PiperStreamer(model_path=model_path, sample_rate=sr,
on_audio_level=self._push_level)
self._replay_ps = ps
sents, rest = pop_complete_sentences(text)
for s in sents:
if getattr(self, "_replay_stop", False):
break
ps.say(s)
if not getattr(self, "_replay_stop", False) and rest.strip():
ps.say(rest.strip())
except Exception as e:
self._ui_q.put(("error", f"Replay failed: {e}"))
finally:
try:
if self._replay_ps is not None:
if getattr(self, "_replay_stop", False):
self._replay_ps.abort()
else:
self._replay_ps.close()
except Exception:
pass
self._replay_ps = None
self.speaker_btn.configure(text="🔊")
def save_as_wav(self):
if not self._last_wav_path or not self._last_wav_path.exists():
messagebox.showinfo("Save Audio", "No audio available yet.")
return
dest = filedialog.asksaveasfilename(
title="Save Audio",
defaultextension=".wav",
filetypes=[("WAV audio", ".wav")],
initialfile="glados_output.wav",
)
if not dest:
return
try:
pathlib.Path(dest).write_bytes(self._last_wav_path.read_bytes())
messagebox.showinfo("Save Audio", f"Saved to {dest}")
except Exception as e:
messagebox.showerror("Save Audio", f"Failed to save: {e}")
def pick_piper_model(self):
path = filedialog.askopenfilename(
title="Select Piper ONNX model",
filetypes=[("Piper ONNX", ".onnx"), ("All files", "*.*")],
initialdir=str(APP_DIR),
)
if not path:
return
self.settings.piper_model = path
self.settings.save()
# --- Level helpers ---
def _push_level(self, v: float):
# store timestamped level for delayed display
if not hasattr(self, "_lvl_hist"):
self._lvl_hist = deque(maxlen=256)
self._lvl_hist.append((time.time(), float(v)))
self._audio_level = float(v)
    def _get_delayed_level(self, delay: float = 0.15) -> float:
        """Return a smoothed, delayed audio level with natural fade-out.

        - Uses a small visual delay to better match perceived audio.
        - If there are no recent samples, attenuates level toward 0 over time.
        """
        now = time.time()
        if not hasattr(self, "_lvl_hist") or not self._lvl_hist:
            return 0.0
        # Most recent sample
        last_t, last_v = self._lvl_hist[-1]
        # Preferred: sample at or before (now - delay) for slight sync lag
        t_target = now - delay
        level = None
        for t, v in reversed(self._lvl_hist):
            if t <= t_target:
                level = v
                break
        if level is None:
            # If we don't have a sample that old yet, use the latest
            level = last_v
        # Fade out if no updates for a while (age beyond delay)
        age = now - last_t
        fade_start = delay  # start fading after this delay
        fade_window = 0.6   # fade to zero over this many seconds
        if age > fade_start:
            k = max(0.0, 1.0 - (age - fade_start) / fade_window)
            level *= max(0.0, min(1.0, k))
        return float(max(0.0, min(1.0, level)))
def main():
    """Build the GUI, enforce a minimum size, and run the Tk main loop."""
    window = GladosGUI()
    window.minsize(760, 420)
    window.mainloop()
# Script entry point; main() returns None, so the process exits with status 0.
if __name__ == "__main__":
    sys.exit(main())