Files
concept-maker/concept_api.py

1437 lines
50 KiB
Python
Raw Permalink Normal View History

2026-02-04 06:54:01 +01:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Headless backend actions for the Concept Maker app.
This module exposes JSON actions for the Tauri UI without desktop toolkit imports.
"""
from __future__ import annotations
import hashlib
import html
import json
import math
import os
import re
import shutil
import subprocess
import sys
import tempfile
import time
import traceback
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
import websearch
# -----------------------------
# Paths
# -----------------------------
# Root directory of the application: `sys._MEIPASS` when that attribute exists
# (presumably a PyInstaller-style frozen bundle -- TODO confirm), otherwise the
# directory containing this file.
REPO_ROOT = Path(getattr(sys, "_MEIPASS", Path(__file__).resolve().parent))
# Data directory; can be overridden with the CONCEPT_MAKER_DATA_DIR environment variable.
IDEA_HOLE_DIR = Path(os.environ.get("CONCEPT_MAKER_DATA_DIR", REPO_ROOT / ".idea-hole")).expanduser()
# Extra directories consulted when locating external tools and when building
# the PATH handed to subprocesses.
SYSTEM_BIN_DIRS = ["/opt/homebrew/bin", "/usr/local/bin", "/opt/local/bin", "/usr/bin", "/bin"]
def resolve_command(name: str) -> Optional[str]:
    """Return the path of the executable *name*, or ``None`` when it is not found.

    The current ``PATH`` is searched first, then the directories listed in
    ``SYSTEM_BIN_DIRS``. Both lookups go through ``shutil.which`` so that only
    executable files are returned (the previous ``os.path.exists`` check also
    accepted directories and non-executable files, and returned the first bin
    directory itself for an empty name).
    """
    if not name:
        return None
    found = shutil.which(name)
    if found:
        return found
    return shutil.which(name, path=os.pathsep.join(SYSTEM_BIN_DIRS))
def subprocess_env() -> Dict[str, str]:
    """Return a copy of the environment whose PATH also covers SYSTEM_BIN_DIRS.

    Directories from ``SYSTEM_BIN_DIRS`` that are not already PATH entries are
    placed in front of the existing PATH value.
    """
    environment = dict(os.environ)
    path_value = environment.get("PATH", "")
    present = path_value.split(os.pathsep)
    missing = [entry for entry in SYSTEM_BIN_DIRS if entry and entry not in present]
    if missing:
        if path_value:
            pieces = [*missing, path_value]
        else:
            pieces = missing
        environment["PATH"] = os.pathsep.join(pieces)
    return environment
# -----------------------------
# Utilities
# -----------------------------
def human_size(n: int) -> str:
    """Format a byte count as a short string such as ``"1.5 KB"``.

    Non-positive values give ``"0 B"``; values beyond the largest unit are
    expressed in TB.
    """
    if n <= 0:
        return "0 B"
    suffixes = ("B", "KB", "MB", "GB", "TB")
    base = 1024.0
    exponent = int(math.floor(math.log(n, base)))
    # Clamp into the range of known suffixes.
    exponent = min(max(exponent, 0), len(suffixes) - 1)
    scaled = n / (base ** exponent)
    return f"{scaled:.1f} {suffixes[exponent]}"
def safe_symlink(src: Path, dst: Path) -> bool:
    """Create (or replace) a symlink at *dst* pointing to *src*.

    Returns ``True`` on success and ``False`` on any failure, so callers can
    fall back to copying.

    The link target is made absolute: ``os.symlink`` stores the target text
    verbatim and a relative target is resolved relative to the directory of
    the link, so a relative *src* previously produced a dangling link while
    still reporting success.
    """
    try:
        if dst.exists() or dst.is_symlink():
            dst.unlink()
        os.symlink(os.path.abspath(src), dst)
        return True
    except Exception:
        # Best effort by design (unsupported platform, permissions, ...).
        return False
def copy_or_link(src: Path, dst_dir: Path) -> Path:
    """Make *src* available inside *dst_dir* under the same file name.

    A symlink is attempted first; if that fails the file is copied with its
    metadata. The destination directory is created when needed. Returns the
    path of the link or copy.
    """
    dst_dir.mkdir(parents=True, exist_ok=True)
    target = dst_dir / src.name
    linked = safe_symlink(src, target)
    if not linked:
        shutil.copy2(src, target)
    return target
def read_text_guess(path: Path) -> str:
    """Read *path* as text, guessing the encoding; return ``""`` if unreadable.

    Order of attempts:
    1. UTF-16, only when the data starts with a UTF-16 byte-order mark.
       Trying UTF-16 unconditionally mis-decodes any even-length legacy
       8-bit text as unrelated code points instead of failing.
    2. UTF-8 (``utf-8-sig`` so that a leading BOM is dropped).
    3. Latin-1, which accepts every byte sequence and therefore is the
       final fallback.
    """
    try:
        data = path.read_bytes()
    except Exception:
        return ""
    if data.startswith((b"\xff\xfe", b"\xfe\xff")):
        try:
            return data.decode("utf-16")
        except UnicodeError:
            pass
    try:
        return data.decode("utf-8-sig")
    except UnicodeError:
        return data.decode("latin-1")
# -----------------------------
# Corpus building
# -----------------------------
@dataclass
class Record:
    """One corpus entry: extracted text together with where it came from."""
    # Identifier of the entry; the builders in this file pass the file path or URL.
    id: str
    # Human-readable title (file stem or page title in this file).
    title: str
    # Extracted plain text.
    text: str
    # Path or URL the text was taken from, when known.
    source_path: Optional[str] = None
    # MIME type, when known.
    mime: Optional[str] = None
class SimpleCorpusBuilder:
    """Very lightweight fallback if corpus_builder.py or deps are unavailable."""

    def __init__(self) -> None:
        # PyMuPDF is optional; without it PDFs are reported as unsupported.
        try:
            import fitz  # type: ignore
        except Exception:
            fitz = None
        self._fitz = fitz

    def build(self, root: Path, out_jsonl: Path) -> List[Record]:
        """Index every file below *root* and write the records to *out_jsonl*.

        Text-like files (.txt/.md/.rst), HTML and (when PyMuPDF is importable)
        PDF files are converted to text; files that yield only whitespace are
        skipped. Any other file produces a placeholder record. The output
        file itself is never indexed, even when it lives below *root* (for
        example a leftover from an earlier, failed build).

        Returns the list of records written.
        """
        out_jsonl.parent.mkdir(parents=True, exist_ok=True)
        out_abs = os.path.abspath(out_jsonl)
        records: List[Record] = []
        for p in root.rglob("*"):
            if not p.is_file():
                continue
            if os.path.abspath(p) == out_abs:
                continue
            suf = p.suffix.lower()
            try:
                if suf in {".txt", ".md", ".rst"}:
                    text = read_text_guess(p)
                elif suf in {".html", ".htm"}:
                    text = self._strip_html(read_text_guess(p))
                elif suf == ".pdf" and self._fitz is not None:
                    text = self._pdf_text(p)
                else:
                    text = f"[Unsupported file type: {suf}]"
                if text.strip():
                    records.append(Record(id=str(p), title=p.stem, text=text, source_path=str(p)))
            except Exception:
                records.append(Record(id=str(p), title=p.stem, text=f"[Error reading file: {p.name}]", source_path=str(p)))
        with out_jsonl.open("w", encoding="utf-8") as fh:
            for r in records:
                fh.write(json.dumps(r.__dict__, ensure_ascii=False) + "\n")
        return records

    def _strip_html(self, html_text: str) -> str:
        """Return the visible text of *html_text*.

        Uses BeautifulSoup when it can be imported, otherwise a regex-based
        fallback that removes script/style elements and all tags.
        """
        try:
            from bs4 import BeautifulSoup  # type: ignore
            soup = BeautifulSoup(html_text, "html.parser")
            for tag in soup(["script", "style"]):
                tag.decompose()
            return soup.get_text("\n", strip=True)
        except Exception:
            txt = re.sub(r"<\s*(script|style)[^>]*>.*?<\s*/\s*\1\s*>", " ", html_text, flags=re.S | re.I)
            txt = re.sub(r"<[^>]+>", " ", txt)
            txt = re.sub(r"\s+", " ", txt)
            return txt.strip()

    def _pdf_text(self, path: Path) -> str:
        """Return the text of all pages of *path*, or ``""`` on any failure."""
        try:
            doc = self._fitz.open(str(path))
            try:
                pages = [doc.load_page(i).get_text("text") for i in range(len(doc))]
            finally:
                # Release the underlying file handle even if a page fails.
                doc.close()
            return "\n\n".join(pages)
        except Exception:
            return ""
class ExternalCorpusBuilder:
    """Runs corpus_builder.py in a child process to produce a JSONL corpus."""

    def __init__(self, script_path: Path) -> None:
        self.script = script_path

    def build(self, root: Path, out_jsonl: Path, *, workers: int = 4, verbose: bool = False) -> bool:
        """Run the builder script on *root*, writing to *out_jsonl*.

        Returns ``True`` only when the process exits with status 0 and the
        output file exists and is non-empty; any exception yields ``False``.
        """
        argv = [sys.executable, str(self.script)]
        argv += ["--root", str(root)]
        argv += ["--out", str(out_jsonl)]
        argv += ["--emit", "auto"]
        argv += ["--workers", str(max(1, workers))]
        argv += ["--llm-parallel", "1"]
        if verbose:
            argv.append("--verbose")
        try:
            completed = subprocess.run(argv, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
            if completed.returncode != 0:
                return False
            if not out_jsonl.exists():
                return False
            return out_jsonl.stat().st_size > 0
        except Exception:
            return False
# -----------------------------
# Ollama client
# -----------------------------
class OllamaClient:
    """Minimal HTTP client for the ``/api/generate`` endpoint of an Ollama server."""

    def __init__(self, host: str = "http://localhost:11434", timeout: int = 600):
        # Trailing slashes are removed so the endpoint URL can be appended safely.
        self.host = host.rstrip("/")
        self.timeout = timeout

    def generate(self, model: str, prompt: str) -> str:
        """Send *prompt* to *model* (non-streaming) and return the stripped reply.

        Raises:
            RuntimeError: on HTTP errors, transport failures, or a reply that
                is not a JSON object. The original exception, when there is
                one, is attached as ``__cause__``.
        """
        import urllib.error
        import urllib.request

        url = f"{self.host}/api/generate"
        payload = {
            "model": model,
            "prompt": prompt,
            "stream": False,
        }
        data = json.dumps(payload).encode("utf-8")
        req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"}, method="POST")
        try:
            with urllib.request.urlopen(req, timeout=self.timeout) as resp:
                body = resp.read()
        except urllib.error.HTTPError as e:
            raise RuntimeError(f"Ollama HTTP error {e.code}: {e.read().decode('utf-8', 'ignore')}") from e
        except Exception as e:
            raise RuntimeError(f"Ollama request failed: {e}") from e
        try:
            obj = json.loads(body.decode("utf-8", "ignore"))
        except Exception as e:
            raise RuntimeError("Invalid JSON from Ollama") from e
        if not isinstance(obj, dict):
            # Valid JSON but not an object: report it like any other bad reply
            # instead of failing later with AttributeError.
            raise RuntimeError("Invalid JSON from Ollama")
        return str(obj.get("response") or "").strip()
def _parse_json_strict(s: str) -> Optional[Dict[str, str]]:
    """Parse a JSON object out of LLM output; return ``None`` if there is none.

    The text is first cleaned with ``sanitize_llm_text_simple``. The whole
    text is tried first, then the span from the first ``{`` to the last ``}``.
    Only a JSON *object* is accepted, so callers can rely on getting a dict
    (a top-level list or scalar used to be returned as-is).
    """
    cleaned = sanitize_llm_text_simple(s)
    candidates = [cleaned]
    m = re.search(r"\{[\s\S]*\}", cleaned)
    if m:
        candidates.append(m.group(0))
    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except Exception:
            continue
        if isinstance(parsed, dict):
            return parsed
    return None
# -----------------------------
# Prompting
# -----------------------------
# Main concept-generation prompt. {ASSETS}, {NOTES} and {KB} are placeholders
# for caller-supplied text. The literal also contains other braces
# ("{Product/Software, ...}", "{Title}"), so it cannot be filled with
# str.format(); presumably the caller uses str.replace -- TODO confirm.
# Stray revision timestamps that had leaked into the prompt text were removed.
PROMPT_TEMPLATE = """
You are a cross-domain concept developer (product strategist, creative producer, research lead, grant writer).
Turn the sources into a concise, presentable CONCEPT document. Adapt to the domain.
INSTRUCTIONS
1) Detect IDEA TYPE (pick one primary; if unclear, choose closest and add a TODO):
{Product/Software, Service, Research/Study, Policy/Proposal, Art/Exhibition/Performance, Event/Program,
Education/Curriculum, Media/Film/Publication, Campaign/Nonprofit, Data/ML/Infrastructure, Game/Interactive,
Writing/Book/Article, Other}
2) Tone & register:
- Product/Software -> pragmatic PM/tech brief
- Research -> neutral academic project brief
- Policy -> policy memo
- Art/Exhibition/Performance -> curator/producer note (clear, not flowery)
- Event -> producer's run-of-show style
- Education -> syllabus brief
- Media/Publication -> one-sheet
- Campaign/Nonprofit -> strategy brief
- Data/ML/Infrastructure -> engineering design note
- Game/Interactive -> design doc overview
- Writing/Book/Article -> proposal overview
3) Output Markdown using these core sections (use these exact headings; include only relevant ones):
- Overview & Intent
- Context / Problem (or Opportunity)
- Audience / Stakeholders
- Deliverables / Outputs & Scope
- Approach / Method (rename to "Methodology", "Implementation Plan", "Format & Installation Plan", etc., to fit the idea type)
- Resources / Budget / Tools (only if present; else add a short TODO)
- Timeline & Milestones
- Risks, Ethics & Constraints
- Success Criteria / Evaluation
- Open Questions (TODOs)
Add one domain-specific block (only if relevant and supported by sources):
- Product/Software: Key Features; Non-Goals; Rough Architecture; Dependencies & Integration; License.
- Research/Study: Research Questions; Methodology & Data; Expected Contributions; References/Citations.
- Policy/Proposal: Policy Mechanism; Legal/Standards; Impact Assessment; Implementation Steps.
- Art/Exhibition/Performance: Conceptual Frame & References; Medium/Materials; Venue/Spatial Requirements; Tech/AV; Rights/Permissions.
- Event/Program: Programme Outline / Run-of-Show; Roles & Staffing; Logistics & Venue.
- Education/Curriculum: Learning Objectives; Syllabus Outline; Assessment & Materials.
- Media/Film/Publication: Logline & Synopsis; Format; Production Plan; Distribution.
- Campaign/Nonprofit: Theory of Change; Channels & Tactics; KPIs; Partnerships.
- Data/ML/Infrastructure: Data Sources; Models; Architecture Diagram (describe); Privacy & Compliance; Ops/Monitoring.
- Game/Interactive: Core Loop; Mechanics; Narrative; Tech; Monetization (if relevant).
- Writing/Book/Article: Thesis; Outline/Chapters; Sources; Target Readers.
4) Evidence use:
- Use only facts in Notes/KB. If missing, add short TODOs instead of inventing.
- Where a claim relies on a specific source, include a short inline blockquote with "Source: <Path or Title>".
5) Assets:
- These files are committed alongside README.md. Embed images with Markdown and link documents where they help clarity.
STYLE
- Short paragraphs and bullets; concrete, specific, and actionable. Avoid marketing fluff.
- If dates/budget/ownership are uncertain, show ranges or TODOs.
- Keep a neutral, professional tone adapted to the idea type.
TITLE
- Generate a neutral 2-4 words working title.
- Begin the document with "# {Title}".
Assets Provided:
{ASSETS}
Notes (from user):
{NOTES}
Knowledge Base (source excerpts):
{KB}
""".strip()
# Rewrite "lenses" for a user note. Each entry has a stable "key", a display
# "label", and a "prompt" that contains the {USER_NOTE} placeholder and asks
# for exactly one paragraph.
REPHRASE_LENSES = [
    {
        "key": "neutral",
        "label": "Neutral Clarification / Expansion",
        "prompt": """Take the following rough note and turn it into a single clear, concise paragraph that captures the main idea.
