1437 lines
50 KiB
Python
1437 lines
50 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
Headless backend actions for the Concept Maker app.
|
|
|
|
This module exposes JSON actions for the Tauri UI without desktop toolkit imports.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import html
|
|
import json
|
|
import math
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
import traceback
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Set, Tuple
|
|
|
|
import websearch
|
|
|
|
# -----------------------------
|
|
# Paths
|
|
# -----------------------------
|
|
|
|
# Directory that holds this module; used as the root unless ``sys._MEIPASS`` exists.
_MODULE_DIR = Path(__file__).resolve().parent

# Root for bundled resources: ``sys._MEIPASS`` when that attribute is present
# (presumably a frozen/bundled build -- confirm), otherwise the module directory.
REPO_ROOT = Path(getattr(sys, "_MEIPASS", _MODULE_DIR))

# Data directory. ``CONCEPT_MAKER_DATA_DIR`` overrides the default
# ``<REPO_ROOT>/.idea-hole``; ``~`` is expanded either way.
IDEA_HOLE_DIR = Path(
    os.environ.get("CONCEPT_MAKER_DATA_DIR", REPO_ROOT / ".idea-hole")
).expanduser()

# Directories probed for external tools in addition to ``PATH``.
SYSTEM_BIN_DIRS = [
    "/opt/homebrew/bin",
    "/usr/local/bin",
    "/opt/local/bin",
    "/usr/bin",
    "/bin",
]
|
|
|
|
|
|
def resolve_command(name: str) -> Optional[str]:
    """Return the path of the executable *name*, or ``None`` if it is not found.

    ``PATH`` is consulted first through :func:`shutil.which`. If that fails,
    each directory in ``SYSTEM_BIN_DIRS`` is probed in order.

    Args:
        name: Bare command name, e.g. ``"pandoc"``.

    Returns:
        Path to the command as a string, or ``None``.
    """
    on_path = shutil.which(name)
    if on_path and os.path.exists(on_path):
        return on_path
    for base in SYSTEM_BIN_DIRS:
        candidate = os.path.join(base, name)
        # A directory or a non-executable file with the right name is not a
        # usable command, so require a regular file with the execute bit.
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return candidate
    return None
|
|
|
|
|
|
def subprocess_env() -> Dict[str, str]:
    """Return a copy of the environment whose ``PATH`` also covers ``SYSTEM_BIN_DIRS``.

    Directories from ``SYSTEM_BIN_DIRS`` that are not already on ``PATH`` are
    placed in front of the existing value. ``os.environ`` itself is not
    modified.
    """
    env = os.environ.copy()
    existing = env.get("PATH", "")
    already_listed = existing.split(os.pathsep)
    additions = [d for d in SYSTEM_BIN_DIRS if d and d not in already_listed]
    if not additions:
        return env
    if existing:
        additions.append(existing)
    env["PATH"] = os.pathsep.join(additions)
    return env
|
|
|
|
|
|
# -----------------------------
|
|
# Utilities
|
|
# -----------------------------
|
|
|
|
def human_size(n: int) -> str:
    """Format a byte count as a short human-readable string.

    Args:
        n: Number of bytes. Zero and negative values are reported as ``"0 B"``.

    Returns:
        The value with one decimal place and a binary unit (1 KB = 1024 B),
        e.g. ``"1.5 KB"``. Values beyond the terabyte range stay in ``TB``.
    """
    if n <= 0:
        return "0 B"
    units = ("B", "KB", "MB", "GB", "TB")
    value = float(n)
    unit_index = 0
    # Repeated division by 1024 is exact in binary floating point, so sizes
    # that sit exactly on a unit boundary always land in the right unit. A
    # floating-point logarithm can come out a hair below the integer there.
    while value >= 1024.0 and unit_index < len(units) - 1:
        value /= 1024.0
        unit_index += 1
    return f"{value:.1f} {units[unit_index]}"
|
|
|
|
|
|
def safe_symlink(src: Path, dst: Path) -> bool:
    """Create (or replace) a symlink at *dst* pointing to *src*.

    Args:
        src: File the link should point to.
        dst: Location of the link. An existing file or link there is removed.

    Returns:
        ``True`` when the link was created, ``False`` on any failure or when
        *dst* is the same path as *src*. Never raises.
    """
    try:
        # Link to the absolute location: a relative target would be resolved
        # against the link's own directory and usually dangle.
        target = os.path.abspath(src)
        if os.path.abspath(dst) == target:
            # Replacing a file with a link to itself would destroy the file.
            return False
        if dst.exists() or dst.is_symlink():
            dst.unlink()
        os.symlink(target, dst)
        return True
    except Exception:
        return False
|
|
|
|
|
|
def copy_or_link(src: Path, dst_dir: Path) -> Path:
    """Make *src* available inside *dst_dir*, by symlink if possible, else by copy.

    Args:
        src: Existing file.
        dst_dir: Target directory; created when missing.

    Returns:
        ``dst_dir / src.name``.
    """
    dst_dir.mkdir(parents=True, exist_ok=True)
    dst = dst_dir / src.name
    if os.path.abspath(src) == os.path.abspath(dst):
        # The file already lives in dst_dir. Linking would first unlink dst,
        # which is src itself, so there is nothing to do.
        return dst
    if not safe_symlink(src, dst):
        shutil.copy2(src, dst)
    return dst
|
|
|
|
|
|
def read_text_guess(path: Path) -> str:
    """Read *path* as text, guessing the encoding.

    Decoding order:

    1. UTF-16 when the data starts with a UTF-16 byte-order mark or contains
       NUL bytes (plain 8-bit text never does).
    2. UTF-8, dropping a leading BOM if present.
    3. Latin-1, which accepts any byte sequence.

    Args:
        path: File to read.

    Returns:
        The decoded text, or ``""`` when the file cannot be read.
    """
    try:
        raw = path.read_bytes()
    except Exception:
        # Best-effort helper: unreadable input simply yields no text.
        return ""
    # UTF-16 is only attempted on evidence. Decoding arbitrary even-length
    # 8-bit data as UTF-16 "succeeds" and yields garbage.
    if raw.startswith((b"\xff\xfe", b"\xfe\xff")) or b"\x00" in raw:
        try:
            return raw.decode("utf-16")
        except UnicodeDecodeError:
            pass
    try:
        return raw.decode("utf-8-sig")
    except UnicodeDecodeError:
        return raw.decode("latin-1")
|
|
|
|
|
|
# -----------------------------
|
|
# Corpus building
|
|
# -----------------------------
|
|
|
|
@dataclass
class Record:
    """A single corpus entry: extracted text plus where it came from."""

    # Identifier of the entry; the builders in this file use the file path or URL.
    id: str
    # Display title of the source.
    title: str
    # Extracted plain text.
    text: str
    # Path or URL of the source, when known.
    source_path: Optional[str] = None
    # MIME type of the source, when known.
    mime: Optional[str] = None
