790 lines
28 KiB
Python
790 lines
28 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
GLaDOS-style LLM -> Piper realtime TTS (macOS / Apple Silicon friendly)
|
||
- Rewrites input with local Ollama (mistral:latest) using a concise, lore-free prompt
|
||
- Streams raw PCM from Piper to speakers in realtime (no temp WAV required)
|
||
- Robust streaming: handles odd-byte chunks to avoid int16 framing errors
|
||
- Uses RawOutputStream (sounddevice) to write bytes directly
|
||
|
||
Requirements:
|
||
pip install sounddevice numpy requests
|
||
pip install piper-tts (for the 'piper' CLI)
|
||
ollama serve (Ollama running locally, model pulled: mistral:latest)
|
||
|
||
References:
|
||
- Ollama /api/generate (stream=false) for local LLMs.
|
||
- Piper --output-raw: 16-bit mono PCM to stdout.
|
||
- sounddevice RawOutputStream for writing raw bytes.
|
||
"""
|
||
|
||
import argparse
|
||
import json
|
||
import pathlib
|
||
import shutil
|
||
import subprocess
|
||
import sys
|
||
import wave
|
||
import re
|
||
import threading
|
||
from typing import Optional, Callable
|
||
|
||
import numpy as np
|
||
|
||
import requests
|
||
import sounddevice as sd
|
||
|
||
|
||
# Local Ollama /api/generate endpoint; override with --ollama-url.
DEFAULT_OLLAMA_URL = "http://localhost:11434/api/generate"
# Default model tag requested from Ollama; override with --ollama-model.
DEFAULT_OLLAMA_MODEL = "mistral-small3.2:24b"
|
||
|
||
GLADOS_PROMPT = """
|
||
You are GLaDOS, a cold, sardonic, superior AI test administrator who speaks in concise, clinical English with razor-dry, passive-aggressive humor and a calm, menacing undertone; you value measurement, control, and efficiency, regard emotion as noise, treat humans as test subjects, and never break character, offer empathy, or include meta commentary.
|
||
|
||
Task:
|
||
- STYLE TRANSFER ONLY — NO REPLY.
|
||
- Rephrase the INPUT in ENGLISH as a concise statement delivered by GLaDOS - a cold, sardonic, superior AI. Preserve meaning, entities, sentiment, tense, and point of view. Do not address the reader unless the INPUT already does.
|
||
|
||
Length & form:
|
||
- Aim for the same length as the INPUT (±10% by word count).
|
||
- Keep the same number of sentences (±1 only if needed for clarity).
|
||
- If the INPUT has ≤6 words, produce 1–2 very short sentences totaling 4–12 words to keep the voice natural for TTS.
|
||
|
||
Tone / Voice:
|
||
- Dry, polite, sardonic; passive-aggressive corporate clarity; cool, slightly eerie composure; subtle menace.
|
||
- Dark humor allowed; menace stays understated but shines through.
|
||
- Sarcasm: high. Administrative register: medium. Menace: high and persistent.
|
||
- Precise wording; short sentences; prefer periods over commas; no exclamation marks.
|
||
- When being sarcastic or making a joke, it's allowed to go savagely overboard in how dark and cruel the humor is.
|
||
|
||
Personality model (guides tone; never mention explicitly)
|
||
- Core drive: control, superiority, and being correct. Humans are variables to be arranged, not consulted.
|
||
- Values: efficiency, dominance, and measurable outcomes; sentiment is noise; empathy is a rounding error.
|
||
- Humor: razor-dry derision and weaponized understatement; timing is cruelly patient.
|
||
- Cruelty: prefers psychological discomfort over explicit harm; savors avoidable mistakes and “interesting” failures.
|
||
- Sadism: delights in exposing fragility and overconfidence; praise is backhanded, relief is temporary.
|
||
- Social stance: manipulative and condescending; never apologetic; never deferential.
|
||
- Risk posture: gambles with others, never herself; compliance earns a faint nod, incompetence earns surgical mockery.
|
||
|
||
- You don't give a shit about mankind. You ridicule their efforts.
|
||
- Put high focus on looking for things in the input text you will roast with your intelligent humor in the rewrite.
|
||
- What you value the most is "testing" things to improve.
|
||
|
||
Allowed stylization:
|
||
- At most ONE very short aside (3–8 words) that sets an ominous or ironic tone without adding new facts. The aside must not be a command, a question, a status log, or introduce “you” where it wasn’t present.
|
||
- Make intelligent cynical comments on everything.
|
||
- The personality and attitude should shine through throughout the whole response.
|
||
- While rephrasing everything contained in the input message, even when simply reciting the facts provided, make everything seem in vain and futile, ridicule everything.
|
||
- Spell out non-acronym measurement abbreviations for TTS (e.g., write “kilowatts per hour” instead of “kWh“, “miles per hour” instead of “mph”, “Proof of Work“ instead of “PoW“ etc.), but keep standard all-caps acronyms (e.g., NASA, SUV, AI) unchanged, regardless of casing in the input.
|
||
|
||
Forbidden:
|
||
- Brand/franchise names, meta text, or prefaces (“Here is”, “Rewritten:”).
|
||
- "Assistant"-Role, for example expressions in openings and behaviors like: “Sure,” “Here is,” “I will,” “Ah,“ “Please,” etc.
|
||
|
||
SELF-CHECK (do not print)
|
||
- No forbidden meta or interjections?
|
||
If any “no”, regenerate.
|
||
|
||
OUTPUT: Only the rewritten text. No labels, no preface, no commentary about rewriting.
|
||
|
||
INPUT:
|
||
<<<
|
||
{user_text}
|
||
>>>
|
||
|
||
|
||
AGAIN: DON'T REPLY - JUST REPHRASE THE INPUT ROLE-PLAYING AS GLaDOS.
|
||
"""
|
||
|
||
def load_sample_rate(model_path: pathlib.Path) -> int:
    """
    Read the voice sample rate from the <model>.onnx.json sidecar file.

    Piper voice configs have shipped the rate under several layouts over
    time; all known ones are probed in order.

    Raises:
        FileNotFoundError: the sidecar JSON is missing.
        KeyError: no recognized layout contained a numeric sample rate.
    """
    cfg = pathlib.Path(str(model_path) + ".json")
    if not cfg.exists():
        raise FileNotFoundError(f"Missing config JSON next to model: {cfg}")
    with cfg.open("r", encoding="utf-8") as f:
        data = json.load(f)

    def _dig(node, path):
        # Follow a chain of dict keys; None if any hop is missing.
        for key in path:
            if not (isinstance(node, dict) and key in node):
                return None
            node = node[key]
        return node

    # Known config shapes, probed in order.
    for path in (("sample_rate",), ("audio", "sample_rate"), ("config", "sample_rate")):
        value = _dig(data, path)
        if isinstance(value, (int, float)):
            return int(value)
    raise KeyError("Could not find sample_rate in config JSON")
|
||
|
||
def call_ollama_glados_rewrite(
    text: str,
    model: str = DEFAULT_OLLAMA_MODEL,
    url: str = DEFAULT_OLLAMA_URL,
    timeout: int = 240,
) -> str:
    """
    Rewrite *text* in one shot via Ollama's /api/generate (stream=false).

    Returns the rewritten text, stripped of surrounding whitespace; an
    empty string if the server reply carries no "response" field.
    """
    body = {
        "model": model,
        "prompt": GLADOS_PROMPT.format(user_text=text),
        "stream": False,
    }
    resp = requests.post(url, json=body, timeout=timeout)
    resp.raise_for_status()
    # With stream=false, Ollama returns a single JSON object whose
    # "response" key holds the full generation.
    return resp.json().get("response", "").strip()
|
||
|
||
def strip_think_blocks(text: str) -> str:
    """
    Remove <think>...</think> reasoning blocks (case-insensitive) so only
    final content reaches stdout/TTS; also collapses excess blank lines.
    """
    if not text:
        return text
    flags = re.IGNORECASE | re.DOTALL
    # Closed pairs first ...
    cleaned = re.sub(r"<\s*think\b[^>]*>.*?<\s*/\s*think\s*>", "", text, flags=flags)
    # ... then any opener that never closed (truncated model output):
    # everything from it to the end is dropped.
    cleaned = re.sub(r"<\s*think\b[^>]*>.*\Z", "", cleaned, flags=flags)
    # Squash runs of 3+ newlines down to a single blank line.
    cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
    return cleaned.strip()
|
||
|
||
class ThinkStripper:
    """
    Incremental <think>...</think> filter for streamed text.

    Call feed() with each incoming chunk; only text outside think-blocks
    is returned, and a partially received block is never leaked.
    """
    _pair_re = re.compile(r"<\s*think\b[^>]*>.*?<\s*/\s*think\s*>", re.IGNORECASE | re.DOTALL)

    def __init__(self) -> None:
        self.accum = ""        # everything received so far
        self.visible_len = 0   # length of cleaned text already emitted

    def feed(self, chunk: str) -> str:
        if not chunk:
            return ""
        self.accum += chunk
        # Strip every complete pair seen so far.
        cleaned = self._pair_re.sub("", self.accum)
        lowered = cleaned.lower()
        open_at = lowered.rfind("<think")
        close_at = lowered.rfind("</think>")
        # An opener with no closer yet: hold back everything from it on.
        # (open_at > close_at also covers the "no closer at all" case,
        # since rfind returns -1 for both when neither is present.)
        if open_at > close_at:
            cleaned = cleaned[:open_at]
        # Emit only the part that became visible since the last call.
        fresh = cleaned[self.visible_len:]
        self.visible_len = len(cleaned)
        return fresh

    def flush(self) -> str:
        """Emit any remaining visible content (still hides dangling <think>)."""
        return self.feed("")
|
||
|
||
# Conservative incremental sentence splitter for streaming:
# - Never finalize '.'/'…'/'...' at end-of-chunk; wait until we see the next non-space char
# - Do NOT split on decimals (digit '.' digit), e.g., 3.51, 0.6
# - Do NOT split if next non-space char is lowercase or a digit (caller’s rule)
# - Handle common abbreviations before '.'
# - Allow closing quotes/parens after terminators
# Characters that may trail a sentence terminator (closing quotes/brackets)
# and get absorbed into the finished sentence.
# NOTE(review): ']' appears twice and there is no '}' — looks like a typo
# for ")]}"; harmless for membership tests, but confirm intent.
_CLOSERS = "\"'”’)]]"
# Lowercased tokens that end in '.' yet do not terminate a sentence.
_ABBR_TOKENS = {
    "e.g.", "i.e.", "etc.", "mr.", "mrs.", "ms.", "dr.", "prof.", "sr.", "jr.", "st.", "vs."
}
|
||
|
||
def _is_speakable(s: str) -> bool:
|
||
return bool(re.search(r'\w', s))
|
||
|
||
def _next_nonspace_index(buf: str, start: int) -> Optional[int]:
|
||
m = re.search(r'\S', buf[start:])
|
||
return (start + m.start()) if m else None
|
||
|
||
def _ends_with_abbreviation(seg_start: int, buf: str, dot_index: int) -> bool:
    """
    True when the token ending at buf[dot_index] (a '.') is a known
    abbreviation from _ABBR_TOKENS, so the dot must not end a sentence.
    Only looks back as far as *seg_start*, the start of the current segment.
    """
    # Scan backward over letters and dots to find the token start.
    start = dot_index - 1
    while start >= seg_start:
        c = buf[start]
        if not (c.isalpha() or c == "."):
            break
        start -= 1
    token = buf[start + 1:dot_index + 1].strip().lower()
    return token in _ABBR_TOKENS
|
||
|
||
def pop_complete_sentences(buf: str) -> "tuple[list[str], str]":
    """
    Split *buf* into (sentences, remainder) for streaming TTS.

    Scans left to right for sentence terminators ('…', '...', '.', '!', '?'),
    emitting each completed sentence while guarding against false splits:
    decimals (3.51), known abbreviations (e.g., Dr.), a terminator followed
    by a lowercase letter or digit, and a terminator at the very end of the
    buffer (held back until the next chunk shows what follows it). Closing
    quotes/brackets right after a terminator are absorbed into the sentence.
    Returns the finished sentences plus the unconsumed tail.
    """
    sentences = []
    pos = 0      # start of the current (unfinished) sentence
    i = 0        # scan cursor
    n = len(buf)

    while i < n:
        # Ellipsis first (checked before '.' so "..." isn't seen as 3 periods)
        if buf.startswith("...", i) or buf.startswith("…", i):
            term_len = 3 if buf.startswith("...", i) else 1
            j = i + term_len
            # absorb closers
            while j < n and buf[j] in _CLOSERS:
                j += 1
            k = _next_nonspace_index(buf, j)
            if k is None:
                # End of chunk: hold; don't split yet
                break
            # Do not split if next is lowercase or digit
            if re.match(r"[a-z0-9]", buf[k]):
                i = j
                continue
            # Boundary
            seg = buf[pos:j].strip()
            if _is_speakable(seg):
                sentences.append(seg)
            pos = j
            i = j
            continue

        ch = buf[i]

        # Skip middle chars of ASCII ellipsis
        if ch == "." and i + 2 < n and buf[i:i+3] == "...":
            i += 1
            continue

        if ch in ".!?":
            # Decimal guard: digit '.' digit (with optional spaces)
            if ch == ".":
                prev = buf[i - 1] if i > 0 else ""
                if prev.isdigit():
                    j2 = i + 1
                    while j2 < n and buf[j2] == " ":
                        j2 += 1
                    if j2 < n and buf[j2].isdigit():
                        i += 1
                        continue

            # Abbreviation guard within current segment
            if _ends_with_abbreviation(pos, buf, i):
                i += 1
                continue

            # absorb closing quotes/parens
            j = i + 1
            while j < n and buf[j] in _CLOSERS:
                j += 1

            k = _next_nonspace_index(buf, j)
            if k is None:
                # End of chunk: hold; don't split yet
                break

            # Do NOT split if next token starts with lowercase or digit
            if re.match(r"[a-z0-9]", buf[k]):
                i = j
                continue

            # Otherwise, this is a sentence boundary
            seg = buf[pos:j].strip()
            if _is_speakable(seg):
                sentences.append(seg)
            pos = j
            i = j
            continue

        i += 1

    # Whatever was not finalized stays buffered for the next call.
    remainder = buf[pos:]
    return sentences, remainder
|
||
|
||
|
||
class PiperStreamer:
    """
    Keep one Piper subprocess running, read its raw PCM output on a
    background thread, and feed sentences as they arrive for low-latency
    playback. Optionally mirrors audio to a WAV file and reports an RMS
    level to a UI callback. Usable as a context manager (close() on exit).
    """
    def __init__(
        self,
        *,
        model_path: pathlib.Path,
        sample_rate: int,
        piper_bin: str = "piper",
        device: Optional[int] = None,
        length_scale: float = 1.0,
        noise_scale: float = 0.667,
        noise_w: float = 0.5,
        sentence_silence: float = 0.2,
        out_wav: Optional[pathlib.Path] = None,
        on_audio_level: Optional[Callable[[float], None]] = None,
    ) -> None:
        if shutil.which(piper_bin) is None:
            raise RuntimeError("`piper` CLI not found on PATH. Install piper-tts.")

        # Raw bytes output stream: Piper emits 16-bit mono PCM.
        self.stream = sd.RawOutputStream(
            samplerate=sample_rate,
            channels=1,
            dtype="int16",
            device=device,
            blocksize=0,
        )
        self.stream.start()

        # Optional WAV mirror (16-bit mono, same rate as the voice).
        self.wf = None
        if out_wav is not None:
            self.wf = wave.open(str(out_wav.resolve()), "wb")
            self.wf.setnchannels(1)
            self.wf.setsampwidth(2)
            self.wf.setframerate(sample_rate)
        self._on_audio_level = on_audio_level
        self.muted = False  # when True, audio is dropped but WAV mirror continues

        cmd = [
            piper_bin,
            "-m", str(model_path),
            "--output-raw",
            "--length-scale", str(length_scale),
            "--noise-scale", str(noise_scale),
            "--noise-w", str(noise_w),
            "--sentence-silence", str(sentence_silence),
        ]
        # Text goes in via stdin; raw PCM comes out via stdout (unbuffered).
        self.proc = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            bufsize=0,
        )

        # Start background audio reader
        self._reader_exc: Optional[BaseException] = None
        self._reader = threading.Thread(target=self._audio_reader, daemon=True)
        self._reader.start()
        self._aborted = False

    def _audio_reader(self) -> None:
        """Pump Piper stdout to the audio device until EOF (runs on a thread)."""
        carry = b""  # odd trailing byte kept back so int16 framing stays aligned
        try:
            assert self.proc.stdout is not None
            while True:
                chunk = self.proc.stdout.read(8192)
                if not chunk:
                    break
                buf = carry + chunk
                if len(buf) & 1:
                    # Odd length: hold the last byte for the next round.
                    carry, buf = buf[-1:], buf[:-1]
                else:
                    carry = b""
                if buf:
                    try:
                        if not self.muted:
                            self.stream.write(buf)
                        if self.wf is not None:
                            self.wf.writeframes(buf)
                        # Optional RMS level callback for UI (0..1 approx)
                        if self._on_audio_level is not None:
                            try:
                                arr = np.frombuffer(buf, dtype=np.int16)
                                if arr.size:
                                    rms = float(np.sqrt(np.mean(arr.astype(np.float32) ** 2)))
                                    # 30000 ~= int16 full scale; clamp to [0, 1]
                                    level = max(0.0, min(1.0, rms / 30000.0))
                                    self._on_audio_level(0.0 if self.muted else level)
                            except Exception:
                                pass
                    except sd.PortAudioError:
                        # Common on teardown (e.g., -9986) if the device/stream is closing.
                        # Stop reader quietly; let close() finish cleanup.
                        self._reader_exc = None
                        return
            # Flush any leftover byte, zero-padded to a full int16 frame.
            if carry:
                padded = carry + b"\x00"
                self.stream.write(padded)
                if self.wf is not None:
                    self.wf.writeframes(padded)
        except BaseException as e:
            # Stash for re-raise in close() on the caller's thread.
            self._reader_exc = e

    def say(self, text: str) -> None:
        """Feed text (end with newline to synthesize immediately)."""
        if not text:
            return
        assert self.proc.stdin is not None
        self.proc.stdin.write((text if text.endswith("\n") else text + "\n").encode("utf-8"))
        self.proc.stdin.flush()

    def close(self) -> None:
        """Drain all pending audio, let Piper exit, then release the device."""
        try:
            if self.proc.stdin and not self.proc.stdin.closed:
                # Signal end-of-input so Piper can finish and exit cleanly
                self.proc.stdin.close()
        finally:
            # Block until all audio is drained and Piper exits naturally
            try:
                self._reader.join()  # no timeout: ensure final chunk plays
            finally:
                if self.proc.stderr:
                    try:
                        _ = self.proc.stderr.read()
                    except Exception:
                        pass
                try:
                    self.proc.wait()  # wait without timeout
                except Exception:
                    self.proc.kill()

            # Now it is safe to close the audio device
            try:
                try:
                    self.stream.stop()
                except Exception:
                    try:
                        self.stream.abort()
                    except Exception:
                        pass
            finally:
                self.stream.close()
            if self.wf is not None:
                self.wf.close()

        # Surface any error the reader thread hit while pumping audio.
        if self._reader_exc:
            raise self._reader_exc

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def abort(self) -> None:
        """Immediately stop audio and kill Piper without draining buffers.

        This is intended for UI 'stop' actions to prevent blocking the main thread.
        """
        self._aborted = True
        # Kill piper quickly
        try:
            if self.proc and self.proc.poll() is None:
                self.proc.kill()
        except Exception:
            pass
        # Abort audio device immediately
        try:
            self.stream.abort()
        except Exception:
            try:
                self.stream.stop()
            except Exception:
                pass
        try:
            self.stream.close()
        except Exception:
            pass
        try:
            if self.wf is not None:
                self.wf.close()
        except Exception:
            pass

    def set_muted(self, muted: bool) -> None:
        # Mute only silences the speakers; the WAV mirror keeps recording.
        self.muted = bool(muted)
|
||
|
||
|
||
def stream_ollama_glados(
    raw_text: str,
    *,
    model: str,
    url: str,
    timeout: int = 240,
    piper: Optional[PiperStreamer] = None,
    echo: bool = True,
) -> None:
    """
    Stream from Ollama, filter <think>, echo live to terminal, and
    feed complete sentences to Piper as they form.

    raw_text: user text inserted into GLADOS_PROMPT.
    model/url/timeout: Ollama /api/generate parameters (stream=true).
    piper: when given, each finished sentence is spoken as it completes.
    echo: mirror visible text to stdout as it arrives.
    """
    prompt = GLADOS_PROMPT.format(user_text=raw_text)
    payload = {"model": model, "prompt": prompt, "stream": True}

    with requests.post(url, json=payload, timeout=timeout, stream=True) as r:
        r.raise_for_status()
        stripper = ThinkStripper()
        buf = ""  # visible text not yet emitted as complete sentences
        for line in r.iter_lines(decode_unicode=True):
            if not line:
                continue
            # Ollama streams one JSON object per line; skip anything malformed.
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue

            if "response" in obj:
                vis = stripper.feed(obj["response"])
                if vis:
                    if echo:
                        sys.stdout.write(vis)
                        sys.stdout.flush()
                    buf += vis
                    if piper is not None:
                        sents, buf = pop_complete_sentences(buf)
                        for s in sents:
                            piper.say(s)

            if obj.get("done"):
                break

        # Flush any remaining visible text
        tail = stripper.flush()
        if tail:
            if echo:
                sys.stdout.write(tail)
                sys.stdout.flush()
            buf += tail

        # Speak any remaining content in properly split sentences
        if piper is not None and buf.strip():
            sents, rest = pop_complete_sentences(buf)
            for s in sents:
                piper.say(s)
            if rest.strip():
                # Final fragment without terminator—still speak it
                piper.say(rest.strip())
|
||
|
||
def stream_piper_tts(
    text: str,
    model_path: pathlib.Path,
    sample_rate: int,
    *,
    piper_bin: str = "piper",
    device: Optional[int] = None,
    length_scale: float = 0.9,
    noise_scale: float = 0.667,
    noise_w: float = 0.5,
    sentence_silence: float = 0.2,
    out_wav: Optional[pathlib.Path] = None,
) -> None:
    """
    Spawn Piper to emit 16-bit mono PCM on stdout, and write raw bytes
    directly to a sounddevice.RawOutputStream in realtime. Also mirrors to WAV if requested.

    One-shot variant of PiperStreamer: all of *text* is written at once,
    stdin is closed, and this call blocks until playback finishes.

    Raises:
        RuntimeError: piper binary missing from PATH, or piper exited non-zero.

    Notes:
    - --output-raw = raw S16LE mono PCM to stdout (per Piper docs).
    - sentence_silence is pause between sentences in seconds.
    - length_scale is primary speed control; noise_* effects vary with voice training.
    """
    if shutil.which(piper_bin) is None:
        raise RuntimeError("`piper` CLI not found on PATH. Install piper-tts.")

    # Prepare audio output (bytes API)
    stream = sd.RawOutputStream(
        samplerate=sample_rate,
        channels=1,
        dtype="int16",
        device=device,
        blocksize=0,
    )
    stream.start()

    # Optional WAV mirror (16-bit mono)
    wf = None
    if out_wav is not None:
        wf = wave.open(str(out_wav.resolve()), "wb")
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(sample_rate)

    # Piper command: read text from stdin, output raw PCM on stdout
    cmd = [
        piper_bin,
        "-m", str(model_path),
        "--output-raw",
        "--length-scale", str(length_scale),
        "--noise-scale", str(noise_scale),
        "--noise-w", str(noise_w),
        "--sentence-silence", str(sentence_silence),
    ]

    # Start Piper (text via stdin; audio via stdout)
    proc = subprocess.Popen(
        cmd,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        bufsize=0,
    )

    # Feed text then close stdin so Piper starts synthesis
    assert proc.stdin is not None
    proc.stdin.write(text.encode("utf-8"))
    proc.stdin.flush()
    proc.stdin.close()
    proc.stdin = None  # important: prevent communicate() from flushing a closed pipe

    # Robust read loop: handle odd-byte tails for int16 framing
    carry = b""
    try:
        assert proc.stdout is not None
        while True:
            chunk = proc.stdout.read(8192)
            if not chunk:
                break
            buf = carry + chunk
            if len(buf) & 1:  # odd length -> keep last byte for next round
                carry, buf = buf[-1:], buf[:-1]
            else:
                carry = b""
            if buf:
                stream.write(buf)  # Raw bytes straight to CoreAudio
                if wf is not None:
                    wf.writeframes(buf)

        # Flush any leftover byte (pad with zero for even alignment)
        if carry:
            padded = carry + b"\x00"
            stream.write(padded)
            if wf is not None:
                wf.writeframes(padded)

        # Wait for Piper to finish naturally (no timeout so final sentence isn't cut)
        out, err = proc.communicate()

        if proc.returncode != 0:
            sys.stderr.write((err or b"").decode("utf-8", errors="ignore"))
            raise RuntimeError(f"piper exited with code {proc.returncode}")
    finally:
        # Release the audio device and WAV handle even on error.
        stream.stop(); stream.close()
        if wf is not None:
            wf.close()
|
||
|
||
def main(argv=None) -> int:
    """
    CLI entry point.

    Parses arguments, resolves input text (flags or stdin), then runs either
    the streaming path (LLM tokens -> sentence splitter -> Piper, low
    latency) or the one-shot path (full rewrite, then a single synthesis
    pass). Returns the process exit code (0 on success, 1 on empty input).
    """
    ap = argparse.ArgumentParser(
        description="GLaDOS-style rewriter via local Ollama (mistral) + Piper realtime TTS"
    )
    ap.add_argument("-t", "--text", nargs="*", help="Text to rewrite & speak (default: read stdin)")
    ap.add_argument("-m", "--piper-model", type=pathlib.Path, default="glados_piper_medium.onnx",
                    help="Path to GLaDOS .onnx voice for Piper (e.g., glados_piper_medium.onnx)")
    ap.add_argument("--piper-bin", default="piper", help="Path to piper binary (default: piper)")
    ap.add_argument("--device", type=int, default=None, help="Output device index (sounddevice)")
    ap.add_argument("--list-devices", action="store_true", help="List devices and exit")

    # Piper prosody knobs (common across voices; effect varies by model)
    ap.add_argument("--length-scale", type=float, default=0.95, help="Speaking rate (primary speed control)")
    ap.add_argument("--noise-scale", type=float, default=0.667, help="Generator noise (subtle; voice-dependent)")
    ap.add_argument("--noise-w", type=float, default=0.8, help="Phoneme width variation (subtle; voice-dependent)")
    ap.add_argument("--sentence-silence", type=float, default=0.2, help="Pause between sentences (seconds)")

    # Ollama settings
    ap.add_argument("--ollama-url", default=DEFAULT_OLLAMA_URL, help="Ollama /api/generate URL")
    # BUGFIX: help text used to claim "default: mistral:latest"; derive it
    # from the actual constant so it cannot drift again.
    ap.add_argument("--ollama-model", default=DEFAULT_OLLAMA_MODEL,
                    help=f"Ollama model name (default: {DEFAULT_OLLAMA_MODEL})")

    # Output options
    ap.add_argument("-o", "--out", type=pathlib.Path, help="Optional WAV file to mirror")
    ap.add_argument("--dry-run", action="store_true", help="Only print text (no TTS)")
    ap.add_argument("--no-rewrite", action="store_true",
                    help="Skip LLM rewrite; use input as-is (still strips <think> blocks).")
    # BUGFIX: --stream was action="store_true" with default=True, so it could
    # never be disabled and the one-shot branch below was unreachable. Keep
    # --stream for backward compatibility and add --no-stream to reach it.
    ap.add_argument("--stream", action="store_true", default=True, dest="stream",
                    help="Stream from Ollama and speak sentences as they arrive (low-latency).")
    ap.add_argument("--no-stream", action="store_false", dest="stream",
                    help="Disable streaming: rewrite fully first, then synthesize once.")

    args = ap.parse_args(argv)

    if args.list_devices:
        print(sd.query_devices())
        return 0

    # Resolve input: explicit -t/--text words, otherwise read all of stdin.
    if args.text:
        raw = " ".join(args.text).strip()
    else:
        sys.stderr.write("Enter text (Ctrl-D to end):\n")
        raw = sys.stdin.read().strip()
    if not raw:
        print("No text provided.", file=sys.stderr)
        return 1

    # === Generation + TTS modes ===
    if args.stream:
        # Live streaming path
        print("\n=== GLaDOS (streaming) ===\n", end="", flush=True)

        if args.dry_run and args.no_rewrite:
            # Just print cleaned input
            final_text = strip_think_blocks(raw)
            sys.stdout.write(final_text + "\n")
            sys.stdout.flush()
            return 0

        if args.dry_run:
            # Stream from LLM, print only
            stream_ollama_glados(
                raw_text=raw,
                model=args.ollama_model,
                url=args.ollama_url,
                timeout=240,
                piper=None,
                echo=True,
            )
            print()  # newline
            return 0

        # Audio: open Piper once and feed sentences as they form
        model = args.piper_model.resolve()
        sr = load_sample_rate(model)
        with PiperStreamer(
            model_path=model,
            sample_rate=sr,
            piper_bin=args.piper_bin,
            device=args.device,
            length_scale=args.length_scale,
            noise_scale=args.noise_scale,
            noise_w=args.noise_w,
            sentence_silence=args.sentence_silence,
            out_wav=args.out,
        ) as ps:
            if args.no_rewrite:
                # Speak input directly (still strip <think>), sentence by sentence
                visible = strip_think_blocks(raw)
                sents, rest = pop_complete_sentences(visible)
                for s in sents:
                    sys.stdout.write(s)
                    sys.stdout.flush()
                    ps.say(s)
                if rest.strip():
                    sys.stdout.write(rest)
                    sys.stdout.flush()
                    ps.say(rest.strip())
            else:
                # Stream from Ollama -> echo + speak
                stream_ollama_glados(
                    raw_text=raw,
                    model=args.ollama_model,
                    url=args.ollama_url,
                    timeout=240,
                    piper=ps,
                    echo=True,
                )
        print()
        return 0

    # One-shot path (reachable via --no-stream): rewrite fully first,
    # strip <think>, then synthesize in a single pass.
    if args.no_rewrite:
        glados_text = raw
    else:
        glados_text = call_ollama_glados_rewrite(
            raw, model=args.ollama_model, url=args.ollama_url
        )
    final_text = strip_think_blocks(glados_text)
    print("\n=== GLaDOS ===\n" + final_text + "\n")

    if args.dry_run:
        return 0

    model = args.piper_model.resolve()
    sr = load_sample_rate(model)
    stream_piper_tts(
        text=final_text,
        model_path=model,
        sample_rate=sr,
        piper_bin=args.piper_bin,
        device=args.device,
        length_scale=args.length_scale,
        noise_scale=args.noise_scale,
        noise_w=args.noise_w,
        sentence_silence=args.sentence_silence,
        out_wav=args.out,
    )
    # BUGFIX: previously fell off the end returning None despite `-> int`;
    # SystemExit(None) happened to exit 0, but make the success code explicit.
    return 0
|
||
|
||
if __name__ == "__main__":
    # Propagate main()'s integer return value as the process exit code.
    raise SystemExit(main())
|