#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Headless backend actions for the Concept Maker app. This module exposes JSON actions for the Tauri UI without desktop toolkit imports. """ from __future__ import annotations import hashlib import html import json import math import os import re import shutil import subprocess import sys import tempfile import time import traceback from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional, Set, Tuple import websearch # ----------------------------- # Paths # ----------------------------- REPO_ROOT = Path(getattr(sys, "_MEIPASS", Path(__file__).resolve().parent)) IDEA_HOLE_DIR = Path(os.environ.get("CONCEPT_MAKER_DATA_DIR", REPO_ROOT / ".idea-hole")).expanduser() SYSTEM_BIN_DIRS = ["/opt/homebrew/bin", "/usr/local/bin", "/opt/local/bin", "/usr/bin", "/bin"] def resolve_command(name: str) -> Optional[str]: for base in [None, *SYSTEM_BIN_DIRS]: p = shutil.which(name) if base is None else os.path.join(base, name) if p and os.path.exists(p): return p return None def subprocess_env() -> Dict[str, str]: env = os.environ.copy() current = env.get("PATH", "") extra = [path for path in SYSTEM_BIN_DIRS if path and path not in current.split(os.pathsep)] if extra: env["PATH"] = os.pathsep.join([*extra, current] if current else extra) return env # ----------------------------- # Utilities # ----------------------------- def human_size(n: int) -> str: if n <= 0: return "0 B" units = ["B", "KB", "MB", "GB", "TB"] k = 1024.0 i = int(math.floor(math.log(n, k))) i = max(0, min(i, len(units) - 1)) return f"{n / (k**i):.1f} {units[i]}" def safe_symlink(src: Path, dst: Path) -> bool: try: if dst.exists() or dst.is_symlink(): dst.unlink() os.symlink(src, dst) return True except Exception: return False def copy_or_link(src: Path, dst_dir: Path) -> Path: dst_dir.mkdir(parents=True, exist_ok=True) dst = dst_dir / src.name if safe_symlink(src, dst): return dst shutil.copy2(src, dst) return dst def read_text_guess(path: Path) -> str: try: b = path.read_bytes() for enc in ("utf-8", "utf-16", "latin-1"): try: return b.decode(enc) except Exception: pass return b.decode("utf-8", errors="ignore") except Exception: return "" # ----------------------------- # Corpus building # ----------------------------- @dataclass class Record: id: str title: str text: str source_path: Optional[str] = None mime: Optional[str] = None class SimpleCorpusBuilder: """Very lightweight fallback if corpus_builder.py or deps are unavailable.""" def __init__(self) -> None: self._fitz = None try: import fitz # type: ignore self._fitz = fitz except Exception: self._fitz = None def build(self, root: Path, out_jsonl: Path) -> List[Record]: out_jsonl.parent.mkdir(parents=True, exist_ok=True) records: List[Record] = [] for p in root.rglob("*"): if not p.is_file(): continue suf = p.suffix.lower() try: if suf in {".txt", ".md", ".rst"}: text = read_text_guess(p) if text.strip(): records.append(Record(id=str(p), title=p.stem, text=text, source_path=str(p))) elif suf in {".html", ".htm"}: raw = read_text_guess(p) text = self._strip_html(raw) if text.strip(): records.append(Record(id=str(p), title=p.stem, text=text, source_path=str(p))) elif suf == ".pdf" and self._fitz is not None: text = self._pdf_text(p) if text.strip(): records.append(Record(id=str(p), title=p.stem, text=text, source_path=str(p))) else: records.append(Record(id=str(p), title=p.stem, text=f"[Unsupported file type: {suf}]", source_path=str(p))) except Exception: records.append(Record(id=str(p), title=p.stem, text=f"[Error reading file: {p.name}]", source_path=str(p))) with out_jsonl.open("w", encoding="utf-8") as fh: for r in records: fh.write(json.dumps(r.__dict__, ensure_ascii=False) + "\n") return records def _strip_html(self, html_text: str) -> str: try: from bs4 import BeautifulSoup # type: ignore soup = BeautifulSoup(html_text, "html.parser") for tag in soup(["script", "style"]): tag.decompose() text = soup.get_text("\n", strip=True) return text except Exception: txt = re.sub(r"<\s*(script|style)[^>]*>.*?<\s*/\s*\1\s*>", " ", html_text, flags=re.S | re.I) txt = re.sub(r"<[^>]+>", " ", txt) txt = re.sub(r"\s+", " ", txt) return txt.strip() def _pdf_text(self, path: Path) -> str: try: doc = self._fitz.open(str(path)) out = [] for i in range(len(doc)): page = doc.load_page(i) out.append(page.get_text("text")) return "\n\n".join(out) except Exception: return "" class ExternalCorpusBuilder: """Invokes corpus_builder.py as a subprocess to build a JSONL corpus.""" def __init__(self, script_path: Path) -> None: self.script = script_path def build(self, root: Path, out_jsonl: Path, *, workers: int = 4, verbose: bool = False) -> bool: cmd = [ sys.executable, str(self.script), "--root", str(root), "--out", str(out_jsonl), "--emit", "auto", "--workers", str(max(1, workers)), "--llm-parallel", "1", ] if verbose: cmd.append("--verbose") try: proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) ok = proc.returncode == 0 and out_jsonl.exists() and out_jsonl.stat().st_size > 0 return ok except Exception: return False # ----------------------------- # Ollama client # ----------------------------- class OllamaClient: def __init__(self, host: str = "http://localhost:11434", timeout: int = 600): self.host = host.rstrip("/") self.timeout = timeout def generate(self, model: str, prompt: str) -> str: import urllib.request import urllib.error url = f"{self.host}/api/generate" payload = { "model": model, "prompt": prompt, "stream": False, } data = json.dumps(payload).encode("utf-8") req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"}, method="POST") try: with urllib.request.urlopen(req, timeout=self.timeout) as resp: body = resp.read() except urllib.error.HTTPError as e: raise RuntimeError(f"Ollama HTTP error {e.code}: {e.read().decode('utf-8', 'ignore')}") except Exception as e: raise RuntimeError(f"Ollama request failed: {e}") try: obj = json.loads(body.decode("utf-8", "ignore")) except Exception: raise RuntimeError("Invalid JSON from Ollama") return (obj.get("response") or "").strip() def _parse_json_strict(s: str) -> Optional[Dict[str, str]]: try: s = sanitize_llm_text_simple(s) return json.loads(s) except Exception: m = re.search(r"\{[\s\S]*\}", s) if m: try: return json.loads(m.group(0)) except Exception: return None return None # ----------------------------- # Prompting # ----------------------------- PROMPT_TEMPLATE = """ You are a cross-domain concept developer (product strategist, creative producer, research lead, grant writer). Turn the sources into a concise, presentable CONCEPT document. Adapt to the domain. INSTRUCTIONS 1) Detect IDEA TYPE (pick one primary; if unclear, choose closest and add a TODO): {Product/Software, Service, Research/Study, Policy/Proposal, Art/Exhibition/Performance, Event/Program, Education/Curriculum, Media/Film/Publication, Campaign/Nonprofit, Data/ML/Infrastructure, Game/Interactive, Writing/Book/Article, Other} 2) Tone & register: - Product/Software -> pragmatic PM/tech brief - Research -> neutral academic project brief - Policy -> policy memo - Art/Exhibition/Performance -> curator/producer note (clear, not flowery) - Event -> producer's run-of-show style - Education -> syllabus brief - Media/Publication -> one-sheet - Campaign/Nonprofit -> strategy brief - Data/ML/Infrastructure -> engineering design note - Game/Interactive -> design doc overview - Writing/Book/Article -> proposal overview 3) Output Markdown using these core sections (use these exact headings; include only relevant ones): - Overview & Intent - Context / Problem (or Opportunity) - Audience / Stakeholders - Deliverables / Outputs & Scope - Approach / Method (rename to "Methodology", "Implementation Plan", "Format & Installation Plan", etc., to fit the idea type) - Resources / Budget / Tools (only if present; else add a short TODO) - Timeline & Milestones - Risks, Ethics & Constraints - Success Criteria / Evaluation - Open Questions (TODOs) Add one domain-specific block (only if relevant and supported by sources): - Product/Software: Key Features; Non-Goals; Rough Architecture; Dependencies & Integration; License. - Research/Study: Research Questions; Methodology & Data; Expected Contributions; References/Citations. - Policy/Proposal: Policy Mechanism; Legal/Standards; Impact Assessment; Implementation Steps. - Art/Exhibition/Performance: Conceptual Frame & References; Medium/Materials; Venue/Spatial Requirements; Tech/AV; Rights/Permissions. - Event/Program: Programme Outline / Run-of-Show; Roles & Staffing; Logistics & Venue. - Education/Curriculum: Learning Objectives; Syllabus Outline; Assessment & Materials. - Media/Film/Publication: Logline & Synopsis; Format; Production Plan; Distribution. - Campaign/Nonprofit: Theory of Change; Channels & Tactics; KPIs; Partnerships. - Data/ML/Infrastructure: Data Sources; Models; Architecture Diagram (describe); Privacy & Compliance; Ops/Monitoring. - Game/Interactive: Core Loop; Mechanics; Narrative; Tech; Monetization (if relevant). - Writing/Book/Article: Thesis; Outline/Chapters; Sources; Target Readers. 4) Evidence use: - Use only facts in Notes/KB. If missing, add short TODOs instead of inventing. - Where a claim relies on a specific source, include a short inline blockquote with "Source: ". 5) Assets: - These files are committed alongside README.md. Embed images with Markdown and link documents where they help clarity. STYLE - Short paragraphs and bullets; concrete, specific, and actionable. Avoid marketing fluff. - If dates/budget/ownership are uncertain, show ranges or TODOs. - Keep a neutral, professional tone adapted to the idea type. TITLE - Generate a neutral 2-4 words working title. - Begin the document with "# {Title}". Assets Provided: {ASSETS} Notes (from user): {NOTES} Knowledge Base (source excerpts): {KB} """.strip() REPHRASE_LENSES = [ { "key": "neutral", "label": "Neutral Clarification / Expansion", "prompt": """Take the following rough note and turn it into a single clear, concise paragraph that captures the main idea. - Keep a neutral, explanatory tone. - Don't add new features or speculation, only clarify and connect what is already there. - Output exactly one paragraph. Note: {USER_NOTE} """, }, { "key": "problem_solution", "label": "Problem-Solution Framing", "prompt": """Rewrite the following note as a single paragraph that clearly describes: 1. What problem or frustration exists, 2. For whom, 3. How the idea could solve it in principle. Keep it concrete but high-level, no implementation details. Output exactly one paragraph. Note: {USER_NOTE} """, }, { "key": "user_story", "label": "User Story / Scenario", "prompt": """Rewrite the following note as a single paragraph that describes a short scenario from a user's point of view. Show how a specific person encounters the situation and how this idea helps them. Keep it realistic and simple, not hype-y. Output exactly one paragraph. Note: {USER_NOTE} """, }, { "key": "value_prop", "label": "Value Proposition / Pitch", "prompt": """Rewrite the following note as a single paragraph that sounds like a clear, simple pitch of the idea. Explain what it is, who it's for, and why it's valuable or interesting. Avoid buzzwords; keep it grounded and concrete. Output exactly one paragraph. Note: {USER_NOTE} """, }, { "key": "implementation", "label": "Implementation / Next Steps", "prompt": """Rewrite the following note as a single paragraph that keeps the original idea but focuses on how one might start implementing or exploring it. Mention 2-3 plausible first steps or components without going into deep technical detail. Output exactly one paragraph. Note: {USER_NOTE} """, }, ] EXTEND_PROMPT = """ You are continuing the user's own note. Keep writing in the same language, tone, and formatting style they used. Instructions: - Extend the idea with additional possibilities, use cases, angles, or problems to consider. - Preserve the author's voice: match their formality, punctuation habits, and quirks (e.g., all lowercase, terse bullets, or formal sentences). - Do not summarize or rewrite the original; add new material that flows naturally after it. - Keep it concise (2-5 sentences or a few short bullet points). - If the input is in bullet form, continue the bullets; otherwise, continue the paragraph. Original note: {USER_NOTE} """.strip() def build_kb_string(records: List[Record], *, max_chars: int = 80000, per_record_cap: int = 4000) -> str: parts: List[str] = [] budget = max_chars for r in records: if budget <= 0: break text = (r.text or "").strip() if not text: continue if len(text) > per_record_cap: text = text[:per_record_cap] + "\n...[truncated]" title = r.title or (Path(r.source_path).name if r.source_path else r.id) header = f"\n---\nSource: {title}\nPath: {r.source_path or ''}\n\n" chunk = header + text.strip() + "\n" if len(chunk) > budget: chunk = chunk[:budget] parts.append(chunk) budget -= len(chunk) return ("\n".join(parts)).strip() def sanitize_llm_text_simple(s: str) -> str: try: s = re.sub(r".*?", "", s, flags=re.S | re.I) s = re.sub(r"^\s*```(?:\w+)?\s*", "", s) s = re.sub(r"\s*```\s*$", "", s) return s.strip() except Exception: return (s or "").strip() def md_heading_replace_or_insert(md: str, title: str) -> str: if not md: return f"# {title}\n\n" lines = md.splitlines() if lines and re.match(r"^\s*#\s+project\s+concept\s*$", lines[0], flags=re.I): lines[0] = f"# {title}" return "\n".join(lines) if lines and re.match(r"^\s*#\s+", lines[0]): return md return f"# {title}\n\n" + md def strip_wrapping_quotes(s: str) -> str: s = s.strip() s = re.sub(r"^[\"'""'']+", "", s) s = re.sub(r"[\"'""'']+$", "", s) return s # ----------------------------- # Core engine # ----------------------------- class ConceptEngine: def __init__(self, *, status_cb: Optional[Any] = None) -> None: self.status_cb = status_cb self.files: List[Path] = [] self.websites: List[str] = [] self.records: List[Record] = [] self.file_hashes: Dict[str, str] = {} self._seen_hashes: Set[str] = set() self._ingesting: Set[str] = set() self._base_dir: Path = IDEA_HOLE_DIR self._files_dir: Path = self._base_dir / "files" self._corpus_file: Path = self._base_dir / "corpus.jsonl" self._sessions_file: Path = self._base_dir / "sessions.jsonl" self._init_storage() def _status(self, msg: str) -> None: if self.status_cb: try: self.status_cb(msg) except Exception: pass def _init_storage(self) -> None: try: self._base_dir.mkdir(parents=True, exist_ok=True) self._files_dir.mkdir(parents=True, exist_ok=True) if not self._corpus_file.exists(): self._corpus_file.write_text("", encoding="utf-8") if not self._sessions_file.exists(): self._sessions_file.write_text("", encoding="utf-8") self._seen_hashes = set() with self._corpus_file.open("r", encoding="utf-8") as fh: for line in fh: if not line or not line.strip(): continue try: obj = json.loads(line) except Exception: continue h = obj.get("file_hash") if h: self._seen_hashes.add(str(h)) except Exception: self._seen_hashes = set() def _compute_file_hash(self, path: Path) -> str: h = hashlib.sha256() try: with path.open("rb") as fh: while True: b = fh.read(1024 * 1024) if not b: break h.update(b) except Exception: st = None try: st = path.stat() except Exception: pass h.update((str(path) + "|" + str(getattr(st, "st_mtime", 0.0))).encode("utf-8", "ignore")) return h.hexdigest() def _compute_url_hash(self, url: str) -> str: try: return hashlib.sha256(url.strip().encode("utf-8", "ignore")).hexdigest() except Exception: return hashlib.sha256(url.encode("utf-8", "ignore")).hexdigest() def _ensure_file_symlink(self, src: Path, file_hash: str) -> Path: dst = self._files_dir / f"{file_hash}__{src.name}" try: if not dst.exists(): copy_or_link(src, self._files_dir) except Exception: pass return dst def _ingest_single_file(self, src: Path, file_hash: str, *, verbose: bool = False) -> bool: try: tmp_dir = self._base_dir / "ingest_tmp" / file_hash try: if tmp_dir.exists(): shutil.rmtree(tmp_dir) except Exception: pass tmp_dir.mkdir(parents=True, exist_ok=True) copy_or_link(src, tmp_dir) external = None script = REPO_ROOT / "corpus_builder.py" if script.exists(): external = ExternalCorpusBuilder(script) tmp_out = tmp_dir / "out.jsonl" ok = False if external is not None: self._status(f"Indexing {src.name} (external)...") ok = external.build(tmp_dir, tmp_out, workers=2, verbose=verbose) if not ok: self._status(f"Indexing {src.name} (simple)...") try: simple = SimpleCorpusBuilder() recs = simple.build(tmp_dir, tmp_out) ok = bool(recs) except Exception: ok = False if ok and tmp_out.exists(): ts = int(time.time()) with tmp_out.open("r", encoding="utf-8") as fh_in, self._corpus_file.open("a", encoding="utf-8") as fh_out: for line in fh_in: if not line.strip(): continue try: obj = json.loads(line) except Exception: continue obj["file_hash"] = file_hash obj["source_path"] = str(src.resolve()) obj.setdefault("mime", obj.get("mime") or None) obj["added_at"] = ts try: fh_out.write(json.dumps(obj, ensure_ascii=False) + "\n") except Exception: fh_out.write(json.dumps(obj) + "\n") self._seen_hashes.add(file_hash) return True return False finally: try: shutil.rmtree(self._base_dir / "ingest_tmp" / file_hash) except Exception: pass def _ingest_single_url(self, url: str, url_hash: str) -> bool: try: self._status(f"Fetching {url}...") try: html_text, _hdrs = websearch._http_get(url, timeout=25) except Exception: return False text = websearch._extract_text(html_text) if not text.strip(): return False title = self._friendly_url_name(url) try: m = re.search(r"(.*?)", html_text, flags=re.I | re.S) if m: raw_title = m.group(1) cleaned = re.sub(r"\s+", " ", raw_title) try: cleaned = html.unescape(cleaned) except Exception: pass cleaned = cleaned.strip() if cleaned: title = cleaned except Exception: pass ts = int(time.time()) obj = { "id": url, "title": title, "text": text, "source_path": url, "mime": "text/html", "file_hash": url_hash, "added_at": ts, } with self._corpus_file.open("a", encoding="utf-8") as fh_out: fh_out.write(json.dumps(obj, ensure_ascii=False) + "\n") self._seen_hashes.add(url_hash) return True except Exception: return False @staticmethod def _friendly_url_name(url: str) -> str: try: from urllib.parse import urlparse parsed = urlparse(url) host = parsed.netloc or url path = (parsed.path or "").strip("/").split("/") if path and path[0]: first = path[0][:40] return f"{host}/{first}" return host except Exception: return url def _ensure_corpus_for_files(self, paths: List[Path]) -> None: if not paths: return to_ingest: List[Tuple[Path, str]] = [] for p in paths: try: h = self._compute_file_hash(p) except Exception: continue self.file_hashes[str(p)] = h self._ensure_file_symlink(p, h) if h not in self._seen_hashes and h not in self._ingesting: to_ingest.append((p, h)) if not to_ingest: return for src, h in to_ingest: self._ingesting.add(h) try: self._ingest_single_file(src, h, verbose=False) finally: try: self._ingesting.remove(h) except Exception: pass def _ensure_corpus_for_urls(self, urls: List[str]) -> None: if not urls: return to_ingest: List[Tuple[str, str]] = [] for u in urls: if not u: continue h = self._compute_url_hash(u) self.file_hashes[u] = h if h not in self._seen_hashes and h not in self._ingesting: to_ingest.append((u, h)) if not to_ingest: return for url, h in to_ingest: self._ingesting.add(h) try: self._ingest_single_url(url, h) finally: try: self._ingesting.remove(h) except Exception: pass def _load_records_for_hashes(self, hashes: Set[str]) -> List[Record]: out: List[Record] = [] if not hashes: return out try: with self._corpus_file.open("r", encoding="utf-8") as fh: for line in fh: if not line.strip(): continue try: obj = json.loads(line) except Exception: continue if str(obj.get("file_hash") or "") not in hashes: continue out.append(Record( id=str(obj.get("id", "")), title=str(obj.get("title", "")), text=str(obj.get("text", "")), source_path=str(obj.get("source_path", "")) if obj.get("source_path") else None, mime=str(obj.get("mime", "")) if obj.get("mime") else None, )) except Exception: pass return out def build_kb_records(self, files: List[str], websites: List[str]) -> List[Record]: paths = [Path(p) for p in files] self._ensure_corpus_for_files(paths) self._ensure_corpus_for_urls(websites) hashes = {self.file_hashes.get(str(p)) for p in paths} hashes.update({self.file_hashes.get(u) for u in websites}) hashes = {h for h in hashes if h} self.records = self._load_records_for_hashes(hashes) return self.records # --- Sessions def _load_all_sessions(self) -> List[Dict[str, Any]]: entries: List[Dict[str, Any]] = [] try: with self._sessions_file.open("r", encoding="utf-8") as fh: for line in fh: if not line.strip(): continue try: obj = json.loads(line) if isinstance(obj, dict) and obj.get("title"): entries.append(obj) except Exception: continue except Exception: pass return entries def _write_all_sessions(self, entries: List[Dict[str, Any]]) -> None: tmp = self._sessions_file.with_suffix(".tmp") try: with tmp.open("w", encoding="utf-8") as fh: for obj in entries: fh.write(json.dumps(obj, ensure_ascii=False) + "\n") tmp.replace(self._sessions_file) except Exception: with self._sessions_file.open("w", encoding="utf-8") as fh: for obj in entries: fh.write(json.dumps(obj, ensure_ascii=False) + "\n") def _session_title_exists(self, title: str) -> bool: t = (title or "").strip() if not t: return False try: with self._sessions_file.open("r", encoding="utf-8") as fh: for line in fh: if not line.strip(): continue try: obj = json.loads(line) if isinstance(obj, dict) and (obj.get("title") or "").strip() == t: return True except Exception: continue except Exception: pass return False def list_sessions(self) -> List[Dict[str, Any]]: out = [] for e in self._load_all_sessions(): out.append({ "title": e.get("title") or "", "description": e.get("description") or "", "saved_at": e.get("saved_at") or 0, }) return out def load_session(self, title: str) -> Optional[Dict[str, Any]]: t = (title or "").strip() if not t: return None for e in self._load_all_sessions(): if (e.get("title") or "").strip() == t: return e return None def save_session(self, payload: Dict[str, Any], *, allow_overwrite: bool) -> Dict[str, Any]: title = (payload.get("title") or "").strip() if not title: raise RuntimeError("Title is required to save a session.") exists = self._session_title_exists(title) if exists and not allow_overwrite: raise RuntimeError("Session already exists") files_list = payload.get("files") or [] websites_list = payload.get("websites") or [] self._ensure_corpus_for_files([Path(f["path"]) for f in files_list if f.get("path")]) self._ensure_corpus_for_urls([w.get("url") for w in websites_list if w.get("url")]) files_meta = [] for f in files_list: path = f.get("path") if not path: continue h = self.file_hashes.get(path) or self._compute_file_hash(Path(path)) self.file_hashes[path] = h files_meta.append({ "path": path, "file_hash": h, "include": bool(f.get("include", True)), }) websites_meta = [] for w in websites_list: url = w.get("url") if not url: continue h = self.file_hashes.get(url) or self._compute_url_hash(url) self.file_hashes[url] = h websites_meta.append({ "url": url, "file_hash": h, "include": bool(w.get("include", True)), }) record = { "title": title, "description": (payload.get("description") or "").strip(), "notes": (payload.get("notes") or "").strip(), "concept": (payload.get("concept") or "").strip(), "files": files_meta, "websites": websites_meta, "saved_at": int(time.time()), "rephrase_variants": payload.get("rephrase_variants") or [], "rephrase_selected_key": payload.get("rephrase_selected_key"), } entries = self._load_all_sessions() if exists: entries = [e for e in entries if (e.get("title") or "") != title] entries.append(record) self._write_all_sessions(entries) return record # ----------------------------- # Concept generation helpers # ----------------------------- def _extract_title_desc(concept_md: str, *, client: OllamaClient, model: str) -> Tuple[Optional[str], Optional[str]]: try: prompt = ( "Extract a concise title and a one-sentence description from the following concept.\n" "- Title: <= 50 chars (3-5 words).\n- Description: <= 120 chars, (one sentence) no trailing period.\n" "Return ONLY strict JSON with keys 'title' and 'description'.\n\nCONCEPT:\n" + concept_md ) raw = client.generate(model, prompt) obj = _parse_json_strict(raw) or {} title = strip_wrapping_quotes(str(obj.get("title") or "").strip()) or None desc = strip_wrapping_quotes(str(obj.get("description") or "").strip()) or None return title, desc except Exception: return None, None # ----------------------------- # PDF conversion helpers # ----------------------------- def _slug(s: str) -> str: s = re.sub(r"[\s]+", "-", s.strip()) s = re.sub(r"[^a-zA-Z0-9._-]", "-", s) return re.sub(r"-+", "-", s).strip("-_") def _convert_markdown_to_pdf(md_file: Path, out_pdf: Path) -> Tuple[bool, Optional[Path]]: concept_dir = out_pdf.parent concept_dir.mkdir(parents=True, exist_ok=True) logs_dir = IDEA_HOLE_DIR / "logs" logs_dir.mkdir(parents=True, exist_ok=True) log_path = logs_dir / f"pdf_export_{concept_dir.name}.log" pandoc = resolve_command("pandoc") tectonic = resolve_command("tectonic") lines: List[str] = [] lines.append(f"PATH={os.environ.get('PATH','')}") lines.append(f"md_file={md_file}") lines.append(f"resolved pandoc={pandoc}") lines.append(f"resolved tectonic={tectonic}") if not pandoc or not tectonic: lines.append("Missing required tools: pandoc and/or tectonic.") try: log_path.write_text("\n".join(lines), encoding="utf-8") except Exception: pass return False, log_path tmp_base = IDEA_HOLE_DIR / "tmp_pdf" / concept_dir.name try: if tmp_base.exists(): shutil.rmtree(tmp_base) except Exception: pass tmp_base.mkdir(parents=True, exist_ok=True) try: text = md_file.read_text(encoding="utf-8") except Exception as e: lines.append(f"read error: {e}") try: log_path.write_text("\n".join(lines), encoding="utf-8") except Exception: pass return False, log_path img_rgx = re.compile(r"!\[[^\]]*\]\(([^\s)]+)(?:\s+\"[^\"]*\")?\)") allowed_ext = {".png", ".jpg", ".jpeg", ".pdf", ".eps"} def ensure_image_available(src: str) -> str: p = Path(src) if not p.is_absolute(): p = (concept_dir / p).resolve() if not p.exists(): alt = (concept_dir / Path(src).name).resolve() if alt.exists(): p = alt else: lines.append(f"missing image: {src}") return src ext = p.suffix.lower() if ext in allowed_ext: out_name = p.name out_path = tmp_base / out_name try: if not out_path.exists(): shutil.copy2(str(p), str(out_path)) return out_name except Exception as e: lines.append(f"copy fail: {src} -> {out_name} ({e})") return src if ext == ".svg": out_name = p.stem + ".png" out_path = tmp_base / out_name try: from cairosvg import svg2png # type: ignore svg2png(url=str(p), write_to=str(out_path)) return out_name except Exception as e_svg_py: lines.append(f"cairosvg unavailable or failed: {e_svg_py}") try: tool = resolve_command("rsvg-convert") if tool: res = subprocess.run([tool, "-f", "png", "-o", str(out_path), str(p)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=subprocess_env()) if res.returncode == 0 and out_path.exists(): return out_name lines.append(f"rsvg-convert failed: exit {res.returncode}, {res.stdout}") except Exception as e_svg_cli: lines.append(f"rsvg-convert error: {e_svg_cli}") try: tool = resolve_command("magick") or resolve_command("convert") if tool: res = subprocess.run([tool, str(p), str(out_path)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=subprocess_env()) if res.returncode == 0 and out_path.exists(): return out_name lines.append(f"imagemagick failed: exit {res.returncode}, {res.stdout}") except Exception as e_im: lines.append(f"imagemagick error: {e_im}") try: from PIL import Image # type: ignore img = Image.open(str(p)) try: img.seek(0) except Exception: pass out_name = p.stem + ".png" out_path = tmp_base / out_name img.convert("RGBA" if img.mode in ("P", "LA") else "RGB").save(str(out_path), format="PNG") return out_name except Exception as e: lines.append(f"convert fail: {src} -> png ({e})") out_name = p.name out_path = tmp_base / out_name try: shutil.copy2(str(p), str(out_path)) return out_name except Exception as e2: lines.append(f"final copy fail: {src} ({e2})") return src def _repl(m: re.Match) -> str: orig = m.group(0) path = m.group(1) rep = ensure_image_available(path) return orig.replace(path, rep) mod_text = img_rgx.sub(_repl, text) def _preserve_extra_blank_lines(s: str) -> str: s = s.replace("\r\n", "\n").replace("\r", "\n") lines_in = s.split("\n") out_lines: List[str] = [] in_fence = False blank_run = 0 for ln in lines_in: stripped = ln.lstrip() if stripped.startswith("```") or stripped.startswith("~~~"): if blank_run > 0: out_lines.append("") for _ in range(blank_run - 1): out_lines.append("\\vspace{1em}") blank_run = 0 out_lines.append(ln) in_fence = not in_fence continue if in_fence: if blank_run > 0: out_lines.append("") for _ in range(blank_run - 1): out_lines.append("\\vspace{1em}") blank_run = 0 out_lines.append(ln) continue if stripped == "": blank_run += 1 continue if blank_run > 0: out_lines.append("") for _ in range(blank_run - 1): out_lines.append("\\vspace{1em}") blank_run = 0 out_lines.append(ln) if blank_run > 0: out_lines.append("") for _ in range(blank_run - 1): out_lines.append("\\vspace{1em}") return "\n".join(out_lines) mod_text = _preserve_extra_blank_lines(mod_text) tmp_md = tmp_base / "README_pdf.md" tmp_md.write_text(mod_text, encoding="utf-8") cmd = [ pandoc, str(tmp_md), "-f", "markdown+hard_line_breaks+raw_tex", "-s", f"--pdf-engine={tectonic}", "-V", "mainfont=Helvetica", "-V", "monofont=Menlo", "-V", "geometry:margin=20mm", "-V", "fontsize=11pt", "--resource-path", f"{str(tmp_base)}:{str(concept_dir)}", "-o", str(out_pdf), ] res = subprocess.run(cmd, cwd=str(tmp_base), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=subprocess_env()) lines.append("$ " + " ".join(cmd)) lines.append(f"(exit {res.returncode})") lines.append(res.stdout or "") ok = (res.returncode == 0 and out_pdf.exists()) if not ok: try: cmd_fallback = [ pandoc, str(tmp_md), "-f", "markdown+hard_line_breaks", "-s", f"--pdf-engine={tectonic}", "-V", "mainfont=Helvetica", "-V", "monofont=Menlo", "-V", "geometry:margin=20mm", "-V", "fontsize=11pt", "--resource-path", f"{str(tmp_base)}:{str(concept_dir)}", "-o", str(out_pdf), ] res2 = subprocess.run(cmd_fallback, cwd=str(tmp_base), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=subprocess_env()) lines.append("$ " + " ".join(cmd_fallback)) lines.append(f"(exit {res2.returncode})") lines.append(res2.stdout or "") ok = (res2.returncode == 0 and out_pdf.exists()) except Exception as e_fallback: lines.append(f"fallback error: {e_fallback}") if not ok: try: log_path.write_text("\n".join(lines), encoding="utf-8") except Exception: pass try: shutil.rmtree(tmp_base) except Exception: pass return ok, log_path # ----------------------------- # Settings # ----------------------------- def settings_path() -> Path: IDEA_HOLE_DIR.mkdir(parents=True, exist_ok=True) return IDEA_HOLE_DIR / "settings.json" def load_settings() -> Dict[str, str]: p = settings_path() if not p.exists(): return {} try: obj = json.loads(p.read_text(encoding="utf-8")) if isinstance(obj, dict): return {k: str(v) for k, v in obj.items() if v is not None} except Exception: return {} return {} def save_settings(settings: Dict[str, str]) -> None: p = settings_path() try: p.write_text(json.dumps(settings, ensure_ascii=False, indent=2), encoding="utf-8") except Exception: p.write_text(json.dumps(settings), encoding="utf-8") # ----------------------------- # Actions # ----------------------------- def list_models() -> List[str]: try: ollama = resolve_command("ollama") if not ollama: return [] res = subprocess.run([ollama, "list"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, timeout=8, env=subprocess_env()) if res.returncode != 0: return [] lines = [ln.strip() for ln in (res.stdout or "").splitlines()] out: List[str] = [] for ln in lines: if not ln or ln.lower().startswith("name"): continue name = ln.split()[0] if name and name not in out: out.append(name) return out except Exception: return [] def stat_paths(paths: List[str], *, expand_dirs: bool) -> List[Dict[str, Any]]: out: List[Dict[str, Any]] = [] seen: Set[str] = set() for raw in paths: if not raw: continue p = Path(raw) if p.is_dir() and expand_dirs: for q in p.rglob("*"): if not q.is_file(): continue if str(q) in seen: continue seen.add(str(q)) out.append({ "name": q.name, "path": str(q), "type": q.suffix.lower() or "file", "size": human_size(q.stat().st_size) if q.exists() else "?", }) elif p.is_file(): if str(p) in seen: continue seen.add(str(p)) out.append({ "name": p.name, "path": str(p), "type": p.suffix.lower() or "file", "size": human_size(p.stat().st_size) if p.exists() else "?", }) return out def rephrase(note: str, host: str, model: str) -> List[Dict[str, str]]: client = OllamaClient(host=host) variants: List[Dict[str, str]] = [{ "key": "original", "label": "Original Note", "text": note, }] for idx, lens in enumerate(REPHRASE_LENSES, start=1): prompt = (lens.get("prompt") or "").replace("{USER_NOTE}", note) raw = client.generate(model=model, prompt=prompt) text = sanitize_llm_text_simple(raw) variants.append({ "key": lens.get("key") or f"lens_{idx}", "label": lens.get("label") or f"Variant {idx}", "text": text, }) return variants def extend(note: str, host: str, model: str) -> str: client = OllamaClient(host=host) prompt = EXTEND_PROMPT.replace("{USER_NOTE}", note) raw = client.generate(model=model, prompt=prompt) text = sanitize_llm_text_simple(raw) if not text.strip(): raise RuntimeError("Empty response from model") return text def generate_concept(payload: Dict[str, Any]) -> Dict[str, Any]: notes = (payload.get("notes") or "").strip() files = payload.get("files") or [] websites = payload.get("websites") or [] host = payload.get("ollama_host") or "http://localhost:11434" model = payload.get("model") or "" engine = ConceptEngine() records = engine.build_kb_records(files, websites) kb = build_kb_string(records) assets_lines: List[str] = [] if files: assets_lines.append("Files:") assets_lines.extend(f"- {Path(p).name}" for p in files) if websites: assets_lines.append("URLs:") assets_lines.extend(f"- {u}" for u in websites) assets_str = "\n".join(assets_lines) or "(none)" prompt = ( PROMPT_TEMPLATE .replace("{NOTES}", notes or "(none)") .replace("{KB}", kb or "(empty)") .replace("{ASSETS}", assets_str) ) client = OllamaClient(host=host) concept_md = client.generate(model=model, prompt=prompt) concept_md = sanitize_llm_text_simple(concept_md) title, desc = _extract_title_desc(concept_md, client=client, model=model) if not desc: desc = "" if title: concept_md = md_heading_replace_or_insert(concept_md, title) if not concept_md.strip(): raise RuntimeError("Empty response from model") return { "concept": concept_md, "title": title or "", "description": strip_wrapping_quotes(desc)[:120], "kb_records": len(records), } def prior_art(payload: Dict[str, Any]) -> Dict[str, Any]: notes = (payload.get("notes") or "").strip() if not notes: notes = "\n\n".join( part for part in ( f"Title: {(payload.get('title') or '').strip()}" if (payload.get("title") or "").strip() else "", f"Description: {(payload.get('description') or '').strip()}" if (payload.get("description") or "").strip() else "", (payload.get("concept") or "").strip(), ) if part ) files = payload.get("files") or [] websites = payload.get("websites") or [] host = payload.get("ollama_host") or "http://localhost:11434" model = payload.get("model") or "" searx_url = payload.get("searx_url") or None engine = ConceptEngine() records = engine.build_kb_records(files, websites) kb = build_kb_string(records) return websearch.prior_art_search( ollama_host=host, model=model, notes=notes, kb=kb, assets=files, searx_url=searx_url, ) def preview_pdf(payload: Dict[str, Any]) -> Dict[str, Any]: concept_text = (payload.get("concept") or "").strip() title = (payload.get("title") or "").strip() files = payload.get("files") or [] output_path = (payload.get("output_path") or "").strip() if not concept_text: raise RuntimeError("Concept text is empty") slug = _slug(title or "preview") base = IDEA_HOLE_DIR / "preview" / f"{slug}-preview" try: if base.exists(): shutil.rmtree(base) except Exception: pass base.mkdir(parents=True, exist_ok=True) md_path = base / "README.md" md_path.write_text(concept_text, encoding="utf-8") assets = [Path(p) for p in files] for src in assets: try: dst = base / src.name if dst.name.lower() in {"readme.md", f"{slug}-concept.pdf".lower(), f"{slug}-preview.pdf".lower()}: dst = base / f"asset-{src.name}" shutil.copy2(src, dst) except Exception: pass if output_path: pdf_path = Path(output_path).expanduser() if pdf_path.exists() and pdf_path.is_dir(): raise RuntimeError(f"Output path is a directory: {pdf_path}") if pdf_path.suffix.lower() != ".pdf": pdf_path = Path(f"{pdf_path}.pdf") else: pdf_path = base / f"{slug}-preview.pdf" ok, log_path = _convert_markdown_to_pdf(md_path, pdf_path) return { "ok": ok, "pdf_path": str(pdf_path), "log_path": str(log_path) if log_path else "", } # ----------------------------- # JSON-RPC style entrypoint # ----------------------------- def _read_stdin_json() -> Dict[str, Any]: raw = sys.stdin.read() if not raw: return {} return json.loads(raw) def main() -> int: try: req = _read_stdin_json() action = req.get("action") payload = req.get("payload") or {} if not action: raise RuntimeError("Missing action") if action == "list_models": result = list_models() elif action == "stat_paths": result = stat_paths(payload.get("paths") or [], expand_dirs=bool(payload.get("expand_dirs"))) elif action == "rephrase": result = rephrase(payload.get("note") or "", payload.get("ollama_host") or "http://localhost:11434", payload.get("model") or "") elif action == "extend": result = extend(payload.get("note") or "", payload.get("ollama_host") or "http://localhost:11434", payload.get("model") or "") elif action == "generate_concept": result = generate_concept(payload) elif action == "prior_art": result = prior_art(payload) elif action == "preview_pdf": result = preview_pdf(payload) elif action == "load_settings": result = load_settings() elif action == "save_settings": save_settings(payload.get("settings") or {}) result = {"ok": True} elif action == "list_sessions": engine = ConceptEngine() result = engine.list_sessions() elif action == "load_session": engine = ConceptEngine() result = engine.load_session(payload.get("title") or "") elif action == "save_session": engine = ConceptEngine() result = engine.save_session(payload.get("payload") or {}, allow_overwrite=bool(payload.get("allow_overwrite"))) else: raise RuntimeError(f"Unknown action: {action}") out = {"ok": True, "data": result} except Exception as e: out = { "ok": False, "error": str(e), "trace": traceback.format_exc(limit=6), } sys.stdout.write(json.dumps(out, ensure_ascii=False)) return 0 if __name__ == "__main__": raise SystemExit(main())