- Keep a neutral, explanatory tone.
- Don't add new features or speculation, only clarify and connect what is already there.
- Output exactly one paragraph.
Note:
{USER_NOTE}
""",
    },
    {
        "key": "problem_solution",
        "label": "Problem-Solution Framing",
        "prompt": """Rewrite the following note as a single paragraph that clearly describes:
1. What problem or frustration exists,
2. For whom,
3. How the idea could solve it in principle.
Keep it concrete but high-level, no implementation details.
Output exactly one paragraph.
Note:
{USER_NOTE}
""",
    },
    {
        "key": "user_story",
        "label": "User Story / Scenario",
        "prompt": """Rewrite the following note as a single paragraph that describes a short scenario from a user's point of view.
Show how a specific person encounters the situation and how this idea helps them.
Keep it realistic and simple, not hype-y.
Output exactly one paragraph.
Note:
{USER_NOTE}
""",
    },
    {
        "key": "value_prop",
        "label": "Value Proposition / Pitch",
        "prompt": """Rewrite the following note as a single paragraph that sounds like a clear, simple pitch of the idea.
Explain what it is, who it's for, and why it's valuable or interesting.
Avoid buzzwords; keep it grounded and concrete.
Output exactly one paragraph.
Note:
{USER_NOTE}
""",
    },
    {
        "key": "implementation",
        "label": "Implementation / Next Steps",
        "prompt": """Rewrite the following note as a single paragraph that keeps the original idea but focuses on how one might start implementing or exploring it.