|
|
|
|
|
|
class SimpleCorpusBuilder:
    """Very lightweight fallback if corpus_builder.py or deps are unavailable.

    Handles plain text, HTML and (when PyMuPDF's ``fitz`` can be imported) PDF
    files. Every other file yields a placeholder record.
    """

    _TEXT_SUFFIXES = frozenset({".txt", ".md", ".rst"})
    _HTML_SUFFIXES = frozenset({".html", ".htm"})

    def __init__(self) -> None:
        # ``fitz`` is optional; without it PDFs are reported as unsupported.
        try:
            import fitz  # type: ignore
        except Exception:
            fitz = None
        self._fitz = fitz

    def build(self, root: Path, out_jsonl: Path) -> List[Record]:
        """Scan *root* recursively and write one JSON line per record to *out_jsonl*.

        Args:
            root: Directory to scan.
            out_jsonl: Output file; parent directories are created and an
                existing file is overwritten.

        Returns:
            The records that were written. Supported files with blank text
            produce no record. Unsupported or unreadable files produce a
            placeholder record.
        """
        out_jsonl.parent.mkdir(parents=True, exist_ok=True)
        try:
            out_resolved = out_jsonl.resolve()
        except OSError:
            out_resolved = out_jsonl

        records: List[Record] = []
        for p in root.rglob("*"):
            if not p.is_file():
                continue
            # The output file may live inside root (e.g. left over from an
            # earlier run); it is not source material.
            try:
                if p.resolve() == out_resolved:
                    continue
            except OSError:
                pass
            suf = p.suffix.lower()
            try:
                if suf in self._TEXT_SUFFIXES:
                    text = read_text_guess(p)
                elif suf in self._HTML_SUFFIXES:
                    text = self._strip_html(read_text_guess(p))
                elif suf == ".pdf" and self._fitz is not None:
                    text = self._pdf_text(p)
                else:
                    text = f"[Unsupported file type: {suf}]"
            except Exception:
                text = f"[Error reading file: {p.name}]"
            if text.strip():
                records.append(Record(id=str(p), title=p.stem, text=text, source_path=str(p)))

        with out_jsonl.open("w", encoding="utf-8") as fh:
            for r in records:
                fh.write(json.dumps(r.__dict__, ensure_ascii=False) + "\n")
        return records

    def _strip_html(self, html_text: str) -> str:
        """Return the visible text of *html_text* without script/style content.

        Uses BeautifulSoup when it is importable, otherwise a regex fallback
        that collapses all whitespace to single spaces.
        """
        try:
            from bs4 import BeautifulSoup  # type: ignore

            soup = BeautifulSoup(html_text, "html.parser")
            for tag in soup(["script", "style"]):
                tag.decompose()
            return soup.get_text("\n", strip=True)
        except Exception:
            txt = re.sub(r"<\s*(script|style)[^>]*>.*?<\s*/\s*\1\s*>", " ", html_text, flags=re.S | re.I)
            txt = re.sub(r"<[^>]+>", " ", txt)
            txt = re.sub(r"\s+", " ", txt)
            return txt.strip()

    def _pdf_text(self, path: Path) -> str:
        """Return the text of all pages joined by blank lines, or ``""`` on failure."""
        try:
            doc = self._fitz.open(str(path))
        except Exception:
            return ""
        try:
            pages = [doc.load_page(i).get_text("text") for i in range(len(doc))]
            return "\n\n".join(pages)
        except Exception:
            return ""
        finally:
            # Release the document handle even when extraction fails.
            try:
                doc.close()
            except Exception:
                pass