Mention 2-3 plausible first steps or components without going into deep technical detail.
Output exactly one paragraph.
Note:
{USER_NOTE}
""",
    },
]
# Prompt asking the model to continue a user's note in the same voice.
# {USER_NOTE} is the placeholder for the note text.
EXTEND_PROMPT = """
You are continuing the user's own note. Keep writing in the same language, tone, and formatting style they used.
Instructions:
- Extend the idea with additional possibilities, use cases, angles, or problems to consider.
- Preserve the author's voice: match their formality, punctuation habits, and quirks (e.g., all lowercase, terse bullets, or formal sentences).
- Do not summarize or rewrite the original; add new material that flows naturally after it.
- Keep it concise (2-5 sentences or a few short bullet points).
- If the input is in bullet form, continue the bullets; otherwise, continue the paragraph.
Original note:
{USER_NOTE}
""".strip()
def build_kb_string(records: List[Record], *, max_chars: int = 80000, per_record_cap: int = 4000) -> str:
    """Concatenate record texts into one knowledge-base string.

    Each record contributes a ``Source``/``Path`` header followed by its text,
    truncated to *per_record_cap* characters. Records with no text are
    skipped. Output stops once *max_chars* characters of chunks have been
    emitted; the last chunk is cut to fit.
    """
    remaining = max_chars
    chunks: List[str] = []
    for rec in records:
        if remaining <= 0:
            break
        body = (rec.text or "").strip()
        if not body:
            continue
        if len(body) > per_record_cap:
            body = body[:per_record_cap] + "\n...[truncated]"
        # Label preference: title, then file name of the source, then id.
        if rec.title:
            label = rec.title
        elif rec.source_path:
            label = Path(rec.source_path).name
        else:
            label = rec.id
        entry = f"\n---\nSource: {label}\nPath: {rec.source_path or ''}\n\n" + body.strip() + "\n"
        entry = entry[:remaining]
        chunks.append(entry)
        remaining -= len(entry)
    return "\n".join(chunks).strip()
def sanitize_llm_text_simple(s: str) -> str:
    """Strip ``<think>`` blocks and an enclosing Markdown code fence from LLM output.

    Falls back to a plain ``strip`` of the input (or ``""`` for a falsy input)
    if the regex processing raises.
    """
    removals = (
        (r"<think>.*?</think>", re.S | re.I),  # reasoning blocks
        (r"^\s*```(?:\w+)?\s*", 0),            # opening fence, optional language tag
        (r"\s*```\s*$", 0),                    # closing fence
    )
    try:
        cleaned = s
        for pattern, flags in removals:
            cleaned = re.sub(pattern, "", cleaned, flags=flags)
        return cleaned.strip()
    except Exception:
        return (s or "").strip()
def md_heading_replace_or_insert(md: str, title: str) -> str:
    """Make sure the Markdown document *md* starts with a level-1 heading.

    - Empty document: return just the heading for *title*.
    - First line is the placeholder heading "# Project Concept" (any case):
      replace that line with the heading for *title*.
    - First line is any other level-1 heading: return *md* unchanged.
    - Otherwise: put the heading for *title* in front of the document.
    """
    heading = f"# {title}"
    if not md:
        return heading + "\n\n"
    rows = md.splitlines()
    first = rows[0] if rows else None
    if first is not None:
        if re.match(r"^\s*#\s+project\s+concept\s*$", first, flags=re.I):
            return "\n".join([heading] + rows[1:])
        if re.match(r"^\s*#\s+", first):
            return md
    return heading + "\n\n" + md
def strip_wrapping_quotes(s: str) -> str:
    """Strip surrounding whitespace, then any run of quote characters at either end.

    Both ASCII quotes and the typographic quotes that language models often
    emit are removed. The typographic ones are written as escapes so the set
    cannot be silently altered by editors or encoding conversions. Quotes
    inside the text are left alone.
    """
    quote_chars = "\"'\u201c\u201d\u2018\u2019"
    return s.strip().strip(quote_chars)
# -----------------------------
# Core engine
# -----------------------------
class ConceptEngine:
    """Corpus and session store backing the UI actions.

    State lives below ``IDEA_HOLE_DIR``:

    - ``files/``         links or copies of ingested source files
    - ``corpus.jsonl``   one JSON record per extracted document; each record
                         carries the ``file_hash`` of the file or URL it came from
    - ``sessions.jsonl`` one JSON object per saved session, keyed by title
    """

    def __init__(self, *, status_cb: Optional[Any] = None) -> None:
        """Set up paths and load the hashes already present in the corpus.

        Args:
            status_cb: optional callable receiving progress messages.
        """
        self.status_cb = status_cb
        self.files: List[Path] = []
        self.websites: List[str] = []
        self.records: List[Record] = []
        # Maps a file path or URL to its content/URL hash.
        self.file_hashes: Dict[str, str] = {}
        # Hashes that already have records in the corpus file.
        self._seen_hashes: Set[str] = set()
        # Hashes currently being ingested.
        self._ingesting: Set[str] = set()
        self._base_dir: Path = IDEA_HOLE_DIR
        self._files_dir: Path = self._base_dir / "files"
        self._corpus_file: Path = self._base_dir / "corpus.jsonl"
        self._sessions_file: Path = self._base_dir / "sessions.jsonl"
        self._init_storage()

    def _status(self, msg: str) -> None:
        """Forward *msg* to the status callback; callback errors are ignored."""
        if self.status_cb:
            try:
                self.status_cb(msg)
            except Exception:
                # A broken progress callback must not abort the operation.
                pass

    def _init_storage(self) -> None:
        """Create the storage layout and read known hashes from the corpus."""
        try:
            self._base_dir.mkdir(parents=True, exist_ok=True)
            self._files_dir.mkdir(parents=True, exist_ok=True)
            if not self._corpus_file.exists():
                self._corpus_file.write_text("", encoding="utf-8")
            if not self._sessions_file.exists():
                self._sessions_file.write_text("", encoding="utf-8")
            self._seen_hashes = set()
            with self._corpus_file.open("r", encoding="utf-8") as fh:
                for line in fh:
                    if not line or not line.strip():
                        continue
                    try:
                        obj = json.loads(line)
                    except Exception:
                        continue
                    h = obj.get("file_hash")
                    if h:
                        self._seen_hashes.add(str(h))
        except Exception:
            # Storage is unusable; behave as if nothing has been ingested.
            self._seen_hashes = set()

    def _compute_file_hash(self, path: Path) -> str:
        """Return the SHA-256 hex digest of the file contents.

        If the file cannot be read, the path and modification time are hashed
        instead so that a digest is still produced.
        """
        h = hashlib.sha256()
        try:
            with path.open("rb") as fh:
                while True:
                    b = fh.read(1024 * 1024)
                    if not b:
                        break
                    h.update(b)
        except Exception:
            st = None
            try:
                st = path.stat()
            except Exception:
                pass
            h.update((str(path) + "|" + str(getattr(st, "st_mtime", 0.0))).encode("utf-8", "ignore"))
        return h.hexdigest()

    def _compute_url_hash(self, url: str) -> str:
        """Return the SHA-256 hex digest of the stripped URL."""
        return hashlib.sha256(url.strip().encode("utf-8", "ignore")).hexdigest()

    def _ensure_file_symlink(self, src: Path, file_hash: str) -> Path:
        """Ensure ``files/<hash>__<name>`` exists as a link to (or copy of) *src*.

        The link is created at exactly the returned path. Previously the link
        was created under the bare file name, so the returned path never
        existed and files sharing a name overwrote each other.

        Best effort: failures are ignored and the intended path is returned.
        """
        dst = self._files_dir / f"{file_hash}__{src.name}"
        try:
            if not dst.exists():
                self._files_dir.mkdir(parents=True, exist_ok=True)
                if not safe_symlink(src.resolve(), dst):
                    shutil.copy2(src, dst)
        except Exception:
            # Keeping a handle on the source is a convenience, not a requirement.
            pass
        return dst

    def _ingest_single_file(self, src: Path, file_hash: str, *, verbose: bool = False) -> bool:
        """Extract text from *src* and append the records to the corpus.

        The file is staged in a scratch directory and indexed with the
        external builder when ``corpus_builder.py`` exists next to this
        module; if that is unavailable or fails, ``SimpleCorpusBuilder`` is
        used. Every appended record is stamped with *file_hash*, the resolved
        source path and the ingestion time. The scratch directory is always
        removed.

        Returns ``True`` when records were appended.
        """
        tmp_dir = self._base_dir / "ingest_tmp" / file_hash
        try:
            try:
                if tmp_dir.exists():
                    shutil.rmtree(tmp_dir)
            except Exception:
                pass
            tmp_dir.mkdir(parents=True, exist_ok=True)
            copy_or_link(src, tmp_dir)
            external = None
            script = REPO_ROOT / "corpus_builder.py"
            if script.exists():
                external = ExternalCorpusBuilder(script)
            tmp_out = tmp_dir / "out.jsonl"
            ok = False
            if external is not None:
                self._status(f"Indexing {src.name} (external)...")
                ok = external.build(tmp_dir, tmp_out, workers=2, verbose=verbose)
            if not ok:
                self._status(f"Indexing {src.name} (simple)...")
                try:
                    simple = SimpleCorpusBuilder()
                    recs = simple.build(tmp_dir, tmp_out)
                    ok = bool(recs)
                except Exception:
                    ok = False
            if not (ok and tmp_out.exists()):
                return False
            ts = int(time.time())
            with tmp_out.open("r", encoding="utf-8") as fh_in, self._corpus_file.open("a", encoding="utf-8") as fh_out:
                for line in fh_in:
                    if not line.strip():
                        continue
                    try:
                        obj = json.loads(line)
                    except Exception:
                        continue
                    obj["file_hash"] = file_hash
                    obj["source_path"] = str(src.resolve())
                    obj.setdefault("mime", None)
                    obj["added_at"] = ts
                    try:
                        fh_out.write(json.dumps(obj, ensure_ascii=False) + "\n")
                    except Exception:
                        # Retry with ASCII escaping (e.g. unencodable characters).
                        fh_out.write(json.dumps(obj) + "\n")
            self._seen_hashes.add(file_hash)
            return True
        finally:
            try:
                shutil.rmtree(tmp_dir)
            except Exception:
                pass

    def _ingest_single_url(self, url: str, url_hash: str) -> bool:
        """Fetch *url*, extract its text and append one record to the corpus.

        The record title is the page ``<title>`` when present, otherwise a
        short host/path name. Returns ``False`` on any failure or when no
        text could be extracted.
        """
        try:
            self._status(f"Fetching {url}...")
            try:
                html_text, _hdrs = websearch._http_get(url, timeout=25)
            except Exception:
                return False
            text = websearch._extract_text(html_text)
            if not text.strip():
                return False
            title = self._friendly_url_name(url)
            try:
                m = re.search(r"<title>(.*?)</title>", html_text, flags=re.I | re.S)
                if m:
                    raw_title = m.group(1)
                    cleaned = re.sub(r"\s+", " ", raw_title)
                    try:
                        cleaned = html.unescape(cleaned)
                    except Exception:
                        pass
                    cleaned = cleaned.strip()
                    if cleaned:
                        title = cleaned
            except Exception:
                pass
            ts = int(time.time())
            obj = {
                "id": url,
                "title": title,
                "text": text,
                "source_path": url,
                "mime": "text/html",
                "file_hash": url_hash,
                "added_at": ts,
            }
            with self._corpus_file.open("a", encoding="utf-8") as fh_out:
                fh_out.write(json.dumps(obj, ensure_ascii=False) + "\n")
            self._seen_hashes.add(url_hash)
            return True
        except Exception:
            return False

    @staticmethod
    def _friendly_url_name(url: str) -> str:
        """Return ``host`` or ``host/<first path segment>`` (segment capped at 40 chars)."""
        try:
            from urllib.parse import urlparse
            parsed = urlparse(url)
            host = parsed.netloc or url
            path = (parsed.path or "").strip("/").split("/")
            if path and path[0]:
                first = path[0][:40]
                return f"{host}/{first}"
            return host
        except Exception:
            return url

    def _ensure_corpus_for_files(self, paths: List[Path]) -> None:
        """Hash each file, link it into ``files/`` and ingest the ones not yet in the corpus."""
        if not paths:
            return
        to_ingest: List[Tuple[Path, str]] = []
        for p in paths:
            try:
                h = self._compute_file_hash(p)
            except Exception:
                continue
            self.file_hashes[str(p)] = h
            self._ensure_file_symlink(p, h)
            if h not in self._seen_hashes and h not in self._ingesting:
                to_ingest.append((p, h))
        for src, h in to_ingest:
            self._ingesting.add(h)
            try:
                self._ingest_single_file(src, h, verbose=False)
            finally:
                self._ingesting.discard(h)

    def _ensure_corpus_for_urls(self, urls: List[str]) -> None:
        """Hash each URL and ingest the ones not yet in the corpus."""
        if not urls:
            return
        to_ingest: List[Tuple[str, str]] = []
        for u in urls:
            if not u:
                continue
            h = self._compute_url_hash(u)
            self.file_hashes[u] = h
            if h not in self._seen_hashes and h not in self._ingesting:
                to_ingest.append((u, h))
        for url, h in to_ingest:
            self._ingesting.add(h)
            try:
                self._ingest_single_url(url, h)
            finally:
                self._ingesting.discard(h)

    def _load_records_for_hashes(self, hashes: Set[str]) -> List[Record]:
        """Return the corpus records whose ``file_hash`` is in *hashes*.

        Unparseable lines are skipped; a read failure yields the records
        collected so far.
        """
        out: List[Record] = []
        if not hashes:
            return out
        try:
            with self._corpus_file.open("r", encoding="utf-8") as fh:
                for line in fh:
                    if not line.strip():
                        continue
                    try:
                        obj = json.loads(line)
                    except Exception:
                        continue
                    if str(obj.get("file_hash") or "") not in hashes:
                        continue
                    out.append(Record(
                        id=str(obj.get("id", "")),
                        title=str(obj.get("title", "")),
                        text=str(obj.get("text", "")),
                        source_path=str(obj.get("source_path", "")) if obj.get("source_path") else None,
                        mime=str(obj.get("mime", "")) if obj.get("mime") else None,
                    ))
        except Exception:
            pass
        return out

    def build_kb_records(self, files: List[str], websites: List[str]) -> List[Record]:
        """Ingest *files* and *websites* as needed and return their corpus records.

        The result is also stored in ``self.records``.
        """
        paths = [Path(p) for p in files]
        self._ensure_corpus_for_files(paths)
        self._ensure_corpus_for_urls(websites)
        hashes = {self.file_hashes.get(str(p)) for p in paths}
        hashes.update({self.file_hashes.get(u) for u in websites})
        hashes = {h for h in hashes if h}
        self.records = self._load_records_for_hashes(hashes)
        return self.records

    # --- Sessions

    @staticmethod
    def _session_title(entry: Dict[str, Any]) -> str:
        """Return the normalized (stripped) title of a stored session entry."""
        return str(entry.get("title") or "").strip()

    def _load_all_sessions(self) -> List[Dict[str, Any]]:
        """Return every stored session that is a JSON object with a title."""
        entries: List[Dict[str, Any]] = []
        try:
            with self._sessions_file.open("r", encoding="utf-8") as fh:
                for line in fh:
                    if not line.strip():
                        continue
                    try:
                        obj = json.loads(line)
                        if isinstance(obj, dict) and obj.get("title"):
                            entries.append(obj)
                    except Exception:
                        continue
        except Exception:
            pass
        return entries

    def _write_all_sessions(self, entries: List[Dict[str, Any]]) -> None:
        """Rewrite the sessions file with *entries*.

        Writes to a temporary sibling file and renames it over the sessions
        file; if that fails, falls back to writing the sessions file directly.
        """
        tmp = self._sessions_file.with_suffix(".tmp")
        try:
            with tmp.open("w", encoding="utf-8") as fh:
                for obj in entries:
                    fh.write(json.dumps(obj, ensure_ascii=False) + "\n")
            tmp.replace(self._sessions_file)
        except Exception:
            with self._sessions_file.open("w", encoding="utf-8") as fh:
                for obj in entries:
                    fh.write(json.dumps(obj, ensure_ascii=False) + "\n")

    def _session_title_exists(self, title: str) -> bool:
        """Return whether a session with this (stripped) title is stored."""
        t = (title or "").strip()
        if not t:
            return False
        return any(self._session_title(e) == t for e in self._load_all_sessions())

    def list_sessions(self) -> List[Dict[str, Any]]:
        """Return title, description and save time of every stored session."""
        return [
            {
                "title": e.get("title") or "",
                "description": e.get("description") or "",
                "saved_at": e.get("saved_at") or 0,
            }
            for e in self._load_all_sessions()
        ]

    def load_session(self, title: str) -> Optional[Dict[str, Any]]:
        """Return the first stored session with this title, or ``None``."""
        t = (title or "").strip()
        if not t:
            return None
        for e in self._load_all_sessions():
            if self._session_title(e) == t:
                return e
        return None

    def save_session(self, payload: Dict[str, Any], *, allow_overwrite: bool) -> Dict[str, Any]:
        """Store a session and return the record that was written.

        Referenced files and websites are ingested into the corpus first, and
        their hashes are saved with the session.

        Raises:
            RuntimeError: if the title is empty, or a session with the same
                title exists and *allow_overwrite* is false.
        """
        title = (payload.get("title") or "").strip()
        if not title:
            raise RuntimeError("Title is required to save a session.")
        exists = self._session_title_exists(title)
        if exists and not allow_overwrite:
            raise RuntimeError("Session already exists")
        files_list = payload.get("files") or []
        websites_list = payload.get("websites") or []
        self._ensure_corpus_for_files([Path(f["path"]) for f in files_list if f.get("path")])
        self._ensure_corpus_for_urls([w.get("url") for w in websites_list if w.get("url")])
        files_meta = []
        for f in files_list:
            path = f.get("path")
            if not path:
                continue
            h = self.file_hashes.get(path) or self._compute_file_hash(Path(path))
            self.file_hashes[path] = h
            files_meta.append({
                "path": path,
                "file_hash": h,
                "include": bool(f.get("include", True)),
            })
        websites_meta = []
        for w in websites_list:
            url = w.get("url")
            if not url:
                continue
            h = self.file_hashes.get(url) or self._compute_url_hash(url)
            self.file_hashes[url] = h
            websites_meta.append({
                "url": url,
                "file_hash": h,
                "include": bool(w.get("include", True)),
            })
        record = {
            "title": title,
            "description": (payload.get("description") or "").strip(),
            "notes": (payload.get("notes") or "").strip(),
            "concept": (payload.get("concept") or "").strip(),
            "files": files_meta,
            "websites": websites_meta,
            "saved_at": int(time.time()),
            "rephrase_variants": payload.get("rephrase_variants") or [],
            "rephrase_selected_key": payload.get("rephrase_selected_key"),
        }
        entries = self._load_all_sessions()
        if exists:
            # Compare normalized titles, matching _session_title_exists, so a
            # stored title with stray whitespace is replaced, not duplicated.
            entries = [e for e in entries if self._session_title(e) != title]
        entries.append(record)
        self._write_all_sessions(entries)
        return record
# -----------------------------
# Concept generation helpers
# -----------------------------
def _extract_title_desc(concept_md: str, *, client: OllamaClient, model: str) -> Tuple[Optional[str], Optional[str]]:
    """Ask the model for a short title and one-sentence description of a concept.

    Returns ``(title, description)``; each element is ``None`` when it is
    missing or empty, and both are ``None`` if anything goes wrong.
    """

    def _clean(value: Any) -> Optional[str]:
        return strip_wrapping_quotes(str(value or "").strip()) or None

    try:
        instructions = (
            "Extract a concise title and a one-sentence description from the following concept.\n"
            "- Title: <= 50 chars (3-5 words).\n- Description: <= 120 chars, (one sentence) no trailing period.\n"
            "Return ONLY strict JSON with keys 'title' and 'description'.\n\nCONCEPT:\n"
        )
        reply = client.generate(model, instructions + concept_md)
        fields = _parse_json_strict(reply) or {}
        return _clean(fields.get("title")), _clean(fields.get("description"))
    except Exception:
        return None, None
# -----------------------------
# PDF conversion helpers
# -----------------------------
def _slug(s: str) -> str:
s = re.sub(r"[\s]+", "-", s.strip())
s = re.sub(r"[^a-zA-Z0-9._-]", "-", s)
return re.sub(r"-+", "-", s).strip("-_")
def _convert_markdown_to_pdf(md_file: Path, out_pdf: Path) -> Tuple[bool, Optional[Path]]:
    """Render *md_file* to *out_pdf* with pandoc and the tectonic PDF engine.

    Images referenced from the Markdown are staged in a scratch directory
    (SVG and other unsupported formats are converted to PNG where possible),
    runs of blank lines are preserved as vertical space, and pandoc is run
    once with raw TeX enabled and, if that fails, once more without it.

    Returns ``(ok, log_path)``. The log file is written only on failure.
    """
    concept_dir = out_pdf.parent
    concept_dir.mkdir(parents=True, exist_ok=True)
    logs_dir = IDEA_HOLE_DIR / "logs"
    logs_dir.mkdir(parents=True, exist_ok=True)
    log_path = logs_dir / f"pdf_export_{concept_dir.name}.log"
    pandoc = resolve_command("pandoc")
    tectonic = resolve_command("tectonic")
    lines: List[str] = [
        f"PATH={os.environ.get('PATH','')}",
        f"md_file={md_file}",
        f"resolved pandoc={pandoc}",
        f"resolved tectonic={tectonic}",
    ]

    def _write_log() -> None:
        # Logging is best effort; it must never mask the export result.
        try:
            log_path.write_text("\n".join(lines), encoding="utf-8")
        except Exception:
            pass

    if not pandoc or not tectonic:
        lines.append("Missing required tools: pandoc and/or tectonic.")
        _write_log()
        return False, log_path

    tmp_base = IDEA_HOLE_DIR / "tmp_pdf" / concept_dir.name
    try:
        if tmp_base.exists():
            shutil.rmtree(tmp_base)
    except Exception:
        pass
    tmp_base.mkdir(parents=True, exist_ok=True)

    try:
        text = md_file.read_text(encoding="utf-8")
    except Exception as e:
        lines.append(f"read error: {e}")
        _write_log()
        return False, log_path

    img_rgx = re.compile(r"!\[[^\]]*\]\(([^\s)]+)(?:\s+\"[^\"]*\")?\)")
    allowed_ext = {".png", ".jpg", ".jpeg", ".pdf", ".eps"}

    def ensure_image_available(src: str) -> str:
        """Stage the image *src* in the scratch dir; return the name to reference.

        Returns *src* unchanged when the image cannot be found or staged.
        """
        p = Path(src)
        if not p.is_absolute():
            p = (concept_dir / p).resolve()
        if not p.exists():
            # Fall back to a file of the same name directly in the concept dir.
            alt = (concept_dir / Path(src).name).resolve()
            if alt.exists():
                p = alt
            else:
                lines.append(f"missing image: {src}")
                return src
        ext = p.suffix.lower()
        if ext in allowed_ext:
            out_name = p.name
            out_path = tmp_base / out_name
            try:
                if not out_path.exists():
                    shutil.copy2(str(p), str(out_path))
                return out_name
            except Exception as e:
                lines.append(f"copy fail: {src} -> {out_name} ({e})")
                return src
        if ext == ".svg":
            out_name = p.stem + ".png"
            out_path = tmp_base / out_name
            # Try converters in order: cairosvg, rsvg-convert, ImageMagick.
            try:
                from cairosvg import svg2png  # type: ignore
                svg2png(url=str(p), write_to=str(out_path))
                return out_name
            except Exception as e_svg_py:
                lines.append(f"cairosvg unavailable or failed: {e_svg_py}")
            try:
                tool = resolve_command("rsvg-convert")
                if tool:
                    res = subprocess.run([tool, "-f", "png", "-o", str(out_path), str(p)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=subprocess_env())
                    if res.returncode == 0 and out_path.exists():
                        return out_name
                    lines.append(f"rsvg-convert failed: exit {res.returncode}, {res.stdout}")
            except Exception as e_svg_cli:
                lines.append(f"rsvg-convert error: {e_svg_cli}")
            try:
                tool = resolve_command("magick") or resolve_command("convert")
                if tool:
                    res = subprocess.run([tool, str(p), str(out_path)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=subprocess_env())
                    if res.returncode == 0 and out_path.exists():
                        return out_name
                    lines.append(f"imagemagick failed: exit {res.returncode}, {res.stdout}")
            except Exception as e_im:
                lines.append(f"imagemagick error: {e_im}")
        # Generic fallback: re-encode as PNG with Pillow (first frame only).
        try:
            from PIL import Image  # type: ignore
            img = Image.open(str(p))
            try:
                img.seek(0)
            except Exception:
                pass
            out_name = p.stem + ".png"
            out_path = tmp_base / out_name
            img.convert("RGBA" if img.mode in ("P", "LA") else "RGB").save(str(out_path), format="PNG")
            return out_name
        except Exception as e:
            lines.append(f"convert fail: {src} -> png ({e})")
        # Last resort: stage the file as-is and let the PDF engine decide.
        out_name = p.name
        out_path = tmp_base / out_name
        try:
            shutil.copy2(str(p), str(out_path))
            return out_name
        except Exception as e2:
            lines.append(f"final copy fail: {src} ({e2})")
            return src

    def _repl(m: re.Match) -> str:
        orig = m.group(0)
        path = m.group(1)
        rep = ensure_image_available(path)
        return orig.replace(path, rep)

    mod_text = img_rgx.sub(_repl, text)

    def _spacing(blank_count: int) -> List[str]:
        # One paragraph break, plus one \vspace per additional blank line.
        if blank_count <= 0:
            return []
        return [""] + ["\\vspace{1em}"] * (blank_count - 1)

    def _preserve_extra_blank_lines(s: str) -> str:
        """Replace runs of blank lines outside code fences with explicit spacing."""
        s = s.replace("\r\n", "\n").replace("\r", "\n")
        out_lines: List[str] = []
        in_fence = False
        blank_run = 0
        for ln in s.split("\n"):
            stripped = ln.lstrip()
            is_fence = stripped.startswith(("```", "~~~"))
            if not is_fence and not in_fence and stripped == "":
                blank_run += 1
                continue
            out_lines.extend(_spacing(blank_run))
            blank_run = 0
            out_lines.append(ln)
            if is_fence:
                in_fence = not in_fence
        out_lines.extend(_spacing(blank_run))
        return "\n".join(out_lines)

    mod_text = _preserve_extra_blank_lines(mod_text)
    tmp_md = tmp_base / "README_pdf.md"
    tmp_md.write_text(mod_text, encoding="utf-8")

    def _pandoc_cmd(input_format: str) -> List[str]:
        return [
            pandoc,
            str(tmp_md),
            "-f", input_format,
            "-s",
            f"--pdf-engine={tectonic}",
            "-V", "mainfont=Helvetica",
            "-V", "monofont=Menlo",
            "-V", "geometry:margin=20mm",
            "-V", "fontsize=11pt",
            # pandoc expects the platform's path-list separator here.
            "--resource-path", os.pathsep.join([str(tmp_base), str(concept_dir)]),
            "-o", str(out_pdf),
        ]

    cmd = _pandoc_cmd("markdown+hard_line_breaks+raw_tex")
    res = subprocess.run(cmd, cwd=str(tmp_base), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=subprocess_env())
    lines.append("$ " + " ".join(cmd))
    lines.append(f"(exit {res.returncode})")
    lines.append(res.stdout or "")
    ok = (res.returncode == 0 and out_pdf.exists())
    if not ok:
        # Retry without raw TeX in case the injected \vspace lines broke the build.
        try:
            cmd_fallback = _pandoc_cmd("markdown+hard_line_breaks")
            res2 = subprocess.run(cmd_fallback, cwd=str(tmp_base), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=subprocess_env())
            lines.append("$ " + " ".join(cmd_fallback))
            lines.append(f"(exit {res2.returncode})")
            lines.append(res2.stdout or "")
            ok = (res2.returncode == 0 and out_pdf.exists())
        except Exception as e_fallback:
            lines.append(f"fallback error: {e_fallback}")
    if not ok:
        _write_log()
    try:
        shutil.rmtree(tmp_base)
    except Exception:
        pass
    return ok, log_path
# -----------------------------
# Settings
# -----------------------------
def settings_path() -> Path:
    """Return the path of the settings file, creating the data directory if needed."""
    data_dir = IDEA_HOLE_DIR
    data_dir.mkdir(parents=True, exist_ok=True)
    return data_dir / "settings.json"
def load_settings() -> Dict[str, str]:
    """Load persisted settings as a flat ``str -> str`` mapping.

    Returns:
        The stored settings with every value converted via ``str()`` and
        ``None`` values dropped. An empty dict is returned when the file is
        missing, cannot be read or parsed, or does not contain a JSON object.
    """
    settings_file = settings_path()
    if not settings_file.exists():
        return {}
    try:
        parsed = json.loads(settings_file.read_text(encoding="utf-8"))
        if not isinstance(parsed, dict):
            return {}
        cleaned: Dict[str, str] = {}
        for key, value in parsed.items():
            if value is not None:
                cleaned[key] = str(value)
        return cleaned
    except Exception:
        # Best effort: an unreadable or corrupt file behaves like "no settings".
        return {}
def save_settings(settings: Dict[str, str]) -> None:
    """Write *settings* to the settings file as JSON.

    The preferred form is indented, non-ASCII-preserving JSON. If producing
    or writing that form fails for any reason, a second attempt is made with
    the default ``json.dumps`` output; an error from that second attempt
    propagates to the caller.
    """
    target = settings_path()
    try:
        pretty = json.dumps(settings, ensure_ascii=False, indent=2)
        target.write_text(pretty, encoding="utf-8")
    except Exception:
        compact = json.dumps(settings)
        target.write_text(compact, encoding="utf-8")
# -----------------------------
# Actions
# -----------------------------
def list_models() -> List[str]:
    """Return the model names reported by ``ollama list``.

    The first whitespace-separated column of each non-empty output line is
    taken as the model name; the table header row is skipped and duplicates
    are removed while keeping first-seen order.

    Returns:
        The list of model names, or an empty list when the ``ollama``
        executable cannot be resolved, the command exits non-zero, times
        out (8 s), or any other error occurs.
    """
    try:
        ollama = resolve_command("ollama")
        if not ollama:
            return []
        res = subprocess.run(
            [ollama, "list"],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            timeout=8,
            env=subprocess_env(),
        )
        if res.returncode != 0:
            return []

        names: List[str] = []
        seen: Set[str] = set()
        for line in (res.stdout or "").splitlines():
            columns = line.split()
            if not columns:
                continue
            name = columns[0]
            # Skip only the header row (first column is exactly "NAME").
            # A prefix test on the whole line would also drop real models
            # whose names merely start with "name".
            if name.lower() == "name":
                continue
            if name not in seen:
                seen.add(name)
                names.append(name)
        return names
    except Exception:
        # Best effort: the UI treats "no models" and "ollama unavailable" alike.
        return []
def stat_paths(paths: List[str], *, expand_dirs: bool) -> List[Dict[str, Any]]:
    """Describe the files referenced by *paths*.

    Args:
        paths: File or directory paths; falsy entries are ignored.
        expand_dirs: When true, directories are walked recursively and every
            regular file below them is reported. When false, directories
            are skipped.

    Returns:
        One dict per distinct file (keyed by its path string), in discovery
        order, with the keys ``name``, ``path``, ``type`` (lower-cased suffix
        or ``"file"``) and ``size`` (human-readable, or ``"?"``).
    """
    records: List[Dict[str, Any]] = []
    visited: Set[str] = set()

    def _record(entry: Path) -> None:
        # Register a file once; repeated path strings are ignored.
        key = str(entry)
        if key in visited:
            return
        visited.add(key)
        size = human_size(entry.stat().st_size) if entry.exists() else "?"
        records.append({
            "name": entry.name,
            "path": key,
            "type": entry.suffix.lower() or "file",
            "size": size,
        })

    for raw in paths:
        if not raw:
            continue
        candidate = Path(raw)
        if candidate.is_dir() and expand_dirs:
            for child in candidate.rglob("*"):
                if child.is_file():
                    _record(child)
        elif candidate.is_file():
            _record(candidate)
    return records
def rephrase(note: str, host: str, model: str) -> List[Dict[str, str]]:
    """Produce rephrased variants of *note*, one per entry in ``REPHRASE_LENSES``.

    Args:
        note: The user's note; substituted for ``{USER_NOTE}`` in each lens prompt.
        host: Ollama host passed to ``OllamaClient``.
        model: Model name passed to ``client.generate``.

    Returns:
        A list of ``{"key", "label", "text"}`` dicts. The first entry is the
        unmodified note; each following entry holds the sanitized model
        output for one lens, in lens order.
    """
    client = OllamaClient(host=host)
    results: List[Dict[str, str]] = [
        {"key": "original", "label": "Original Note", "text": note},
    ]
    for position, lens in enumerate(REPHRASE_LENSES, start=1):
        template = lens.get("prompt") or ""
        reply = client.generate(model=model, prompt=template.replace("{USER_NOTE}", note))
        cleaned = sanitize_llm_text_simple(reply)
        results.append({
            # Lenses without an explicit key/label get positional fallbacks.
            "key": lens.get("key") or f"lens_{position}",
            "label": lens.get("label") or f"Variant {position}",
            "text": cleaned,
        })
    return results
def extend(note: str, host: str, model: str) -> str:
    """Ask the model to extend *note* using ``EXTEND_PROMPT``.

    Args:
        note: The user's note; substituted for ``{USER_NOTE}`` in the prompt.
        host: Ollama host passed to ``OllamaClient``.
        model: Model name passed to ``client.generate``.

    Returns:
        The sanitized model output.

    Raises:
        RuntimeError: If the sanitized output is empty or whitespace only.
    """
    llm = OllamaClient(host=host)
    reply = llm.generate(model=model, prompt=EXTEND_PROMPT.replace("{USER_NOTE}", note))
    expanded = sanitize_llm_text_simple(reply)
    if expanded.strip():
        return expanded
    raise RuntimeError("Empty response from model")
def generate_concept(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Generate a concept document from notes, files and websites.

    Args:
        payload: Request dict. Keys read: ``notes``, ``files``, ``websites``,
            ``ollama_host`` (defaults to ``http://localhost:11434``) and
            ``model``.

    Returns:
        A dict with ``concept`` (the generated markdown), ``title``,
        ``description`` (truncated to 120 characters) and ``kb_records``
        (number of knowledge-base records that were built).

    Raises:
        RuntimeError: If the final concept text is empty.
    """
    notes = (payload.get("notes") or "").strip()
    files = payload.get("files") or []
    websites = payload.get("websites") or []
    host = payload.get("ollama_host") or "http://localhost:11434"
    model = payload.get("model") or ""

    engine = ConceptEngine()
    records = engine.build_kb_records(files, websites)
    kb = build_kb_string(records)

    # Human-readable inventory of the supplied assets for the prompt.
    assets_lines: List[str] = []
    if files:
        assets_lines.append("Files:")
        assets_lines.extend(f"- {Path(p).name}" for p in files)
    if websites:
        assets_lines.append("URLs:")
        assets_lines.extend(f"- {u}" for u in websites)
    assets_str = "\n".join(assets_lines) or "(none)"

    # Fill all placeholders in ONE pass over the template. Chained
    # str.replace() calls would re-scan already substituted text, so a
    # literal "{KB}" or "{ASSETS}" inside the user's notes (or "{ASSETS}"
    # inside the KB) would be expanded as if it were part of the template.
    # A callable replacement also keeps backslashes in the values literal.
    substitutions = {
        "{NOTES}": notes or "(none)",
        "{KB}": kb or "(empty)",
        "{ASSETS}": assets_str,
    }
    prompt = re.sub(
        r"\{(?:NOTES|KB|ASSETS)\}",
        lambda match: substitutions[match.group(0)],
        PROMPT_TEMPLATE,
    )

    client = OllamaClient(host=host)
    concept_md = client.generate(model=model, prompt=prompt)
    concept_md = sanitize_llm_text_simple(concept_md)

    title, desc = _extract_title_desc(concept_md, client=client, model=model)
    if not desc:
        desc = ""
    if title:
        concept_md = md_heading_replace_or_insert(concept_md, title)
    if not concept_md.strip():
        raise RuntimeError("Empty response from model")

    return {
        "concept": concept_md,
        "title": title or "",
        "description": strip_wrapping_quotes(desc)[:120],
        "kb_records": len(records),
    }