|
|
|
|
|
|
class ExternalCorpusBuilder:
    """Invokes corpus_builder.py as a subprocess to build a JSONL corpus."""

    def __init__(self, script_path: Path) -> None:
        # Path of the script that is run with the current interpreter.
        self.script = script_path

    def build(self, root: Path, out_jsonl: Path, *, workers: int = 4, verbose: bool = False) -> bool:
        """Run the script over *root* and report whether it produced output.

        Args:
            root: Directory passed as ``--root``.
            out_jsonl: File passed as ``--out``.
            workers: Value for ``--workers``; values below 1 are raised to 1.
            verbose: Append ``--verbose`` when true.

        Returns:
            ``True`` only when the process exits with status 0 and
            *out_jsonl* exists and is non-empty. Any exception yields
            ``False``.
        """
        argv = [sys.executable, str(self.script)]
        argv += ["--root", str(root)]
        argv += ["--out", str(out_jsonl)]
        argv += ["--emit", "auto"]
        argv += ["--workers", str(max(1, workers))]
        argv += ["--llm-parallel", "1"]
        if verbose:
            argv.append("--verbose")
        try:
            # Output is captured (stderr merged into stdout) and not used further.
            finished = subprocess.run(argv, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
            if finished.returncode != 0:
                return False
            return out_jsonl.exists() and out_jsonl.stat().st_size > 0
        except Exception:
            return False
|
|
|
|
|
|
# -----------------------------
|
|
# Ollama client
|
|
# -----------------------------
|
|
|
|
class OllamaClient:
    """Minimal client for the ``/api/generate`` endpoint of an Ollama server."""

    def __init__(self, host: str = "http://localhost:11434", timeout: int = 600):
        # Trailing slashes are dropped so the endpoint URL is built cleanly.
        self.host = host.rstrip("/")
        # Timeout in seconds handed to ``urlopen``.
        self.timeout = timeout

    def generate(self, model: str, prompt: str) -> str:
        """Send *prompt* to *model* (non-streaming) and return the reply text.

        Args:
            model: Model name placed in the request payload.
            prompt: Prompt text.

        Returns:
            The stripped ``"response"`` field of the reply; ``""`` when the
            field is missing or empty.

        Raises:
            RuntimeError: On HTTP errors, transport failures, or a reply that
                is not a JSON object. The underlying exception, if any, is
                attached as ``__cause__``.
        """
        import urllib.error
        import urllib.request

        payload = {"model": model, "prompt": prompt, "stream": False}
        req = urllib.request.Request(
            f"{self.host}/api/generate",
            data=json.dumps(payload).encode("utf-8"),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        try:
            with urllib.request.urlopen(req, timeout=self.timeout) as resp:
                body = resp.read()
        except urllib.error.HTTPError as e:
            detail = e.read().decode("utf-8", "ignore")
            raise RuntimeError(f"Ollama HTTP error {e.code}: {detail}") from e
        except Exception as e:
            raise RuntimeError(f"Ollama request failed: {e}") from e

        try:
            obj = json.loads(body.decode("utf-8", "ignore"))
        except Exception as e:
            raise RuntimeError("Invalid JSON from Ollama") from e
        if not isinstance(obj, dict):
            # Valid JSON but not an object: there is no "response" field to read.
            raise RuntimeError("Invalid JSON from Ollama")
        return str(obj.get("response") or "").strip()
|
|
|
|
|
|
def _parse_json_strict(s: str) -> Optional[Dict[str, str]]:
|
|
try:
|
|
s = sanitize_llm_text_simple(s)
|
|
return json.loads(s)
|
|
except Exception:
|
|
m = re.search(r"\{[\s\S]*\}", s)
|
|
if m:
|
|
try:
|
|
return json.loads(m.group(0))
|
|
except Exception:
|
|
return None
|
|
return None
|
|
|
|
|
|
# -----------------------------
|
|
# Prompting
|
|
# -----------------------------
|
|
|
|
# Prompt for concept generation. The placeholders {ASSETS}, {NOTES} and {KB}
# each appear once. The text also contains other literal braces
# ("{Product/Software, ...}", "{Title}").
PROMPT_TEMPLATE = """
You are a cross-domain concept developer (product strategist, creative producer, research lead, grant writer).
Turn the sources into a concise, presentable CONCEPT document. Adapt to the domain.

INSTRUCTIONS
1) Detect IDEA TYPE (pick one primary; if unclear, choose closest and add a TODO):
{Product/Software, Service, Research/Study, Policy/Proposal, Art/Exhibition/Performance, Event/Program,
Education/Curriculum, Media/Film/Publication, Campaign/Nonprofit, Data/ML/Infrastructure, Game/Interactive,
Writing/Book/Article, Other}

2) Tone & register:
- Product/Software -> pragmatic PM/tech brief
- Research -> neutral academic project brief
- Policy -> policy memo
- Art/Exhibition/Performance -> curator/producer note (clear, not flowery)
- Event -> producer's run-of-show style
- Education -> syllabus brief
- Media/Publication -> one-sheet
- Campaign/Nonprofit -> strategy brief
- Data/ML/Infrastructure -> engineering design note
- Game/Interactive -> design doc overview
- Writing/Book/Article -> proposal overview

3) Output Markdown using these core sections (use these exact headings; include only relevant ones):
- Overview & Intent
- Context / Problem (or Opportunity)
- Audience / Stakeholders
- Deliverables / Outputs & Scope
- Approach / Method (rename to "Methodology", "Implementation Plan", "Format & Installation Plan", etc., to fit the idea type)
- Resources / Budget / Tools (only if present; else add a short TODO)
- Timeline & Milestones
- Risks, Ethics & Constraints
- Success Criteria / Evaluation
- Open Questions (TODOs)

Add one domain-specific block (only if relevant and supported by sources):
- Product/Software: Key Features; Non-Goals; Rough Architecture; Dependencies & Integration; License.
- Research/Study: Research Questions; Methodology & Data; Expected Contributions; References/Citations.
- Policy/Proposal: Policy Mechanism; Legal/Standards; Impact Assessment; Implementation Steps.
- Art/Exhibition/Performance: Conceptual Frame & References; Medium/Materials; Venue/Spatial Requirements; Tech/AV; Rights/Permissions.
- Event/Program: Programme Outline / Run-of-Show; Roles & Staffing; Logistics & Venue.
- Education/Curriculum: Learning Objectives; Syllabus Outline; Assessment & Materials.
- Media/Film/Publication: Logline & Synopsis; Format; Production Plan; Distribution.
- Campaign/Nonprofit: Theory of Change; Channels & Tactics; KPIs; Partnerships.
- Data/ML/Infrastructure: Data Sources; Models; Architecture Diagram (describe); Privacy & Compliance; Ops/Monitoring.
- Game/Interactive: Core Loop; Mechanics; Narrative; Tech; Monetization (if relevant).
- Writing/Book/Article: Thesis; Outline/Chapters; Sources; Target Readers.

4) Evidence use:
- Use only facts in Notes/KB. If missing, add short TODOs instead of inventing.
- Where a claim relies on a specific source, include a short inline blockquote with "Source: <Path or Title>".

5) Assets:
- These files are committed alongside README.md. Embed images with Markdown and link documents where they help clarity.

STYLE
- Short paragraphs and bullets; concrete, specific, and actionable. Avoid marketing fluff.
- If dates/budget/ownership are uncertain, show ranges or TODOs.
- Keep a neutral, professional tone adapted to the idea type.

TITLE
- Generate a neutral 2-4 words working title.
- Begin the document with "# {Title}".

Assets Provided:
{ASSETS}

Notes (from user):
{NOTES}

Knowledge Base (source excerpts):
{KB}
""".strip()
|
|
|
|
# Rephrasing "lenses": each entry has a stable key, a display label and a
# prompt whose {USER_NOTE} placeholder receives the user's note.
REPHRASE_LENSES = [
    {
        "key": "neutral",
        "label": "Neutral Clarification / Expansion",
        "prompt": """Take the following rough note and turn it into a single clear, concise paragraph that captures the main idea.
- Keep a neutral, explanatory tone.
- Don't add new features or speculation, only clarify and connect what is already there.
- Output exactly one paragraph.

Note:
{USER_NOTE}
""",
    },
    {
        "key": "problem_solution",
        "label": "Problem-Solution Framing",
        "prompt": """Rewrite the following note as a single paragraph that clearly describes:
1. What problem or frustration exists,
2. For whom,
3. How the idea could solve it in principle.
Keep it concrete but high-level, no implementation details.
Output exactly one paragraph.

Note:
{USER_NOTE}
""",
    },
    {
        "key": "user_story",
        "label": "User Story / Scenario",
        "prompt": """Rewrite the following note as a single paragraph that describes a short scenario from a user's point of view.
Show how a specific person encounters the situation and how this idea helps them.
Keep it realistic and simple, not hype-y.
Output exactly one paragraph.

Note:
{USER_NOTE}
""",
    },
    {
        "key": "value_prop",
        "label": "Value Proposition / Pitch",
        "prompt": """Rewrite the following note as a single paragraph that sounds like a clear, simple pitch of the idea.
Explain what it is, who it's for, and why it's valuable or interesting.
Avoid buzzwords; keep it grounded and concrete.
Output exactly one paragraph.

Note:
{USER_NOTE}
""",
    },
    {
        "key": "implementation",
        "label": "Implementation / Next Steps",
        "prompt": """Rewrite the following note as a single paragraph that keeps the original idea but focuses on how one might start implementing or exploring it.
Mention 2-3 plausible first steps or components without going into deep technical detail.
Output exactly one paragraph.

Note:
{USER_NOTE}
""",
    },
]
|
|
|
|
# Prompt for continuing the user's note in their own voice; {USER_NOTE} is the
# only placeholder.
EXTEND_PROMPT = """
You are continuing the user's own note. Keep writing in the same language, tone, and formatting style they used.

Instructions:
- Extend the idea with additional possibilities, use cases, angles, or problems to consider.
- Preserve the author's voice: match their formality, punctuation habits, and quirks (e.g., all lowercase, terse bullets, or formal sentences).
- Do not summarize or rewrite the original; add new material that flows naturally after it.
- Keep it concise (2-5 sentences or a few short bullet points).
- If the input is in bullet form, continue the bullets; otherwise, continue the paragraph.

Original note:
{USER_NOTE}
""".strip()
|
|
|
|
def build_kb_string(records: List[Record], *, max_chars: int = 80000, per_record_cap: int = 4000) -> str:
    """Concatenate record texts into one knowledge-base string for a prompt.

    Each record with non-blank text contributes a block of the form::

        ---
        Source: <title>
        Path: <source_path>

        <text>

    Args:
        records: Records to include, in order.
        max_chars: Upper bound on the length of the returned string. The last
            block is cut short if necessary.
        per_record_cap: Longer texts are cut to this many characters and
            marked with ``...[truncated]``.

    Returns:
        The joined blocks, stripped of surrounding whitespace.
    """
    parts: List[str] = []
    budget = max_chars
    for r in records:
        # Joining the blocks costs one newline per block after the first.
        # Counting it keeps the result within max_chars.
        separator_cost = 1 if parts else 0
        room = budget - separator_cost
        if room <= 0:
            break
        text = (r.text or "").strip()
        if not text:
            continue
        if len(text) > per_record_cap:
            text = text[:per_record_cap] + "\n...[truncated]"
        title = r.title or (Path(r.source_path).name if r.source_path else r.id)
        chunk = f"\n---\nSource: {title}\nPath: {r.source_path or ''}\n\n{text}\n"
        if len(chunk) > room:
            chunk = chunk[:room]
        parts.append(chunk)
        budget -= len(chunk) + separator_cost
    return "\n".join(parts).strip()
|
|
|
|
|
|
def sanitize_llm_text_simple(s: str) -> str:
    """Strip common wrapper noise from model output.

    Removes ``<think>...</think>`` sections, a leading code fence (with an
    optional language tag) and a trailing code fence, then trims whitespace.
    If the substitutions fail, the input (or ``""`` when it is falsy) is
    returned stripped.
    """
    removals = (
        (r"<think>.*?</think>", re.S | re.I),
        (r"^\s*```(?:\w+)?\s*", 0),
        (r"\s*```\s*$", 0),
    )
    try:
        cleaned = s
        for pattern, flags in removals:
            cleaned = re.sub(pattern, "", cleaned, flags=flags)
        return cleaned.strip()
    except Exception:
        return (s or "").strip()
|
|
|
|
|
|
def md_heading_replace_or_insert(md: str, title: str) -> str:
    """Make sure *md* starts with a level-1 heading.

    * Empty input gives just ``# <title>`` followed by a blank line.
    * A first line reading ``# Project Concept`` (any case) is replaced by
      ``# <title>``; the result is re-joined with ``"\\n"``.
    * Any other first-line ``# `` heading leaves *md* unchanged.
    * Otherwise ``# <title>`` and a blank line are put in front of *md*.
    """
    heading = f"# {title}"
    if not md:
        return heading + "\n\n"
    lines = md.splitlines()
    first = lines[0] if lines else None
    if first is not None:
        if re.match(r"^\s*#\s+project\s+concept\s*$", first, flags=re.I):
            return "\n".join([heading, *lines[1:]])
        if re.match(r"^\s*#\s+", first):
            return md
    return heading + "\n\n" + md
|
|
|
|
|
|
def strip_wrapping_quotes(s: str) -> str:
    """Trim surrounding whitespace, then any run of quote marks at either end.

    Both straight quotes (``"`` and ``'``) and typographic quotes
    (U+201C, U+201D, U+2018, U+2019) are removed. Whitespace that was inside
    the quotes is kept.
    """
    # Typographic quotes are written as escapes so they survive tools that
    # normalise them to ASCII.
    quote_chars = "\"'\u201c\u201d\u2018\u2019"
    return s.strip().strip(quote_chars)
|
|
|
|
|
|
# -----------------------------
|
|
# Core engine
|
|
# -----------------------------
|
|
|
|
class ConceptEngine:
    """Backend state for sources, the on-disk corpus and saved sessions.

    Storage lives under ``IDEA_HOLE_DIR``:

    * ``files/`` -- links to (or copies of) ingested files, named
      ``<hash>__<original name>``
    * ``corpus.jsonl`` -- one JSON record per line, tagged with ``file_hash``
    * ``sessions.jsonl`` -- one saved session per line
    """

    def __init__(self, *, status_cb: Optional[Any] = None) -> None:
        """Create the engine and initialise storage.

        Args:
            status_cb: Optional callable receiving progress messages (one
                string argument). Exceptions it raises are ignored.
        """
        self.status_cb = status_cb
        self.files: List[Path] = []
        self.websites: List[str] = []
        self.records: List[Record] = []
        # Maps a file path string or URL to its content/URL hash.
        self.file_hashes: Dict[str, str] = {}
        # Hashes that already have records in the corpus file.
        self._seen_hashes: Set[str] = set()
        # Hashes currently being ingested.
        self._ingesting: Set[str] = set()
        self._base_dir: Path = IDEA_HOLE_DIR
        self._files_dir: Path = self._base_dir / "files"
        self._corpus_file: Path = self._base_dir / "corpus.jsonl"
        self._sessions_file: Path = self._base_dir / "sessions.jsonl"
        self._init_storage()

    # --- Internal helpers

    @staticmethod
    def _iter_jsonl_dicts(path: Path):
        """Yield every JSON object stored one-per-line in *path*.

        Blank lines, lines that are not valid JSON, and JSON values that are
        not objects are skipped. Errors opening or reading the file propagate
        to the caller.
        """
        with path.open("r", encoding="utf-8") as fh:
            for line in fh:
                if not line.strip():
                    continue
                try:
                    obj = json.loads(line)
                except Exception:
                    continue
                if isinstance(obj, dict):
                    yield obj

    def _status(self, msg: str) -> None:
        """Forward *msg* to the status callback, if any; never raises."""
        if self.status_cb:
            try:
                self.status_cb(msg)
            except Exception:
                pass

    def _init_storage(self) -> None:
        """Create the storage layout and load the hashes already in the corpus."""
        try:
            self._base_dir.mkdir(parents=True, exist_ok=True)
            self._files_dir.mkdir(parents=True, exist_ok=True)
            for store in (self._corpus_file, self._sessions_file):
                if not store.exists():
                    store.write_text("", encoding="utf-8")
            self._seen_hashes = {
                str(obj["file_hash"])
                for obj in self._iter_jsonl_dicts(self._corpus_file)
                if obj.get("file_hash")
            }
        except Exception:
            self._seen_hashes = set()

    def _compute_file_hash(self, path: Path) -> str:
        """Return the SHA-256 hex digest of the file's content.

        If reading fails, the path and its modification time (0.0 when
        unavailable) are mixed into the digest instead, so a hash is always
        returned.
        """
        h = hashlib.sha256()
        try:
            with path.open("rb") as fh:
                while True:
                    block = fh.read(1024 * 1024)
                    if not block:
                        break
                    h.update(block)
        except Exception:
            st = None
            try:
                st = path.stat()
            except Exception:
                pass
            h.update((str(path) + "|" + str(getattr(st, "st_mtime", 0.0))).encode("utf-8", "ignore"))
        return h.hexdigest()

    def _compute_url_hash(self, url: str) -> str:
        """Return the SHA-256 hex digest of the whitespace-trimmed URL."""
        return hashlib.sha256(url.strip().encode("utf-8", "ignore")).hexdigest()

    def _ensure_file_symlink(self, src: Path, file_hash: str) -> Path:
        """Make sure ``files/<hash>__<name>`` exists for *src* and return that path.

        A symlink to the resolved source is preferred; if linking fails the
        file is copied. Failures are swallowed, so the returned path may not
        exist.
        """
        dst = self._files_dir / f"{file_hash}__{src.name}"
        try:
            if not dst.exists():
                self._files_dir.mkdir(parents=True, exist_ok=True)
                if dst.is_symlink():
                    # Dangling link left behind by a moved or deleted source.
                    dst.unlink()
                try:
                    os.symlink(src.resolve(), dst)
                except OSError:
                    shutil.copy2(src, dst)
        except Exception:
            pass
        return dst

    def _ingest_single_file(self, src: Path, file_hash: str, *, verbose: bool = False) -> bool:
        """Extract records from *src* and append them to the corpus file.

        ``corpus_builder.py`` next to this module is tried first when it
        exists; ``SimpleCorpusBuilder`` is the fallback. Every appended record
        gets ``file_hash``, the resolved ``source_path`` and ``added_at``.

        Returns:
            ``True`` when a builder succeeded and its output was merged.
        """
        tmp_dir = self._base_dir / "ingest_tmp" / file_hash
        try:
            shutil.rmtree(tmp_dir, ignore_errors=True)
            tmp_dir.mkdir(parents=True, exist_ok=True)
            # The source goes into its own sub-directory so that the output
            # file can never collide with it (a source called "out.jsonl"
            # would otherwise be overwritten through its symlink) and is not
            # picked up by the builders' scan.
            src_dir = tmp_dir / "src"
            copy_or_link(src, src_dir)
            tmp_out = tmp_dir / "out.jsonl"

            ok = False
            script = REPO_ROOT / "corpus_builder.py"
            if script.exists():
                self._status(f"Indexing {src.name} (external)...")
                ok = ExternalCorpusBuilder(script).build(src_dir, tmp_out, workers=2, verbose=verbose)
            if not ok:
                self._status(f"Indexing {src.name} (simple)...")
                try:
                    ok = bool(SimpleCorpusBuilder().build(src_dir, tmp_out))
                except Exception:
                    ok = False

            if not (ok and tmp_out.exists()):
                return False

            ts = int(time.time())
            resolved_src = str(src.resolve())
            with self._corpus_file.open("a", encoding="utf-8") as fh_out:
                for obj in self._iter_jsonl_dicts(tmp_out):
                    obj["file_hash"] = file_hash
                    obj["source_path"] = resolved_src
                    obj.setdefault("mime", None)
                    obj["added_at"] = ts
                    try:
                        fh_out.write(json.dumps(obj, ensure_ascii=False) + "\n")
                    except Exception:
                        # Fall back to ASCII-escaped output for text that
                        # cannot be written as UTF-8.
                        fh_out.write(json.dumps(obj) + "\n")
            self._seen_hashes.add(file_hash)
            return True
        finally:
            shutil.rmtree(tmp_dir, ignore_errors=True)

    def _ingest_single_url(self, url: str, url_hash: str) -> bool:
        """Fetch *url*, extract its text and append one record to the corpus.

        Returns:
            ``True`` on success; ``False`` when fetching fails, the page has
            no text, or anything else goes wrong.
        """
        try:
            self._status(f"Fetching {url}...")
            try:
                html_text, _hdrs = websearch._http_get(url, timeout=25)
            except Exception:
                return False
            text = websearch._extract_text(html_text)
            if not text.strip():
                return False

            # Prefer the page's <title>; fall back to a name derived from the URL.
            title = self._friendly_url_name(url)
            try:
                m = re.search(r"<title>(.*?)</title>", html_text, flags=re.I | re.S)
                if m:
                    cleaned = re.sub(r"\s+", " ", m.group(1))
                    try:
                        cleaned = html.unescape(cleaned)
                    except Exception:
                        pass
                    cleaned = cleaned.strip()
                    if cleaned:
                        title = cleaned
            except Exception:
                pass

            obj = {
                "id": url,
                "title": title,
                "text": text,
                "source_path": url,
                "mime": "text/html",
                "file_hash": url_hash,
                "added_at": int(time.time()),
            }
            with self._corpus_file.open("a", encoding="utf-8") as fh_out:
                fh_out.write(json.dumps(obj, ensure_ascii=False) + "\n")
            self._seen_hashes.add(url_hash)
            return True
        except Exception:
            return False

    @staticmethod
    def _friendly_url_name(url: str) -> str:
        """Return ``host`` or ``host/<first path segment, max 40 chars>`` for *url*."""
        try:
            from urllib.parse import urlparse

            parsed = urlparse(url)
            host = parsed.netloc or url
            segments = (parsed.path or "").strip("/").split("/")
            if segments and segments[0]:
                return f"{host}/{segments[0][:40]}"
            return host
        except Exception:
            return url

    def _ensure_corpus_for_files(self, paths: List[Path]) -> None:
        """Hash, link and (when not yet in the corpus) ingest each file in *paths*."""
        if not paths:
            return
        pending: List[Tuple[Path, str]] = []
        for p in paths:
            try:
                h = self._compute_file_hash(p)
            except Exception:
                continue
            self.file_hashes[str(p)] = h
            self._ensure_file_symlink(p, h)
            if h not in self._seen_hashes and h not in self._ingesting:
                pending.append((p, h))

        for src, h in pending:
            if h in self._seen_hashes:
                # Same content listed twice in this call; already ingested.
                continue
            self._ingesting.add(h)
            try:
                self._ingest_single_file(src, h, verbose=False)
            finally:
                self._ingesting.discard(h)

    def _ensure_corpus_for_urls(self, urls: List[str]) -> None:
        """Hash and (when not yet in the corpus) ingest each URL in *urls*."""
        if not urls:
            return
        pending: List[Tuple[str, str]] = []
        for u in urls:
            if not u:
                continue
            h = self._compute_url_hash(u)
            self.file_hashes[u] = h
            if h not in self._seen_hashes and h not in self._ingesting:
                pending.append((u, h))

        for url, h in pending:
            if h in self._seen_hashes:
                continue
            self._ingesting.add(h)
            try:
                self._ingest_single_url(url, h)
            finally:
                self._ingesting.discard(h)

    def _load_records_for_hashes(self, hashes: Set[str]) -> List[Record]:
        """Return the corpus records whose ``file_hash`` is in *hashes*."""
        out: List[Record] = []
        if not hashes:
            return out
        try:
            for obj in self._iter_jsonl_dicts(self._corpus_file):
                if str(obj.get("file_hash") or "") not in hashes:
                    continue
                out.append(Record(
                    id=str(obj.get("id", "")),
                    title=str(obj.get("title", "")),
                    text=str(obj.get("text", "")),
                    source_path=str(obj.get("source_path", "")) if obj.get("source_path") else None,
                    mime=str(obj.get("mime", "")) if obj.get("mime") else None,
                ))
        except Exception:
            pass
        return out

    def build_kb_records(self, files: List[str], websites: List[str]) -> List[Record]:
        """Ingest the given files and URLs as needed and return their records.

        The result is also stored in ``self.records``.
        """
        paths = [Path(p) for p in files]
        self._ensure_corpus_for_files(paths)
        self._ensure_corpus_for_urls(websites)
        keys = [str(p) for p in paths] + list(websites)
        hashes = {self.file_hashes.get(k) for k in keys}
        hashes.discard(None)
        hashes = {h for h in hashes if h}
        self.records = self._load_records_for_hashes(hashes)
        return self.records

    # --- Sessions

    def _load_all_sessions(self) -> List[Dict[str, Any]]:
        """Return all stored sessions that have a non-empty title, in file order."""
        try:
            return [obj for obj in self._iter_jsonl_dicts(self._sessions_file) if obj.get("title")]
        except Exception:
            return []

    def _write_all_sessions(self, entries: List[Dict[str, Any]]) -> None:
        """Rewrite the sessions file with *entries*.

        Writes to a temporary file and renames it over the target; if that
        fails, the target is written directly.
        """
        lines = [json.dumps(obj, ensure_ascii=False) + "\n" for obj in entries]
        tmp = self._sessions_file.with_suffix(".tmp")
        try:
            with tmp.open("w", encoding="utf-8") as fh:
                fh.writelines(lines)
            tmp.replace(self._sessions_file)
        except Exception:
            with self._sessions_file.open("w", encoding="utf-8") as fh:
                fh.writelines(lines)

    def _session_title_exists(self, title: str) -> bool:
        """Return whether a stored session has this title (compared stripped)."""
        wanted = (title or "").strip()
        if not wanted:
            return False
        return any(
            str(e.get("title") or "").strip() == wanted
            for e in self._load_all_sessions()
        )

    def list_sessions(self) -> List[Dict[str, Any]]:
        """Return title, description and save time of every stored session."""
        return [
            {
                "title": e.get("title") or "",
                "description": e.get("description") or "",
                "saved_at": e.get("saved_at") or 0,
            }
            for e in self._load_all_sessions()
        ]

    def load_session(self, title: str) -> Optional[Dict[str, Any]]:
        """Return the first stored session with this title, or ``None``."""
        wanted = (title or "").strip()
        if not wanted:
            return None
        for e in self._load_all_sessions():
            if str(e.get("title") or "").strip() == wanted:
                return e
        return None

    def save_session(self, payload: Dict[str, Any], *, allow_overwrite: bool) -> Dict[str, Any]:
        """Persist a session and return the stored record.

        Args:
            payload: Session data. ``title`` is required; ``files`` is a list
                of ``{"path", "include"}`` dicts and ``websites`` a list of
                ``{"url", "include"}`` dicts.
            allow_overwrite: Whether an existing session with the same title
                may be replaced.

        Raises:
            RuntimeError: If the title is missing, or the session exists and
                *allow_overwrite* is false.
        """
        title = (payload.get("title") or "").strip()
        if not title:
            raise RuntimeError("Title is required to save a session.")
        exists = self._session_title_exists(title)
        if exists and not allow_overwrite:
            raise RuntimeError("Session already exists")

        files_list = payload.get("files") or []
        websites_list = payload.get("websites") or []

        self._ensure_corpus_for_files([Path(f["path"]) for f in files_list if f.get("path")])
        self._ensure_corpus_for_urls([w.get("url") for w in websites_list if w.get("url")])

        files_meta = []
        for f in files_list:
            path = f.get("path")
            if not path:
                continue
            h = self.file_hashes.get(path) or self._compute_file_hash(Path(path))
            self.file_hashes[path] = h
            files_meta.append({
                "path": path,
                "file_hash": h,
                "include": bool(f.get("include", True)),
            })

        websites_meta = []
        for w in websites_list:
            url = w.get("url")
            if not url:
                continue
            h = self.file_hashes.get(url) or self._compute_url_hash(url)
            self.file_hashes[url] = h
            websites_meta.append({
                "url": url,
                "file_hash": h,
                "include": bool(w.get("include", True)),
            })

        record = {
            "title": title,
            "description": (payload.get("description") or "").strip(),
            "notes": (payload.get("notes") or "").strip(),
            "concept": (payload.get("concept") or "").strip(),
            "files": files_meta,
            "websites": websites_meta,
            "saved_at": int(time.time()),
            "rephrase_variants": payload.get("rephrase_variants") or [],
            "rephrase_selected_key": payload.get("rephrase_selected_key"),
        }

        entries = self._load_all_sessions()
        if exists:
            # Compare stripped titles, exactly as the existence check does.
            # Otherwise an entry stored with stray whitespace would survive
            # next to its replacement.
            entries = [e for e in entries if str(e.get("title") or "").strip() != title]
        entries.append(record)
        self._write_all_sessions(entries)
        return record
|
|
|
|
|
|
# -----------------------------
|
|
# Concept generation helpers
|
|
# -----------------------------
|
|
|
|
def _extract_title_desc(concept_md: str, *, client: OllamaClient, model: str) -> Tuple[Optional[str], Optional[str]]:
|
|
try:
|
|
prompt = (
|
|
"Extract a concise title and a one-sentence description from the following concept.\n"
|
|
"- Title: <= 50 chars (3-5 words).\n- Description: <= 120 chars, (one sentence) no trailing period.\n"
|
|
"Return ONLY strict JSON with keys 'title' and 'description'.\n\nCONCEPT:\n" + concept_md
|
|
)
|
|
raw = client.generate(model, prompt)
|
|
obj = _parse_json_strict(raw) or {}
|
|
title = strip_wrapping_quotes(str(obj.get("title") or "").strip()) or None
|
|
desc = strip_wrapping_quotes(str(obj.get("description") or "").strip()) or None
|
|
return title, desc
|
|
except Exception:
|
|
return None, None
|
|
|
|
|
|
# -----------------------------
|
|
# PDF conversion helpers
|
|
# -----------------------------
|
|
|
|
|
|
def _slug(s: str) -> str:
|
|
s = re.sub(r"[\s]+", "-", s.strip())
|
|
s = re.sub(r"[^a-zA-Z0-9._-]", "-", s)
|
|
return re.sub(r"-+", "-", s).strip("-_")
|
|
|
|
|
|
def _convert_markdown_to_pdf(md_file: Path, out_pdf: Path) -> Tuple[bool, Optional[Path]]:
|
|
concept_dir = out_pdf.parent
|
|
concept_dir.mkdir(parents=True, exist_ok=True)
|
|
logs_dir = IDEA_HOLE_DIR / "logs"
|
|
logs_dir.mkdir(parents=True, exist_ok=True)
|
|
log_path = logs_dir / f"pdf_export_{concept_dir.name}.log"
|
|
|
|
pandoc = resolve_command("pandoc")
|
|
tectonic = resolve_command("tectonic")
|
|
|
|
lines: List[str] = []
|
|
lines.append(f"PATH={os.environ.get('PATH','')}")
|
|
lines.append(f"md_file={md_file}")
|
|
lines.append(f"resolved pandoc={pandoc}")
|
|
lines.append(f"resolved tectonic={tectonic}")
|
|
|
|
if not pandoc or not tectonic:
|
|
lines.append("Missing required tools: pandoc and/or tectonic.")
|
|
try:
|
|
log_path.write_text("\n".join(lines), encoding="utf-8")
|
|
except Exception:
|
|
pass
|
|
return False, log_path
|
|
|
|
tmp_base = IDEA_HOLE_DIR / "tmp_pdf" / concept_dir.name
|
|
try:
|
|
if tmp_base.exists():
|
|
shutil.rmtree(tmp_base)
|
|
except Exception:
|
|
pass
|
|
tmp_base.mkdir(parents=True, exist_ok=True)
|
|
|
|
try:
|
|
text = md_file.read_text(encoding="utf-8")
|
|
except Exception as e:
|
|
lines.append(f"read error: {e}")
|
|
try:
|
|
log_path.write_text("\n".join(lines), encoding="utf-8")
|
|
except Exception:
|
|
pass
|
|
return False, log_path
|
|
|
|
img_rgx = re.compile(r"!\[[^\]]*\]\(([^\s)]+)(?:\s+\"[^\"]*\")?\)")
|
|
allowed_ext = {".png", ".jpg", ".jpeg", ".pdf", ".eps"}
|
|
|
|
def ensure_image_available(src: str) -> str:
|
|
p = Path(src)
|
|
if not p.is_absolute():
|
|
p = (concept_dir / p).resolve()
|
|
if not p.exists():
|
|
alt = (concept_dir / Path(src).name).resolve()
|
|
if alt.exists():
|
|
p = alt
|
|
else:
|
|
lines.append(f"missing image: {src}")
|
|
return src
|
|
ext = p.suffix.lower()
|
|
if ext in allowed_ext:
|
|
out_name = p.name
|
|
out_path = tmp_base / out_name
|
|
try:
|
|
if not out_path.exists():
|
|
shutil.copy2(str(p), str(out_path))
|
|
return out_name
|
|
except Exception as e:
|
|
lines.append(f"copy fail: {src} -> {out_name} ({e})")
|
|
return src
|
|
if ext == ".svg":
|
|
out_name = p.stem + ".png"
|
|
out_path = tmp_base / out_name
|
|
try:
|
|
from cairosvg import svg2png # type: ignore
|
|
svg2png(url=str(p), write_to=str(out_path))
|
|
return out_name
|
|
except Exception as e_svg_py:
|
|
lines.append(f"cairosvg unavailable or failed: {e_svg_py}")
|
|
try:
|
|
tool = resolve_command("rsvg-convert")
|
|
if tool:
|
|
res = subprocess.run([tool, "-f", "png", "-o", str(out_path), str(p)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=subprocess_env())
|
|
if res.returncode == 0 and out_path.exists():
|
|
return out_name
|
|
lines.append(f"rsvg-convert failed: exit {res.returncode}, {res.stdout}")
|
|
except Exception as e_svg_cli:
|
|
lines.append(f"rsvg-convert error: {e_svg_cli}")
|
|
try:
|
|
tool = resolve_command("magick") or resolve_command("convert")
|
|
if tool:
|
|
res = subprocess.run([tool, str(p), str(out_path)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=subprocess_env())
|
|
if res.returncode == 0 and out_path.exists():
|
|
return out_name
|
|
lines.append(f"imagemagick failed: exit {res.returncode}, {res.stdout}")
|
|
except Exception as e_im:
|
|
lines.append(f"imagemagick error: {e_im}")
|
|
|
|
try:
|
|
from PIL import Image # type: ignore
|
|
img = Image.open(str(p))
|
|
try:
|
|
img.seek(0)
|
|
except Exception:
|
|
pass
|
|
out_name = p.stem + ".png"
|
|
out_path = tmp_base / out_name
|
|
img.convert("RGBA" if img.mode in ("P", "LA") else "RGB").save(str(out_path), format="PNG")
|
|
return out_name
|
|
except Exception as e:
|
|
lines.append(f"convert fail: {src} -> png ({e})")
|
|
out_name = p.name
|
|
out_path = tmp_base / out_name
|
|
try:
|
|
shutil.copy2(str(p), str(out_path))
|
|
return out_name
|
|
except Exception as e2:
|
|
lines.append(f"final copy fail: {src} ({e2})")
|
|
return src
|
|
|
|
def _repl(m: re.Match) -> str:
    """Rewrite the target of one Markdown image match.

    Group 1 of the match is the image target. It is passed to
    ``ensure_image_available`` and the returned string is spliced into the
    matched text in place of the original target.

    Only the span of group 1 is replaced. A plain ``str.replace`` on the whole
    match would also rewrite alt text or a title that happens to contain the
    same characters as the target (e.g. ``![a.svg](a.svg)``).
    """
    matched = m.group(0)
    new_target = ensure_image_available(m.group(1))
    # Group offsets are relative to the searched string; shift them so they
    # index into the matched substring instead.
    offset = m.start(0)
    start = m.start(1) - offset
    end = m.end(1) - offset
    return matched[:start] + new_target + matched[end:]
|
|
|
|
mod_text = img_rgx.sub(_repl, text)
|
|
|
|
def _preserve_extra_blank_lines(s: str) -> str:
|
|
s = s.replace("\r\n", "\n").replace("\r", "\n")
|
|
lines_in = s.split("\n")
|
|
out_lines: List[str] = []
|
|
in_fence = False
|
|
blank_run = 0
|
|
for ln in lines_in:
|
|
stripped = ln.lstrip()
|
|
if stripped.startswith("```") or stripped.startswith("~~~"):
|
|
if blank_run > 0:
|
|
out_lines.append("")
|
|
for _ in range(blank_run - 1):
|
|
out_lines.append("\\vspace{1em}")
|
|
blank_run = 0
|
|
out_lines.append(ln)
|
|
in_fence = not in_fence
|
|
continue
|
|
if in_fence:
|
|
if blank_run > 0:
|
|
out_lines.append("")
|
|
for _ in range(blank_run - 1):
|
|
out_lines.append("\\vspace{1em}")
|
|
blank_run = 0
|
|
out_lines.append(ln)
|
|
continue
|
|
if stripped == "":
|
|
blank_run += 1
|
|
continue
|
|
if blank_run > 0:
|
|
out_lines.append("")
|
|
for _ in range(blank_run - 1):
|
|
out_lines.append("\\vspace{1em}")
|
|
blank_run = 0
|
|
out_lines.append(ln)
|
|
if blank_run > 0:
|
|
out_lines.append("")
|
|
for _ in range(blank_run - 1):
|
|
out_lines.append("\\vspace{1em}")
|
|
return "\n".join(out_lines)
|
|
|
|
mod_text = _preserve_extra_blank_lines(mod_text)
|
|
tmp_md = tmp_base / "README_pdf.md"
|
|
tmp_md.write_text(mod_text, encoding="utf-8")
|
|
|
|
cmd = [
|
|
pandoc,
|
|
str(tmp_md),
|
|
"-f", "markdown+hard_line_breaks+raw_tex",
|
|
"-s",
|
|
f"--pdf-engine={tectonic}",
|
|
"-V", "mainfont=Helvetica",
|
|
"-V", "monofont=Menlo",
|
|
"-V", "geometry:margin=20mm",
|
|
"-V", "fontsize=11pt",
|
|
"--resource-path", f"{str(tmp_base)}:{str(concept_dir)}",
|
|
"-o", str(out_pdf),
|
|
]
|
|
res = subprocess.run(cmd, cwd=str(tmp_base), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=subprocess_env())
|
|
lines.append("$ " + " ".join(cmd))
|
|
lines.append(f"(exit {res.returncode})")
|
|
lines.append(res.stdout or "")
|
|
ok = (res.returncode == 0 and out_pdf.exists())
|
|
if not ok:
|
|
try:
|
|
cmd_fallback = [
|
|
pandoc,
|
|
str(tmp_md),
|
|
"-f", "markdown+hard_line_breaks",
|
|
"-s",
|
|
f"--pdf-engine={tectonic}",
|
|
"-V", "mainfont=Helvetica",
|
|
"-V", "monofont=Menlo",
|
|
"-V", "geometry:margin=20mm",
|
|
"-V", "fontsize=11pt",
|
|
"--resource-path", f"{str(tmp_base)}:{str(concept_dir)}",
|
|
"-o", str(out_pdf),
|
|
]
|
|
res2 = subprocess.run(cmd_fallback, cwd=str(tmp_base), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=subprocess_env())
|
|
lines.append("$ " + " ".join(cmd_fallback))
|
|
lines.append(f"(exit {res2.returncode})")
|
|
lines.append(res2.stdout or "")
|
|
ok = (res2.returncode == 0 and out_pdf.exists())
|
|
except Exception as e_fallback:
|
|
lines.append(f"fallback error: {e_fallback}")
|
|
|
|
if not ok:
|
|
try:
|
|
log_path.write_text("\n".join(lines), encoding="utf-8")
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
shutil.rmtree(tmp_base)
|
|
except Exception:
|
|
pass
|
|
|
|
return ok, log_path
|
|
|
|
|
|
# -----------------------------
|
|
# Settings
|
|
# -----------------------------
|
|
|
|
def settings_path() -> Path:
    """Return the path of ``settings.json`` inside ``IDEA_HOLE_DIR``.

    The data directory (and any missing parents) is created as a side effect,
    so callers can read or write the returned path straight away.
    """
    data_dir = IDEA_HOLE_DIR
    data_dir.mkdir(parents=True, exist_ok=True)
    return data_dir / "settings.json"
|
|
|
|
|
|
def load_settings() -> Dict[str, str]:
    """Load persisted settings as a flat string-to-string mapping.

    Returns an empty dict when the settings file is missing, cannot be read or
    parsed, or does not hold a JSON object. Entries whose value is ``None``
    are dropped; every other value is converted with ``str``.
    """
    path = settings_path()
    if not path.exists():
        return {}

    try:
        parsed = json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        # Best effort: an unreadable or corrupt file behaves like no settings.
        return {}

    if not isinstance(parsed, dict):
        return {}

    cleaned: Dict[str, str] = {}
    for key, value in parsed.items():
        if value is not None:
            cleaned[key] = str(value)
    return cleaned
|
|
|
|
|
|
def save_settings(settings: Dict[str, str]) -> None:
    """Write *settings* to the settings file as JSON.

    The first attempt writes indented JSON with non-ASCII characters kept
    verbatim. If that attempt raises, a second attempt writes compact,
    ASCII-escaped JSON; an error from the second attempt propagates.
    """
    target = settings_path()
    try:
        pretty = json.dumps(settings, ensure_ascii=False, indent=2)
        target.write_text(pretty, encoding="utf-8")
    except Exception:
        fallback = json.dumps(settings)
        target.write_text(fallback, encoding="utf-8")
|
|
|
|
|
|
# -----------------------------
|
|
# Actions
|
|
# -----------------------------
|
|
|
|
def list_models() -> List[str]:
    """Return the model names reported by ``ollama list``.

    The first whitespace-separated token of every non-empty output line is
    taken as a model name. Duplicates are removed, keeping first-seen order.

    Returns:
        The list of names, or ``[]`` when the ``ollama`` executable cannot be
        resolved, the command exits non-zero, or anything else goes wrong
        (timeout, OS error). This function never raises.
    """
    try:
        ollama = resolve_command("ollama")
        if not ollama:
            return []

        # stderr is discarded rather than merged into stdout so that
        # diagnostic lines are never mistaken for table rows.
        res = subprocess.run(
            [ollama, "list"],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            timeout=8,
            env=subprocess_env(),
        )
        if res.returncode != 0:
            return []

        names: List[str] = []
        for row in (res.stdout or "").splitlines():
            fields = row.split()
            if not fields:
                continue
            # Skip the table header. Match the header token exactly: a prefix
            # test would also drop real models whose name starts with "name".
            if fields[0].upper() == "NAME":
                continue
            names.append(fields[0])

        # dict.fromkeys de-duplicates while preserving insertion order.
        return list(dict.fromkeys(names))
    except Exception:
        # Deliberate best effort: the caller gets "no models" on any failure.
        return []
|
|
|
|
|
|
def stat_paths(paths: List[str], *, expand_dirs: bool) -> List[Dict[str, Any]]:
    """Describe the files referenced by *paths*.

    Args:
        paths: File or directory paths. Falsy entries are ignored.
        expand_dirs: When true, directories are walked recursively and every
            regular file below them is reported. When false, directories are
            skipped entirely.

    Returns:
        One dict per distinct file path, in discovery order, with the keys
        ``name``, ``path``, ``type`` (lower-cased suffix, or ``"file"`` when
        there is none) and ``size`` (human-readable, or ``"?"`` when the size
        cannot be read).
    """
    out: List[Dict[str, Any]] = []
    seen: Set[str] = set()

    def _size_label(entry: Path) -> str:
        # A file can vanish or be unreadable between discovery and stat();
        # report "?" for it instead of aborting the whole listing.
        try:
            return human_size(entry.stat().st_size)
        except OSError:
            return "?"

    def _add(entry: Path) -> None:
        key = str(entry)
        if key in seen:
            return
        seen.add(key)
        out.append({
            "name": entry.name,
            "path": key,
            "type": entry.suffix.lower() or "file",
            "size": _size_label(entry),
        })

    for raw in paths:
        if not raw:
            continue
        p = Path(raw)
        if p.is_dir():
            if expand_dirs:
                for q in p.rglob("*"):
                    if q.is_file():
                        _add(q)
        elif p.is_file():
            _add(p)
    return out
|
|
|
|
|
|
def rephrase(note: str, host: str, model: str) -> List[Dict[str, str]]:
    """Produce rephrased variants of *note*, one per entry in ``REPHRASE_LENSES``.

    The returned list starts with the untouched note (key ``"original"``).
    For every lens, the lens prompt has ``{USER_NOTE}`` replaced by *note*,
    is sent to the model, and the sanitized reply is appended. Each variant
    is a dict with ``key``, ``label`` and ``text``; a missing key or label
    falls back to ``lens_<n>`` / ``Variant <n>`` with 1-based numbering.
    """
    client = OllamaClient(host=host)
    variants: List[Dict[str, str]] = [
        {"key": "original", "label": "Original Note", "text": note},
    ]

    position = 0
    for lens in REPHRASE_LENSES:
        position += 1
        template = lens.get("prompt") or ""
        reply = client.generate(model=model, prompt=template.replace("{USER_NOTE}", note))
        cleaned = sanitize_llm_text_simple(reply)
        entry: Dict[str, str] = {}
        entry["key"] = lens.get("key") or f"lens_{position}"
        entry["label"] = lens.get("label") or f"Variant {position}"
        entry["text"] = cleaned
        variants.append(entry)

    return variants
|
|
|
|
|
|
def extend(note: str, host: str, model: str) -> str:
    """Ask the model to extend *note* and return the sanitized reply.

    ``EXTEND_PROMPT`` has ``{USER_NOTE}`` replaced by *note* before it is sent.

    Raises:
        RuntimeError: If the sanitized reply is empty or whitespace only.
    """
    reply = OllamaClient(host=host).generate(
        model=model,
        prompt=EXTEND_PROMPT.replace("{USER_NOTE}", note),
    )
    cleaned = sanitize_llm_text_simple(reply)
    if cleaned.strip():
        return cleaned
    raise RuntimeError("Empty response from model")
|
|
|
|
|
|
def generate_concept(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Generate a concept document from notes, files and URLs.

    Args:
        payload: Request dict. Keys read: ``notes``, ``files``, ``websites``,
            ``ollama_host`` (defaults to ``http://localhost:11434``) and
            ``model``.

    Returns:
        A dict with ``concept`` (the generated Markdown), ``title``,
        ``description`` (at most 120 characters) and ``kb_records`` (number
        of knowledge-base records built from the files and URLs).

    Raises:
        RuntimeError: If the model reply is empty after sanitizing.
    """
    notes = (payload.get("notes") or "").strip()
    files = payload.get("files") or []
    websites = payload.get("websites") or []
    host = payload.get("ollama_host") or "http://localhost:11434"
    model = payload.get("model") or ""

    engine = ConceptEngine()
    records = engine.build_kb_records(files, websites)
    kb = build_kb_string(records)

    assets_lines: List[str] = []
    if files:
        assets_lines.append("Files:")
        assets_lines.extend(f"- {Path(p).name}" for p in files)
    if websites:
        assets_lines.append("URLs:")
        assets_lines.extend(f"- {u}" for u in websites)
    assets_str = "\n".join(assets_lines) or "(none)"

    # Fill all placeholders in a single pass. Chained str.replace calls would
    # re-scan text that was just inserted, so a literal "{KB}" or "{ASSETS}"
    # typed into the notes would itself be substituted.
    fillers = {
        "NOTES": notes or "(none)",
        "KB": kb or "(empty)",
        "ASSETS": assets_str,
    }
    prompt = re.sub(
        r"\{(NOTES|KB|ASSETS)\}",
        lambda m: fillers[m.group(1)],
        PROMPT_TEMPLATE,
    )

    client = OllamaClient(host=host)
    concept_md = sanitize_llm_text_simple(client.generate(model=model, prompt=prompt))

    # Reject an empty reply before any title handling. Checking afterwards
    # lets an inserted heading make an empty concept look non-empty, and
    # spends the title/description extraction on nothing.
    # NOTE(review): assumes md_heading_replace_or_insert can add a heading to
    # empty text, as its name suggests - confirm against its definition.
    if not concept_md.strip():
        raise RuntimeError("Empty response from model")

    title, desc = _extract_title_desc(concept_md, client=client, model=model)
    if title:
        concept_md = md_heading_replace_or_insert(concept_md, title)

    return {
        "concept": concept_md,
        "title": title or "",
        "description": strip_wrapping_quotes(desc or "")[:120],
        "kb_records": len(records),
    }
|
|
|
|
|
|
def prior_art(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Run a prior-art search for the notes or concept in *payload*.

    The search text is ``payload["notes"]`` when it is non-empty. Otherwise
    it is assembled from whichever of ``title``, ``description`` and
    ``concept`` are non-empty, joined by blank lines, with the first two
    prefixed by ``Title: `` and ``Description: ``.

    A knowledge base is built from ``files`` and ``websites``, and the result
    of ``websearch.prior_art_search`` is returned unchanged.
    """

    def _field(name: str) -> str:
        return (payload.get(name) or "").strip()

    notes = _field("notes")
    if not notes:
        title = _field("title")
        description = _field("description")
        concept = _field("concept")

        sections: List[str] = []
        if title:
            sections.append(f"Title: {title}")
        if description:
            sections.append(f"Description: {description}")
        if concept:
            sections.append(concept)
        notes = "\n\n".join(sections)

    files = payload.get("files") or []
    websites = payload.get("websites") or []
    host = payload.get("ollama_host") or "http://localhost:11434"
    model = payload.get("model") or ""
    searx_url = payload.get("searx_url") or None

    records = ConceptEngine().build_kb_records(files, websites)
    kb = build_kb_string(records)

    search_args = {
        "ollama_host": host,
        "model": model,
        "notes": notes,
        "kb": kb,
        "assets": files,
        "searx_url": searx_url,
    }
    return websearch.prior_art_search(**search_args)
|
|
|
|
|
|
def preview_pdf(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Render the concept text in *payload* to a PDF.

    A per-title working directory under ``IDEA_HOLE_DIR / "preview"`` is
    recreated. The concept is written there as ``README.md`` and the listed
    files are copied next to it; copy failures are ignored. A file whose name
    would clash with the README or the generated PDF names gets an ``asset-``
    prefix.

    The PDF goes to ``payload["output_path"]`` when given (``.pdf`` is
    appended if the suffix differs), otherwise into the working directory.

    Returns:
        A dict with ``ok`` (conversion result), ``pdf_path`` and ``log_path``
        (empty string when the converter returned no log path).

    Raises:
        RuntimeError: If the concept text is empty, or the output path is an
            existing directory.
    """

    def _text(key: str) -> str:
        return (payload.get(key) or "").strip()

    concept_text = _text("concept")
    title = _text("title")
    files = payload.get("files") or []
    output_path = _text("output_path")
    if not concept_text:
        raise RuntimeError("Concept text is empty")

    slug = _slug(title or "preview")
    base = IDEA_HOLE_DIR / "preview" / f"{slug}-preview"

    # Start from a clean working directory; a failed cleanup is tolerated.
    try:
        if base.exists():
            shutil.rmtree(base)
    except Exception:
        pass
    base.mkdir(parents=True, exist_ok=True)

    md_path = base / "README.md"
    md_path.write_text(concept_text, encoding="utf-8")

    # Names the working directory uses itself, compared case-insensitively.
    reserved = {
        "readme.md",
        f"{slug}-concept.pdf".lower(),
        f"{slug}-preview.pdf".lower(),
    }
    sources = [Path(item) for item in files]
    for source in sources:
        try:
            target_name = source.name
            if target_name.lower() in reserved:
                target_name = f"asset-{source.name}"
            shutil.copy2(source, base / target_name)
        except Exception:
            pass

    if not output_path:
        pdf_path = base / f"{slug}-preview.pdf"
    else:
        pdf_path = Path(output_path).expanduser()
        if pdf_path.exists() and pdf_path.is_dir():
            raise RuntimeError(f"Output path is a directory: {pdf_path}")
        if pdf_path.suffix.lower() != ".pdf":
            pdf_path = Path(f"{pdf_path}.pdf")

    ok, log_path = _convert_markdown_to_pdf(md_path, pdf_path)
    result: Dict[str, Any] = {"ok": ok, "pdf_path": str(pdf_path)}
    result["log_path"] = str(log_path) if log_path else ""
    return result
|
|
|
|
|
|
# -----------------------------
|
|
# JSON-RPC style entrypoint
|
|
# -----------------------------
|
|
|
|
def _read_stdin_json() -> Dict[str, Any]:
|
|
raw = sys.stdin.read()
|
|
if not raw:
|
|
return {}
|
|
return json.loads(raw)
|
|
|
|
|
|
def main() -> int:
    """Handle one request: read JSON from stdin, run the action, write JSON to stdout.

    The request is an object with ``action`` (required) and ``payload``
    (optional). On success the reply is ``{"ok": true, "data": <result>}``.
    On any failure - including a result that cannot be serialized to JSON -
    the reply is ``{"ok": false, "error": <message>, "trace": <traceback>}``.

    Returns:
        Always ``0``; success or failure is reported in the JSON reply only.
    """
    try:
        req = _read_stdin_json()
        action = req.get("action")
        payload = req.get("payload") or {}

        if not action:
            raise RuntimeError("Missing action")

        default_host = "http://localhost:11434"

        def _save_settings() -> Dict[str, Any]:
            save_settings(payload.get("settings") or {})
            return {"ok": True}

        # Action name -> zero-argument callable producing the result. A
        # ConceptEngine is only constructed for the action that needs one.
        handlers = {
            "list_models": list_models,
            "stat_paths": lambda: stat_paths(
                payload.get("paths") or [],
                expand_dirs=bool(payload.get("expand_dirs")),
            ),
            "rephrase": lambda: rephrase(
                payload.get("note") or "",
                payload.get("ollama_host") or default_host,
                payload.get("model") or "",
            ),
            "extend": lambda: extend(
                payload.get("note") or "",
                payload.get("ollama_host") or default_host,
                payload.get("model") or "",
            ),
            "generate_concept": lambda: generate_concept(payload),
            "prior_art": lambda: prior_art(payload),
            "preview_pdf": lambda: preview_pdf(payload),
            "load_settings": load_settings,
            "save_settings": _save_settings,
            "list_sessions": lambda: ConceptEngine().list_sessions(),
            "load_session": lambda: ConceptEngine().load_session(payload.get("title") or ""),
            "save_session": lambda: ConceptEngine().save_session(
                payload.get("payload") or {},
                allow_overwrite=bool(payload.get("allow_overwrite")),
            ),
        }

        # Non-string actions (e.g. a JSON list) are unhashable or simply
        # unknown; treat both the same way instead of failing on the lookup.
        handler = handlers.get(action) if isinstance(action, str) else None
        if handler is None:
            raise RuntimeError(f"Unknown action: {action}")

        # Serialize inside the guarded region so a non-JSON-serializable
        # result becomes an error reply rather than an uncaught crash.
        reply = json.dumps({"ok": True, "data": handler()}, ensure_ascii=False)
    except Exception as e:
        reply = json.dumps(
            {
                "ok": False,
                "error": str(e),
                "trace": traceback.format_exc(limit=6),
            },
            ensure_ascii=False,
        )

    sys.stdout.write(reply)
    return 0
|
|
|
|
|
|
# Script entry point: run one request and exit with main()'s return code.
if __name__ == "__main__":
    exit_code = main()
    raise SystemExit(exit_code)
|