def prior_art(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Run a prior-art search for the concept described by *payload*.

    Args:
        payload: Request dict. Keys read: ``notes``, ``title``,
            ``description``, ``concept``, ``files``, ``websites``,
            ``ollama_host`` (defaults to ``http://localhost:11434``),
            ``model`` and ``searx_url``.

    Returns:
        Whatever ``websearch.prior_art_search`` returns.
    """
    notes = (payload.get("notes") or "").strip()
    if not notes:
        # No free-form notes: assemble a substitute from the non-empty
        # title / description / concept fields, separated by blank lines.
        title = (payload.get("title") or "").strip()
        description = (payload.get("description") or "").strip()
        concept = (payload.get("concept") or "").strip()
        sections: List[str] = []
        if title:
            sections.append(f"Title: {title}")
        if description:
            sections.append(f"Description: {description}")
        if concept:
            sections.append(concept)
        notes = "\n\n".join(sections)

    files = payload.get("files") or []
    websites = payload.get("websites") or []
    host = payload.get("ollama_host") or "http://localhost:11434"
    model = payload.get("model") or ""
    searx_url = payload.get("searx_url") or None

    records = ConceptEngine().build_kb_records(files, websites)
    kb = build_kb_string(records)

    return websearch.prior_art_search(
        ollama_host=host,
        model=model,
        notes=notes,
        kb=kb,
        assets=files,
        searx_url=searx_url,
    )
def preview_pdf(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Render the concept markdown to a PDF.

    A fresh staging directory ``<IDEA_HOLE_DIR>/preview/<slug>-preview`` is
    prepared, the concept is written there as ``README.md``, the listed
    asset files are copied next to it (best effort), and the markdown is
    passed to ``_convert_markdown_to_pdf``.

    Args:
        payload: Request dict. Keys read: ``concept``, ``title``, ``files``
            and ``output_path``. Without ``output_path`` the PDF is written
            into the staging directory; a ``.pdf`` suffix is appended to
            ``output_path`` when it is missing.

    Returns:
        A dict with ``ok``, ``pdf_path`` and ``log_path`` (empty string when
        the converter returned no log path).

    Raises:
        RuntimeError: If the concept text is empty or ``output_path`` is an
            existing directory.
    """
    concept_text = (payload.get("concept") or "").strip()
    title = (payload.get("title") or "").strip()
    files = payload.get("files") or []
    output_path = (payload.get("output_path") or "").strip()

    if not concept_text:
        raise RuntimeError("Concept text is empty")

    slug = _slug(title or "preview")
    base = IDEA_HOLE_DIR / "preview" / f"{slug}-preview"

    # Start from a clean staging directory; failing to clear it is tolerated.
    try:
        if base.exists():
            shutil.rmtree(base)
    except Exception:
        pass
    base.mkdir(parents=True, exist_ok=True)

    md_path = base / "README.md"
    md_path.write_text(concept_text, encoding="utf-8")

    # File names the staging directory uses itself; colliding assets are
    # copied under an "asset-" prefix instead.
    reserved = {
        "readme.md",
        f"{slug}-concept.pdf".lower(),
        f"{slug}-preview.pdf".lower(),
    }
    assets = [Path(p) for p in files]
    for src in assets:
        try:
            dst = base / src.name
            if dst.name.lower() in reserved:
                dst = base / f"asset-{src.name}"
            shutil.copy2(src, dst)
        except Exception:
            # Best effort: an asset that cannot be copied is skipped.
            pass

    if not output_path:
        pdf_path = base / f"{slug}-preview.pdf"
    else:
        pdf_path = Path(output_path).expanduser()
        if pdf_path.exists() and pdf_path.is_dir():
            raise RuntimeError(f"Output path is a directory: {pdf_path}")
        if pdf_path.suffix.lower() != ".pdf":
            pdf_path = Path(f"{pdf_path}.pdf")

    ok, log_path = _convert_markdown_to_pdf(md_path, pdf_path)
    log_text = str(log_path) if log_path else ""
    return {
        "ok": ok,
        "pdf_path": str(pdf_path),
        "log_path": log_text,
    }
# -----------------------------
# JSON-RPC style entrypoint
# -----------------------------
def _read_stdin_json() -> Dict[str, Any]:
    """Read all of standard input and parse it as JSON.

    Returns:
        The decoded JSON value, or an empty dict when stdin is empty or
        contains only whitespace (e.g. a lone trailing newline).

    Raises:
        json.JSONDecodeError: If stdin holds non-blank text that is not
            valid JSON.
    """
    raw = sys.stdin.read()
    # Whitespace-only input is not valid JSON; treat it like "no request"
    # so the caller reports a missing action instead of a decode error.
    if not raw.strip():
        return {}
    return json.loads(raw)
def main() -> int:
    """Handle one JSON request from stdin and write one JSON response to stdout.

    The request is an object with an ``action`` name and an optional
    ``payload``. On success the response is ``{"ok": true, "data": ...}``;
    on any exception it is ``{"ok": false, "error": ..., "trace": ...}``.

    Returns:
        Always ``0``; failures are reported through the JSON response.
    """
    default_host = "http://localhost:11434"
    try:
        req = _read_stdin_json()
        action = req.get("action")
        payload = req.get("payload") or {}
        if not action:
            raise RuntimeError("Missing action")

        def _save_settings() -> Dict[str, Any]:
            save_settings(payload.get("settings") or {})
            return {"ok": True}

        def _save_session() -> Any:
            return ConceptEngine().save_session(
                payload.get("payload") or {},
                allow_overwrite=bool(payload.get("allow_overwrite")),
            )

        # Action name -> zero-argument callable; arguments are only read
        # from the payload when the selected handler actually runs.
        handlers = {
            "list_models": lambda: list_models(),
            "stat_paths": lambda: stat_paths(
                payload.get("paths") or [],
                expand_dirs=bool(payload.get("expand_dirs")),
            ),
            "rephrase": lambda: rephrase(
                payload.get("note") or "",
                payload.get("ollama_host") or default_host,
                payload.get("model") or "",
            ),
            "extend": lambda: extend(
                payload.get("note") or "",
                payload.get("ollama_host") or default_host,
                payload.get("model") or "",
            ),
            "generate_concept": lambda: generate_concept(payload),
            "prior_art": lambda: prior_art(payload),
            "preview_pdf": lambda: preview_pdf(payload),
            "load_settings": lambda: load_settings(),
            "save_settings": _save_settings,
            "list_sessions": lambda: ConceptEngine().list_sessions(),
            "load_session": lambda: ConceptEngine().load_session(payload.get("title") or ""),
            "save_session": _save_session,
        }
        # Only strings can name an action; the guard keeps unhashable
        # values (e.g. a JSON list) on the "Unknown action" path.
        handler = handlers.get(action) if isinstance(action, str) else None
        if handler is None:
            raise RuntimeError(f"Unknown action: {action}")

        out = {"ok": True, "data": handler()}
    except Exception as exc:
        out = {
            "ok": False,
            "error": str(exc),
            "trace": traceback.format_exc(limit=6),
        }
    sys.stdout.write(json.dumps(out, ensure_ascii=False))
    return 0
if __name__ == "__main__":
    # Run as a script: use main()'s return value as the process exit status.
    sys.exit(main())