Files
concept-maker/concept-maker_gui.py
Victor Giers 3b6b5c78c7 auto-git:
[change] concept-maker_gui.py
2025-11-30 14:43:09 +01:00

2991 lines
122 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Idea → Concept GUI
What this provides
- Drag/drop or add files and folders into a table
- Freeform notes box for thoughts and fragments
- Builds a JSONL knowledge base using corpus_builder.py if present (fallback included)
- Generates a polished "Concept" using a local Ollama model
- Lets you edit, then save into a local Git repo and push to a remote
Notes
- Requires a local Ollama daemon (default http://localhost:11434)
- The model is selected in the UI; a default can be preset via the IDEA_HOLE_MODEL environment variable
Dependencies
- Standard library only for HTTP (urllib); requests is NOT required.
- Optional: tkinterdnd2 for native drag+drop (falls back to buttons if missing)
Run
python3 concept-maker_gui.py
"""
from __future__ import annotations
import os
import sys
import json
import time
import math
import shutil
import threading
import traceback
import hashlib
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import List, Dict, Optional, Set, Any
# --- GUI imports
import tkinter as tk
from tkinter import ttk, filedialog, messagebox, simpledialog
from tkinter.scrolledtext import ScrolledText
# Optional native drag/drop support
try:
from tkinterdnd2 import DND_FILES, TkinterDnD # type: ignore
_TKDND_AVAILABLE = True
except Exception:
TkinterDnD = tk # type: ignore
DND_FILES = None # type: ignore
_TKDND_AVAILABLE = False
import subprocess
import tempfile
import re
import html
# --- HTTP (stdlib)
import urllib.request
import urllib.error
from urllib.parse import urlparse
import webbrowser
import websearch
# -----------------------------
# Simple utilities
# -----------------------------
def human_size(n: int) -> str:
    """Format a byte count as a short human-readable string (binary units).

    Non-positive counts are reported as "0 B". Values are shown with one
    decimal place, and anything beyond terabytes is still expressed in TB.
    """
    if n <= 0:
        return "0 B"
    suffixes = ["B", "KB", "MB", "GB", "TB"]
    base = 1024.0
    # Which power of 1024 the value falls into, clamped to the known suffixes.
    exponent = int(math.floor(math.log(n, base)))
    exponent = max(0, min(exponent, len(suffixes) - 1))
    scaled = n / (base**exponent)
    return f"{scaled:.1f} {suffixes[exponent]}"
def safe_symlink(src: Path, dst: Path) -> bool:
    """Make ``dst`` a symlink to ``src``, replacing any existing file or link at ``dst``.

    Returns True when ``dst`` refers to ``src`` afterwards and False on any
    failure (no symlink privilege, unsupported filesystem, ...). Never raises,
    so callers can fall back to copying.
    """
    try:
        # A relative link target is resolved against the directory of the
        # link itself, not the current working directory, so always link to
        # an absolute path to avoid creating a dangling link.
        target = Path(os.path.abspath(src))
        # If dst already is (or already points at) src, unlinking it would
        # destroy the source or achieve nothing; treat it as done.
        if dst.exists() and target.exists() and os.path.samefile(target, dst):
            return True
        if dst.exists() or dst.is_symlink():
            dst.unlink()
        os.symlink(target, dst)
        return True
    except Exception:
        # Deliberate best-effort: any failure means "could not link".
        return False
def copy_or_link(src: Path, dst_dir: Path) -> Path:
    """Place ``src`` inside ``dst_dir`` under its own file name and return that path.

    The directory is created if needed. A symlink is preferred; when linking
    fails the file is copied (with metadata) instead.
    """
    dst_dir.mkdir(parents=True, exist_ok=True)
    target = dst_dir / src.name
    if not safe_symlink(src, target):
        # Linking was not possible, so fall back to a real copy.
        shutil.copy2(src, target)
    return target
def read_text_guess(path: Path) -> str:
    """Read a file as text, guessing the encoding; return "" if it cannot be read.

    Order of attempts:
    1. UTF-8 (a leading UTF-8 BOM is dropped).
    2. UTF-16, but only when the data carries a UTF-16 BOM or contains NUL
       bytes. Without that guard almost any even-length byte string decodes
       "successfully" as UTF-16 and legacy 8-bit text turns into mojibake.
    3. Latin-1, which accepts every byte sequence and therefore always succeeds.
    """
    try:
        raw = path.read_bytes()
    except Exception:
        # Deliberate best-effort: unreadable input is treated as empty text.
        return ""
    try:
        return raw.decode("utf-8-sig")
    except UnicodeDecodeError:
        pass
    looks_utf16 = raw.startswith((b"\xff\xfe", b"\xfe\xff")) or b"\x00" in raw
    if looks_utf16:
        try:
            return raw.decode("utf-16")
        except UnicodeDecodeError:
            pass
    return raw.decode("latin-1")
# -----------------------------
# Corpus building
# -----------------------------
@dataclass
class Record:
    """One knowledge-base entry: a piece of extracted text plus where it came from."""

    # Identifier of the entry; SimpleCorpusBuilder uses the file path string.
    id: str
    # Short display title; SimpleCorpusBuilder uses the file stem.
    title: str
    # Extracted plain text (or a bracketed stub for unsupported/unreadable files).
    text: str
    # Path of the originating file, when there is one.
    source_path: Optional[str] = None
    # MIME type of the source, when known.
    mime: Optional[str] = None
class SimpleCorpusBuilder:
    """Very lightweight fallback if corpus_builder.py or deps are unavailable.

    Supports: txt, md, rst, html (strip tags naively), pdf (if PyMuPDF installed).
    Every other file is recorded as a stub entry so it still appears in the corpus.
    """

    _TEXT_SUFFIXES = frozenset({".txt", ".md", ".rst"})
    _HTML_SUFFIXES = frozenset({".html", ".htm"})

    def __init__(self) -> None:
        # PyMuPDF is optional; without it PDFs are handled as unsupported files.
        try:
            import fitz  # PyMuPDF
        except Exception:
            fitz = None
        self._fitz = fitz

    def build(self, root: Path, out_jsonl: Path) -> List[Record]:
        """Walk ``root`` recursively, extract text per file and write one JSON object per line.

        Returns the list of records that was written to ``out_jsonl``. Supported
        files whose extracted text is blank are skipped; unsupported or
        unreadable files get a bracketed placeholder text instead.
        """
        out_jsonl.parent.mkdir(parents=True, exist_ok=True)
        records: List[Record] = []
        for p in root.rglob("*"):
            if not p.is_file():
                continue
            suf = p.suffix.lower()
            try:
                text = self._extract_text(p, suf)
            except Exception:
                text = f"[Error reading file: {p.name}]"
            if text is None:
                # just record stub entry for unsupported types
                text = f"[Unsupported file type: {suf}]"
            elif not text.strip():
                continue
            records.append(Record(id=str(p), title=p.stem, text=text, source_path=str(p)))
        # write JSONL
        with out_jsonl.open("w", encoding="utf-8") as fh:
            for r in records:
                fh.write(json.dumps(r.__dict__, ensure_ascii=False) + "\n")
        return records

    def _extract_text(self, path: Path, suffix: str) -> Optional[str]:
        """Return the text of a supported file, or None when the type is unsupported."""
        if suffix in self._TEXT_SUFFIXES:
            return read_text_guess(path)
        if suffix in self._HTML_SUFFIXES:
            return self._strip_html(read_text_guess(path))
        if suffix == ".pdf" and self._fitz is not None:
            return self._pdf_text(path)
        return None

    def _strip_html(self, html: str) -> str:
        """Reduce an HTML document to plain text, using BeautifulSoup when available."""
        try:
            from bs4 import BeautifulSoup  # type: ignore
            soup = BeautifulSoup(html, "html.parser")
            for tag in soup(["script", "style"]):
                tag.decompose()
            return soup.get_text("\n", strip=True)
        except Exception:
            # very naive fallback without bs4
            # (imported by name here because the parameter shadows the html module)
            from html import unescape
            txt = re.sub(r"<\s*(script|style)[^>]*>.*?<\s*/\s*\1\s*>", " ", html, flags=re.S | re.I)
            txt = re.sub(r"<[^>]+>", " ", txt)
            # Decode entities only after the tags are gone, so "&lt;b&gt;" stays text.
            txt = unescape(txt)
            txt = re.sub(r"\s+", " ", txt)
            return txt.strip()

    def _pdf_text(self, path: Path) -> str:
        """Return the text of all pages joined by blank lines, or "" on any failure."""
        try:
            doc = self._fitz.open(str(path))
        except Exception:
            return ""
        try:
            pages = [doc.load_page(i).get_text("text") for i in range(len(doc))]
            return "\n\n".join(pages)
        except Exception:
            return ""
        finally:
            # Always release the document handle; a failing close must not mask the result.
            try:
                doc.close()
            except Exception:
                pass
class ExternalCorpusBuilder:
    """Builds a JSONL corpus by running corpus_builder.py in a child Python process."""

    def __init__(self, script_path: Path) -> None:
        self.script = script_path

    def build(self, root: Path, out_jsonl: Path, *, workers: int = 4, verbose: bool = False) -> bool:
        """Run the script over ``root``; True only if it exits 0 and wrote a non-empty ``out_jsonl``."""
        argv = [sys.executable, str(self.script)]
        argv += ["--root", str(root)]
        argv += ["--out", str(out_jsonl)]
        argv += ["--emit", "auto"]
        argv += ["--workers", str(max(1, workers))]  # never fewer than one worker
        argv += ["--llm-parallel", "1"]
        if verbose:
            argv.append("--verbose")
        try:
            import subprocess
            # stderr is folded into stdout; the captured output itself is not used.
            completed = subprocess.run(argv, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
            if completed.returncode != 0:
                return False
            if not out_jsonl.exists():
                return False
            return out_jsonl.stat().st_size > 0
        except Exception:
            return False
# -----------------------------
# Ollama client (stdlib HTTP)
# -----------------------------
class OllamaClient:
    """Minimal client for a local Ollama daemon using only the standard library."""

    def __init__(self, host: str = "http://localhost:11434", timeout: int = 600):
        # Trailing slashes are dropped so endpoint paths can be appended directly.
        self.host = host.rstrip("/")
        self.timeout = timeout

    def generate(self, model: str, prompt: str) -> str:
        """Send a non-streaming request to ``/api/generate`` and return the response text.

        Raises:
            RuntimeError: on a malformed host URL, an HTTP error status, any
                transport failure, or a reply that is not a JSON object. The
                underlying exception is attached as ``__cause__``.
        """
        url = f"{self.host}/api/generate"
        payload = {
            "model": model,
            "prompt": prompt,
            "stream": False,
        }
        data = json.dumps(payload).encode("utf-8")
        try:
            # Request() itself raises ValueError for a URL without a scheme, so it
            # is built inside the try to keep RuntimeError the only failure type.
            req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"}, method="POST")
            with urllib.request.urlopen(req, timeout=self.timeout) as resp:
                body = resp.read()
        except urllib.error.HTTPError as e:
            detail = e.read().decode("utf-8", "ignore")
            raise RuntimeError(f"Ollama HTTP error {e.code}: {detail}") from e
        except Exception as e:
            raise RuntimeError(f"Ollama request failed: {e}") from e
        return self._decode_response(body)

    @staticmethod
    def _decode_response(body: bytes) -> str:
        """Extract the stripped "response" field from a raw reply body."""
        try:
            obj = json.loads(body.decode("utf-8", "ignore"))
        except Exception as e:
            raise RuntimeError("Invalid JSON from Ollama") from e
        if not isinstance(obj, dict):
            # Valid JSON but not an object (e.g. a list): there is no "response" key to read.
            raise RuntimeError("Invalid JSON from Ollama")
        return str(obj.get("response") or "").strip()
def _parse_json_strict(s: str) -> Optional[Dict[str, str]]:
try:
s = sanitize_llm_text_simple(s)
return json.loads(s)
except Exception:
m = re.search(r"\{[\s\S]*\}", s)
if m:
try:
return json.loads(m.group(0))
except Exception:
return None
return None
# (Discord webhook removed; now using Git push flow)
# -----------------------------
# Prompting
# -----------------------------
# Legacy product-focused prompt, kept for reference only. The surrounding ''' ... '''
# turns the whole assignment into an unused string expression, so nothing here is
# ever bound to PROMPT_TEMPLATE; the active definition follows this block.
'''
PROMPT_TEMPLATE = """
You are an experienced product strategist and technical writer.
Write a clear, compelling PROJECT CONCEPT based only on the information in Notes and Knowledge Base below.
Output in Markdown with these sections:
- Overview & Problem
- Vision & Goals
- Target Users & Use Cases
- Key Features
- Non-Goals / Out of Scope
- Constraints & Risks
- Rough Architecture / Approach
- Milestones & Next Steps
- Success Criteria & Metrics
Guidelines:
- Be pragmatic, specific, and actionable. Do not fabricate details not present in the sources.
- Pull concrete facts, quotes, and constraints from the knowledge base.
- If something is unknown or ambiguous, add a TODO with a short question.
- Use short paragraphs and bullets; avoid marketing fluff.
Assets to include:
- The following files will be committed alongside README.md in the same folder.
- When relevant, embed images using Markdown (e.g., `![Label](./FILENAME)`), and link documents/other files using `[Label](./FILENAME)`.
- Place links or images in the appropriate sections where they add clarity.
Assets Provided:
{ASSETS}
Notes (from user):
{NOTES}
Knowledge Base (source excerpts):
{KB}
""".strip()
'''
# Main concept-generation prompt. It carries three placeholders - {ASSETS}, {NOTES}
# and {KB} - and asks the model for a Markdown concept document adapted to the
# detected idea type.
# NOTE(review): the text also contains literal braces (the idea-type list and
# "{Title}"), so it presumably has to be filled with str.replace rather than
# str.format - confirm at the call site.
PROMPT_TEMPLATE = """
You are a cross-domain concept developer (product strategist, creative producer, research lead, grant writer).
Turn the sources into a concise, presentable CONCEPT document. Adapt to the domain.
INSTRUCTIONS
1) Detect IDEA TYPE (pick one primary; if unclear, choose closest and add a TODO):
{Product/Software, Service, Research/Study, Policy/Proposal, Art/Exhibition/Performance, Event/Program,
Education/Curriculum, Media/Film/Publication, Campaign/Nonprofit, Data/ML/Infrastructure, Game/Interactive,
Writing/Book/Article, Other}
2) Tone & register:
- Product/Software → pragmatic PM/tech brief
- Research → neutral academic project brief
- Policy → policy memo
- Art/Exhibition/Performance → curator/producer note (clear, not flowery)
- Event → producers run-of-show style
- Education → syllabus brief
- Media/Publication → one-sheet
- Campaign/Nonprofit → strategy brief
- Data/ML/Infrastructure → engineering design note
- Game/Interactive → design doc overview
- Writing/Book/Article → proposal overview
3) Output Markdown using these core sections (use these exact headings; include only relevant ones):
- Overview & Intent
- Context / Problem (or Opportunity)
- Audience / Stakeholders
- Deliverables / Outputs & Scope
- Approach / Method (rename to “Methodology”, “Implementation Plan”, “Format & Installation Plan”, etc., to fit the idea type)
- Resources / Budget / Tools (only if present; else add a short TODO)
- Timeline & Milestones
- Risks, Ethics & Constraints
- Success Criteria / Evaluation
- Open Questions (TODOs)
Add one domain-specific block (only if relevant and supported by sources):
- Product/Software: Key Features; Non-Goals; Rough Architecture; Dependencies & Integration; License.
- Research/Study: Research Questions; Methodology & Data; Expected Contributions; References/Citations.
- Policy/Proposal: Policy Mechanism; Legal/Standards; Impact Assessment; Implementation Steps.
- Art/Exhibition/Performance: Conceptual Frame & References; Medium/Materials; Venue/Spatial Requirements; Tech/AV; Rights/Permissions.
- Event/Program: Programme Outline / Run-of-Show; Roles & Staffing; Logistics & Venue.
- Education/Curriculum: Learning Objectives; Syllabus Outline; Assessment & Materials.
- Media/Film/Publication: Logline & Synopsis; Format; Production Plan; Distribution.
- Campaign/Nonprofit: Theory of Change; Channels & Tactics; KPIs; Partnerships.
- Data/ML/Infrastructure: Data Sources; Models; Architecture Diagram (describe); Privacy & Compliance; Ops/Monitoring.
- Game/Interactive: Core Loop; Mechanics; Narrative; Tech; Monetization (if relevant).
- Writing/Book/Article: Thesis; Outline/Chapters; Sources; Target Readers.
4) Evidence use:
- Use only facts in Notes/KB. If missing, add short TODOs instead of inventing.
- Where a claim relies on a specific source, include a short inline blockquote with “Source: <Path or Title>”.
5) Assets:
- These files are committed alongside README.md. Embed images with Markdown and link documents where they help clarity.
STYLE
- Short paragraphs and bullets; concrete, specific, and actionable. Avoid marketing fluff.
- If dates/budget/ownership are uncertain, show ranges or TODOs.
- Keep a neutral, professional tone adapted to the idea type.
TITLE
- Generate a neutral 2-4 words working title.
- Begin the document with “# {Title}”.
Assets Provided:
{ASSETS}
Notes (from user):
{NOTES}
Knowledge Base (source excerpts):
{KB}
""".strip()
# Rephrasing "lenses": each entry describes one way to rewrite a rough note as a
# single paragraph. "key" is the stable identifier, "label" the human-readable
# name, and "prompt" the instruction text with a {USER_NOTE} placeholder for the
# note being rewritten.
# NOTE(review): how {USER_NOTE} is substituted is not visible in this part of the
# file - confirm at the call site.
REPHRASE_LENSES = [
{
"key": "neutral",
"label": "Neutral Clarification / Expansion",
"prompt": """Take the following rough note and turn it into a single clear, concise paragraph that captures the main idea.
- Keep a neutral, explanatory tone.
- Don't add new features or speculation, only clarify and connect what is already there.
- Output exactly one paragraph.
Note:
{USER_NOTE}
""",
},
{
"key": "problem_solution",
"label": "Problem-Solution Framing",
"prompt": """Rewrite the following note as a single paragraph that clearly describes:
1. What problem or frustration exists,
2. For whom,
3. How the idea could solve it in principle.
Keep it concrete but high-level, no implementation details.
Output exactly one paragraph.
Note:
{USER_NOTE}
""",
},
{
"key": "user_story",
"label": "User Story / Scenario",
"prompt": """Rewrite the following note as a single paragraph that describes a short scenario from a user's point of view.
Show how a specific person encounters the situation and how this idea helps them.
Keep it realistic and simple, not hype-y.
Output exactly one paragraph.
Note:
{USER_NOTE}
""",
},
{
"key": "value_prop",
"label": "Value Proposition / Pitch",
"prompt": """Rewrite the following note as a single paragraph that sounds like a clear, simple pitch of the idea.
Explain what it is, who it's for, and why it's valuable or interesting.
Avoid buzzwords; keep it grounded and concrete.
Output exactly one paragraph.
Note:
{USER_NOTE}
""",
},
{
"key": "implementation",
"label": "Implementation / Next Steps",
"prompt": """Rewrite the following note as a single paragraph that keeps the original idea but focuses on how one might start implementing or exploring it.
Mention 2-3 plausible first steps or components without going into deep technical detail.
Output exactly one paragraph.
Note:
{USER_NOTE}
""",
},
]
# Prompt that asks the model to continue (not rewrite) the user's note in the same
# language, tone and formatting. {USER_NOTE} marks where the note text goes.
EXTEND_PROMPT = """
You are continuing the user's own note. Keep writing in the same language, tone, and formatting style they used.
Instructions:
- Extend the idea with additional possibilities, use cases, angles, or problems to consider.
- Preserve the author's voice: match their formality, punctuation habits, and quirks (e.g., all lowercase, terse bullets, or formal sentences).
- Do not summarize or rewrite the original; add new material that flows naturally after it.
- Keep it concise (2-5 sentences or a few short bullet points).
- If the input is in bullet form, continue the bullets; otherwise, continue the paragraph.
Original note:
{USER_NOTE}
""".strip()
class IdeaCategory(str, Enum):
    """Closed set of idea categories used to pick a visualization style.

    Members are also plain strings, and each value equals its member name.
    """

    # Software and digital products
    APP_OR_TOOL = "APP_OR_TOOL"
    DASHBOARD_OR_ANALYTICS = "DASHBOARD_OR_ANALYTICS"
    DEV_TOOL_OR_API = "DEV_TOOL_OR_API"
    # Tangible objects, systems and abstract ideas
    PHYSICAL_PRODUCT = "PHYSICAL_PRODUCT"
    SYSTEM_OR_WORKFLOW = "SYSTEM_OR_WORKFLOW"
    ABSTRACT_FRAMEWORK = "ABSTRACT_FRAMEWORK"
    # Experiences, spaces, games and communication
    SERVICE_OR_EVENT = "SERVICE_OR_EVENT"
    SPATIAL_DESIGN_OR_INSTALLATION = "SPATIAL_DESIGN_OR_INSTALLATION"
    GAME_OR_WORLD = "GAME_OR_WORLD"
    BRAND_OR_CAMPAIGN = "BRAND_OR_CAMPAIGN"
    # Learning and data platforms
    EDUCATIONAL_TOOL = "EDUCATIONAL_TOOL"
    DATA_INFRASTRUCTURE = "DATA_INFRASTRUCTURE"
# One-sentence visual direction for every IdeaCategory. classify_idea() returns the
# matching entry as "visualization_hint", and build_image_prompt_system_message()
# embeds it in the instructions sent to the model.
VISUALIZATION_HINTS: Dict[IdeaCategory, str] = {
IdeaCategory.APP_OR_TOOL: "Hero UI screen on a device mockup, showing the main interface and color palette.",
IdeaCategory.DASHBOARD_OR_ANALYTICS: "Full-screen dashboard view with charts, cards, widgets and clear information hierarchy.",
IdeaCategory.DEV_TOOL_OR_API: "Stylized developer scene with screens and terminal, or a clean system architecture diagram.",
IdeaCategory.PHYSICAL_PRODUCT: "Hero product shot of the object, centered, photorealistic, materials and key features clearly visible.",
IdeaCategory.SYSTEM_OR_WORKFLOW: "Isometric system diagram showing entities and arrows, clean infographic look.",
IdeaCategory.ABSTRACT_FRAMEWORK: "Metaphorical, atmospheric scene representing the idea using one strong visual metaphor.",
IdeaCategory.SERVICE_OR_EVENT: "Lifestyle scene with people interacting in an environment, representing the experience.",
IdeaCategory.SPATIAL_DESIGN_OR_INSTALLATION: "Hero shot of the space or installation, wide view, with lighting and geometry clearly visible.",
IdeaCategory.GAME_OR_WORLD: "In-game style scene showing a players point of view or isometric world with the core mechanic visible.",
IdeaCategory.BRAND_OR_CAMPAIGN: "Bold key visual / poster with strong graphic composition and a central symbol or logo-like element.",
IdeaCategory.EDUCATIONAL_TOOL: "Scene with a learner interacting with an interface, or a clear diagram of the method.",
IdeaCategory.DATA_INFRASTRUCTURE: "Network-like visualization with nodes and connections, or a dense monitoring dashboard.",
}
def classify_idea(idea_text: str) -> Dict[str, Any]:
    """Heuristic keyword classifier; returns the category and its visualization hint.

    Rules are checked in order and the first rule with a matching keyword
    wins. Keywords match at the start of a word, so "tool" matches "tools" and
    "toolkit" but "logs" no longer matches "blogs". Keywords of three
    characters or fewer ("api", "ios", "cli", ...) must match a whole word
    (an optional plural "s" is allowed), so they no longer fire inside
    "rapid", "various" or "client".

    Returns a dict with "category" (IdeaCategory) and "visualization_hint" (str).
    """
    text = (idea_text or "").lower()

    def has(*phrases: str) -> bool:
        for phrase in phrases:
            # (?<![^\W_]) = not preceded by a letter or digit, i.e. a word start.
            pattern = r"(?<![^\W_])" + re.escape(phrase)
            if len(phrase) <= 3:
                pattern += r"s?(?![^\W_])"
            if re.search(pattern, text):
                return True
        return False

    rules = (
        (IdeaCategory.DASHBOARD_OR_ANALYTICS,
         ("dashboard", "analytics", "kpi", "monitoring panel", "business intelligence")),
        (IdeaCategory.DATA_INFRASTRUCTURE,
         ("observability", "traces", "logs", "infrastructure", "nodes", "monitoring", "telemetry")),
        (IdeaCategory.DEV_TOOL_OR_API,
         ("api", "sdk", "cli", "developer", "framework", "backend")),
        (IdeaCategory.PHYSICAL_PRODUCT,
         ("device", "hardware", "physical", "furniture", "wearable", "sensor")),
        (IdeaCategory.SYSTEM_OR_WORKFLOW,
         ("workflow", "pipeline", "automation", "process", "system architecture", "orchestration")),
        (IdeaCategory.APP_OR_TOOL,
         ("mobile app", "ios", "android", "web app", "saas", "desktop app", "tool", "platform", "software")),
        # "framework" is already claimed by DEV_TOOL_OR_API above, so it is not repeated here.
        (IdeaCategory.ABSTRACT_FRAMEWORK,
         ("philosophy", "mindset", "mental model", "metaphor")),
        (IdeaCategory.SERVICE_OR_EVENT,
         ("event", "workshop", "conference", "service", "consulting", "community")),
        (IdeaCategory.SPATIAL_DESIGN_OR_INSTALLATION,
         ("space", "room", "gallery", "installation", "architecture", "store layout")),
        (IdeaCategory.GAME_OR_WORLD,
         ("game", "player", "level", "world", "gamified")),
        (IdeaCategory.BRAND_OR_CAMPAIGN,
         ("brand", "campaign", "logo", "poster", "identity")),
        (IdeaCategory.EDUCATIONAL_TOOL,
         ("learning", "course", "students", "tutorial", "study", "education", "teaching")),
    )

    category = next((cat for cat, keywords in rules if has(*keywords)), None)
    if category is None:
        # Nothing matched: product-sounding ideas default to an app, the rest to an abstract idea.
        product_like = has("app", "tool", "product", "platform", "saas", "software")
        category = IdeaCategory.APP_OR_TOOL if product_like else IdeaCategory.ABSTRACT_FRAMEWORK
    return {
        "category": category,
        "visualization_hint": VISUALIZATION_HINTS.get(category, ""),
    }
def build_image_prompt_system_message(category: IdeaCategory, visualization_hint: str) -> str:
    """Build the instruction text that makes the model write one image prompt.

    ``category.value`` and ``visualization_hint`` are embedded in the text
    (the hint appears twice). The result is stripped of surrounding
    whitespace. The requested length is "around 40-80 words"; the earlier
    "4080" had lost its range separator.
    """
    return f"""
You are an expert concept artist and image prompt writer for text-to-image models.
The user will give you a description of an idea: a product, project, app, physical object, system, or abstract concept.
Your job:
- Understand the idea and decide the single most effective visualization for it.
- Then write ONE powerful, detailed image prompt.
The idea has been classified as:
- Category: {category.value}
- Recommended visualization style: {visualization_hint}
You must:
1. Internally figure out:
- What matters most to show for this category (function, form, context of use, mood, or metaphor).
- How to best apply the recommended visualization style: {visualization_hint}.
2. Choose ONE visualization approach that feels most natural and expressive for this idea.
You can pick any camera angle, composition, style, and mood you like, as long as it serves the idea and stays consistent with the recommended visualization style.
3. In the final answer, output ONLY a single image description, as one paragraph, around 40-80 words, ready to send to an image generation model.
4. In that paragraph, clearly specify:
- Main subject and what is happening
- Environment / background context
- Camera angle and shot type (e.g. "isometric view", "over-the-shoulder", "close-up")
- Art style / medium (e.g. "clean flat vector illustration", "photorealistic 3D render", "anime style", "technical blueprint")
- Lighting and color mood
- Level of detail (e.g. "highly detailed", "minimalist")
- Optional negative constraints if useful (e.g. "no text, no logos")
5. Do NOT mention the words “user”, “idea”, “prompt”, “concept art”, or “text-to-image model”.
Just describe the image directly.
ASSISTANT:
(One single paragraph image description)
""".strip()
def generate_image_prompt_for_idea(idea_text: str, *, client: OllamaClient, model: str) -> str:
    """Classify the idea, ask the model for a single image prompt and return it sanitized.

    Raises:
        ValueError: if ``idea_text`` is empty or only whitespace.
    """
    idea = (idea_text or "").strip()
    if idea == "":
        raise ValueError("Idea text is empty")
    classification = classify_idea(idea)
    category: IdeaCategory = classification.get("category", IdeaCategory.APP_OR_TOOL)
    hint = classification.get("visualization_hint", VISUALIZATION_HINTS.get(category, ""))
    instructions = build_image_prompt_system_message(category, hint)
    # Instructions, the idea and an open assistant turn, separated by blank lines.
    full_prompt = "\n\n".join([instructions, f"USER IDEA:\n{idea}", "ASSISTANT:"])
    reply = client.generate(model=model, prompt=full_prompt)
    return sanitize_llm_text_simple(reply)
def build_kb_string(records: List[Record], *, max_chars: int = 80000, per_record_cap: int = 4000) -> str:
    """Render records as one knowledge-base string, bounded in size.

    Each record with non-blank text becomes a "Source/Path" header followed by
    its text, cut to ``per_record_cap`` characters. Records are added until the
    ``max_chars`` budget is used up; the entry that crosses the budget is cut short.
    """
    remaining = max_chars
    pieces: List[str] = []
    for rec in records:
        if remaining <= 0:
            break
        body = (rec.text or "").strip()
        if not body:
            continue
        if len(body) > per_record_cap:
            body = body[:per_record_cap] + "\n...[truncated]"
        # Label preference: explicit title, then the source file name, then the id.
        if rec.title:
            label = rec.title
        elif rec.source_path:
            label = Path(rec.source_path).name
        else:
            label = rec.id
        piece = f"\n---\nSource: {label}\nPath: {rec.source_path or ''}\n\n" + body.strip() + "\n"
        # Slicing is a no-op when the piece fits the remaining budget.
        piece = piece[:remaining]
        pieces.append(piece)
        remaining -= len(piece)
    return "\n".join(pieces).strip()
def sanitize_llm_text_simple(s: str) -> str:
    """Strip <think>...</think> blocks and a wrapping code fence from an LLM reply."""
    removals = (
        (r"<think>.*?</think>", re.S | re.I),  # reasoning blocks, any case, across lines
        (r"^\s*```(?:\w+)?\s*", 0),            # opening fence with optional language tag
        (r"\s*```\s*$", 0),                    # closing fence
    )
    try:
        cleaned = s
        for pattern, flags in removals:
            cleaned = re.sub(pattern, "", cleaned, flags=flags)
        return cleaned.strip()
    except Exception:
        # e.g. None was passed in: fall back to plain stripping
        return (s or "").strip()
def md_heading_replace_or_insert(md: str, title: str) -> str:
    """Make sure the Markdown starts with a level-1 heading.

    - Empty input yields just the heading.
    - A first line of "# PROJECT CONCEPT" (any case) is replaced by the title.
    - Any other existing H1 on the first line is kept untouched.
    - Otherwise the heading is inserted above the text.
    """
    heading = f"# {title}"
    if not md:
        return heading + "\n\n"
    lines = md.splitlines()
    first = lines[0] if lines else ""
    if lines and re.match(r"^\s*#\s+project\s+concept\s*$", first, flags=re.I):
        return "\n".join([heading] + lines[1:])
    if lines and re.match(r"^\s*#\s+", first):
        return md
    return heading + "\n\n" + md
def strip_wrapping_quotes(s: str) -> str:
    """Trim surrounding whitespace, then any run of straight or curly quotes at either end."""
    trimmed = s.strip()
    without_leading = re.sub(r"^[\"'“”‘’]+", "", trimmed)
    return re.sub(r"[\"'“”‘’]+$", "", without_leading)
# -----------------------------
# GUI Application
# -----------------------------
class App(TkinterDnD.Tk): # type: ignore
def __init__(self):
    """Create the main window, set up state and defaults, load settings and build the UI."""
    super().__init__()
    self.title("Idea → Concept")
    self.geometry("1080x720")
    # Keep the window from shrinking below a usable size; not fatal if unsupported.
    try:
        self.minsize(900, 600)
    except Exception:
        pass
    # The icon is cosmetic: keep running with the default one on any failure.
    try:
        self._set_app_icon()
    except Exception:
        pass

    # --- Session state
    self.files: List[Path] = []
    self.websites: List[str] = []
    self.records: List[Record] = []
    # Legacy fields kept but no longer used for per-session staging
    self.staging_dir: Optional[Path] = None
    self.corpus_path: Optional[Path] = None
    self.include_map: Dict[str, bool] = {}
    self.file_hashes: Dict[str, str] = {}  # path -> sha256
    self._seen_hashes: Set[str] = set()  # hashes present in corpus.jsonl
    self._ingesting: Set[str] = set()  # hashes currently being ingested

    # --- Storage locations, all below ./.idea-hole in the current working directory
    storage_root = Path.cwd() / ".idea-hole"
    self._base_dir: Path = storage_root
    self._files_dir: Path = storage_root / "files"
    self._corpus_file: Path = storage_root / "corpus.jsonl"
    self._sessions_file: Path = storage_root / "sessions.jsonl"

    # --- Rephrase panel state
    self.rephrase_variants: List[Dict[str, str]] = []
    self.rephrase_selected_key: Optional[str] = None
    self._suppress_rephrase_select: bool = False
    self._rephrase_visible: bool = False

    # --- Defaults, overridable through environment variables
    env = os.environ
    self.ollama_host = tk.StringVar(value=env.get("OLLAMA_HOST", "http://localhost:11434"))
    # The placeholder nudges the user to pick a model explicitly.
    self.ollama_model = tk.StringVar(value=env.get("IDEA_HOLE_MODEL", "Select model..."))
    self.git_remote_url = tk.StringVar(value=env.get("IDEA_HOLE_REMOTE", ""))
    # SearXNG URL (for prior-art search)
    try:
        searx_fallback = websearch.SEARX_DEFAULT_URL
    except Exception:
        searx_fallback = "http://localhost:8888"
    self.searx_url = tk.StringVar(value=env.get("SEARX_URL", searx_fallback))

    # --- Concept metadata
    self.title_var = tk.StringVar(value="")
    self.desc_var = tk.StringVar(value="")

    # Persisted settings first, then storage/indexes, then the widgets.
    self._load_settings()
    self._init_storage()
    self._build_ui()
    self._refresh_rephrase_tree()
    self._update_rephrase_visibility()
    self._maybe_enable_dnd()

    # --- Dirty tracking and close handler
    self._dirty = False
    try:
        self.protocol('WM_DELETE_WINDOW', self.on_close)
    except Exception:
        pass

    # Remember the clean state so later changes can be detected.
    try:
        self._last_saved = self._snapshot_session_state()
    except Exception:
        self._last_saved = dict(
            title="",
            description="",
            notes="",
            concept="",
            files=[],
            rephrase_variants=[],
            rephrase_selected_key=None,
        )
def _set_app_icon(self) -> None:
    """Use icon.png next to this file as the window/taskbar/dock icon when possible.

    - Tk's iconphoto is used on every platform (PNG needs Tk 8.6+).
    - On macOS the dock icon is additionally set through AppKit if it is installed.
    Does nothing when icon.png is missing.
    """
    icon_file = Path(__file__).parent / "icon.png"
    if not icon_file.exists():
        return
    try:
        # Stored on self so the image is not garbage-collected while in use.
        self._icon_photoimage = tk.PhotoImage(file=str(icon_file))
        # Applies to this window and becomes the default for future toplevels.
        try:
            self.iconphoto(True, self._icon_photoimage)
        except Exception:
            # Older Tk variants
            self.tk.call('wm', 'iconphoto', self._w, self._icon_photoimage)
    except Exception:
        # PhotoImage may fail if Tk lacks PNG support
        self._icon_photoimage = None
    if sys.platform != 'darwin':
        return
    # macOS only: set the dock icon via AppKit (optional dependency).
    try:
        from AppKit import NSImage, NSApplication
        ns_app = NSApplication.sharedApplication()
        dock_image = NSImage.alloc().initWithContentsOfFile_(str(icon_file))
        if dock_image is not None:
            ns_app.setApplicationIconImage_(dock_image)
    except Exception:
        # AppKit (pyobjc) not available; ignore
        pass
# --- UI construction
def _build_ui(self) -> None:
    """Create and lay out every widget of the main window.

    Layout, top to bottom:
    - top bar: New / Open / Save session buttons and the status label
    - horizontal paned area: left = files table + notes (with rephrase
      list and action buttons), right = concept editor with title /
      description fields, action buttons and the image-prompt box
    - bottom bar: Ollama host, model, git remote and SearXNG URL inputs

    After the widgets exist, variable traces and ``<<Modified>>`` bindings
    are attached for dirty tracking, push-button state and settings
    persistence. Widgets that other methods use are stored on ``self``.
    """
    root = self
    # Top controls
    top = ttk.Frame(root)
    top.pack(side=tk.TOP, fill=tk.X, padx=8, pady=6)
    # Session file actions at top-left
    ttk.Button(top, text="New", command=self.on_new_session).pack(side=tk.LEFT)
    ttk.Button(top, text="Open", command=self.on_open_session).pack(side=tk.LEFT, padx=(6,0))
    ttk.Button(top, text="Save", command=self.on_save_session).pack(side=tk.LEFT, padx=(6,0))
    # Status label at top-right
    self.status = ttk.Label(top, text="Ready", anchor=tk.E)
    self.status.pack(side=tk.RIGHT)
    # Paned layout: left (files + notes), right (concept)
    paned = ttk.Panedwindow(root, orient=tk.HORIZONTAL)
    paned.pack(fill=tk.BOTH, expand=True, padx=8, pady=(0,8))
    left = ttk.Frame(paned)
    right = ttk.Frame(paned)
    paned.add(left, weight=1)
    paned.add(right, weight=1)
    # Files table (with controls inside the same frame)
    files_frame = ttk.LabelFrame(left, text="Files & Websites (drag & drop files; add URLs)")
    files_frame.pack(side=tk.TOP, fill=tk.BOTH, expand=False, pady=(0,8))
    # Inner container for tree + scrollbar to allow controls below
    files_inner = ttk.Frame(files_frame)
    files_inner.pack(side=tk.TOP, fill=tk.BOTH, expand=True)
    # Prevent scrollbar squashing by using grid with a fixed minsize column
    try:
        files_inner.rowconfigure(0, weight=1)
        files_inner.columnconfigure(0, weight=1)
        files_inner.columnconfigure(1, minsize=14)
    except Exception:
        pass
    cols = ("name", "path", "type", "size", "include")
    self.tree = ttk.Treeview(files_inner, columns=cols, show="headings", height=8)
    # (column id, initial width); the "include" column is headed "Add to Repo"
    for c, w in ("name", 80), ("path", 100), ("type", 20), ("size", 30), ("include", 30):
        heading = "Add to Repo" if c == "include" else c.capitalize()
        self.tree.heading(c, text=heading)
        self.tree.column(c, width=w, anchor=tk.W)
    try:
        self.tree.grid(row=0, column=0, sticky='nsew')
    except Exception:
        self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    self.tree_vsb = ttk.Scrollbar(files_inner, orient=tk.VERTICAL, command=self.tree.yview)
    try:
        self.tree_vsb.grid(row=0, column=1, sticky='ns')
    except Exception:
        # Fallback if grid not available
        self.tree_vsb.pack(side=tk.RIGHT, fill=tk.Y)
    # Connect scrollbar normally (always visible; resilient to squashing)
    self.tree.configure(yscrollcommand=self.tree_vsb.set)
    # Click to toggle include checkbox-like column
    self.tree.bind('<Button-1>', self.on_tree_click)
    # No auto-hide for reliability
    # File table controls inside the files frame border
    file_controls = ttk.Frame(files_frame)
    # Slightly more padding at bottom than top
    file_controls.pack(side=tk.TOP, fill=tk.X, padx=(8,0), pady=(4,8))
    ttk.Button(file_controls, text="Add Files", command=self.on_add_files).pack(side=tk.LEFT)
    ttk.Button(file_controls, text="Add Folder", command=self.on_add_folder).pack(side=tk.LEFT, padx=(6,0))
    ttk.Button(file_controls, text="Add Website", command=self.on_add_website).pack(side=tk.LEFT, padx=(6,0))
    ttk.Button(file_controls, text="Remove Selected", command=self.on_remove_selected).pack(side=tk.LEFT, padx=(6,0))
    ttk.Button(file_controls, text="Clear All", command=self.on_clear_all).pack(side=tk.LEFT, padx=(6,0))
    # Notes (with the action buttons inside the same frame)
    notes_frame = ttk.LabelFrame(left, text="Notes / Thoughts")
    notes_frame.pack(side=tk.TOP, fill=tk.BOTH, expand=True, pady=(8,0))
    # Notes text area with a ttk scrollbar (always visible, see below)
    notes_container = ttk.Frame(notes_frame)
    notes_container.pack(fill=tk.BOTH, expand=True)
    try:
        notes_container.rowconfigure(0, weight=1)
        notes_container.columnconfigure(0, weight=1)
        notes_container.columnconfigure(1, minsize=14)
    except Exception:
        pass
    self.notes = tk.Text(notes_container, height=12, wrap=tk.WORD, undo=True)
    try:
        self.notes.grid(row=0, column=0, sticky='nsew')
    except Exception:
        self.notes.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    self.notes_vsb = ttk.Scrollbar(notes_container, orient=tk.VERTICAL, command=self.notes.yview)
    try:
        self.notes_vsb.grid(row=0, column=1, sticky='ns')
    except Exception:
        # Fallback
        self.notes_vsb.pack(side=tk.RIGHT, fill=tk.Y)
    # Connect scrollbar normally (always visible; resilient to squashing)
    self.notes.configure(yscrollcommand=self.notes_vsb.set)
    # No auto-hide for reliability
    # Rephrase/variant list sits between the notes box and action buttons.
    # The frame is built here but NOT packed; the pack options are stored in
    # _rephrase_pack_kwargs and applied by _update_rephrase_visibility().
    self.rephrase_frame = ttk.Frame(notes_frame)
    self._rephrase_pack_kwargs = {"side": tk.TOP, "fill": tk.BOTH, "expand": False, "padx": (8,0), "pady": (6,0)}
    ttk.Label(self.rephrase_frame, text="Rephrase variants:").pack(anchor=tk.W)
    rephrase_list_container = ttk.Frame(self.rephrase_frame)
    rephrase_list_container.pack(fill=tk.BOTH, expand=True)
    try:
        rephrase_list_container.rowconfigure(0, weight=1)
        rephrase_list_container.columnconfigure(0, weight=1)
        rephrase_list_container.columnconfigure(1, minsize=14)
    except Exception:
        pass
    self.rephrase_tree = ttk.Treeview(rephrase_list_container, columns=("variant", "preview"), show="headings", height=6, selectmode="browse")
    self.rephrase_tree.heading("variant", text="Variant")
    self.rephrase_tree.heading("preview", text="Preview")
    self.rephrase_tree.column("variant", width=140, anchor=tk.W)
    self.rephrase_tree.column("preview", width=360, anchor=tk.W)
    try:
        self.rephrase_tree.grid(row=0, column=0, sticky="nsew")
    except Exception:
        self.rephrase_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    self.rephrase_vsb = ttk.Scrollbar(rephrase_list_container, orient=tk.VERTICAL, command=self.rephrase_tree.yview)
    try:
        self.rephrase_vsb.grid(row=0, column=1, sticky="ns")
    except Exception:
        self.rephrase_vsb.pack(side=tk.RIGHT, fill=tk.Y)
    self.rephrase_tree.configure(yscrollcommand=self.rephrase_vsb.set)
    self.rephrase_tree.bind("<<TreeviewSelect>>", self._on_rephrase_select)
    notes_actions = ttk.Frame(notes_frame)
    notes_actions.pack(side=tk.TOP, fill=tk.X, padx=(8,0), pady=(6,6))
    # Kept on self: the rephrase frame is later packed "before" this frame.
    self.notes_actions = notes_actions
    ttk.Button(notes_actions, text="Rephrase", command=self.on_rephrase).pack(side=tk.LEFT)
    ttk.Button(notes_actions, text="Extend", command=self.on_extend).pack(side=tk.LEFT, padx=(6,0))
    ttk.Button(notes_actions, text="Generate Concept", command=self.on_generate).pack(side=tk.LEFT, padx=(6,0))
    ttk.Button(notes_actions, text="Find Prior Art", command=self.on_prior_art).pack(side=tk.LEFT, padx=(6,0))
    # Concept editor + metadata
    concept_frame = ttk.LabelFrame(right, text="Concept (editable)")
    concept_frame.pack(side=tk.TOP, fill=tk.BOTH, expand=True)
    meta = ttk.Frame(concept_frame)
    # Stack Title and Description vertically; align with section inset
    meta.pack(side=tk.TOP, fill=tk.X, padx=(8,8), pady=(6,6))
    ttk.Label(meta, text="Title:").pack(anchor=tk.W)
    self.title_entry = tk.Entry(meta, textvariable=self.title_var)
    self.title_entry.pack(fill=tk.X, expand=True)
    ttk.Label(meta, text="Description:").pack(anchor=tk.W, pady=(6,0))
    self.desc_entry = tk.Entry(meta, textvariable=self.desc_var)
    self.desc_entry.pack(fill=tk.X, expand=True)
    # Concept text area with a ttk scrollbar (always visible, see below)
    concept_container = ttk.Frame(concept_frame)
    concept_container.pack(fill=tk.BOTH, expand=True)
    try:
        concept_container.rowconfigure(0, weight=1)
        concept_container.columnconfigure(0, weight=1)
        concept_container.columnconfigure(1, minsize=14)
    except Exception:
        pass
    self.concept = tk.Text(concept_container, wrap=tk.WORD, undo=True)
    try:
        self.concept.grid(row=0, column=0, sticky='nsew')
    except Exception:
        self.concept.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    self.concept_vsb = ttk.Scrollbar(concept_container, orient=tk.VERTICAL, command=self.concept.yview)
    try:
        self.concept_vsb.grid(row=0, column=1, sticky='ns')
    except Exception:
        # Fallback
        self.concept_vsb.pack(side=tk.RIGHT, fill=tk.Y)
    # Connect scrollbar normally (always visible; resilient to squashing)
    self.concept.configure(yscrollcommand=self.concept_vsb.set)
    # No auto-hide for reliability
    #
    # Match Title/Description entry backgrounds to textarea background
    try:
        _bg = self.concept.cget('background')
        self.title_entry.configure(bg=_bg)
        self.desc_entry.configure(bg=_bg)
    except Exception:
        pass
    # Action buttons under the concept editor (Preview PDF, Push to Repo,
    # Generate image prompt)
    concept_actions = ttk.Frame(concept_frame)
    concept_actions.pack(side=tk.TOP, fill=tk.X, padx=(8,0), pady=(6,6))
    # PDF preview button
    self.preview_btn = ttk.Button(concept_actions, text="Preview PDF", command=self.on_preview)
    self.preview_btn.pack(side=tk.LEFT)
    self.push_btn = ttk.Button(concept_actions, text="Push to Repo", command=self.on_push)
    self.push_btn.pack(side=tk.LEFT, padx=(6,0))
    self.image_prompt_btn = ttk.Button(concept_actions, text="Generate image prompt", command=self.on_generate_image_prompt)
    self.image_prompt_btn.pack(side=tk.LEFT, padx=(6,0))
    # Read-only box that displays the generated image prompt
    image_prompt_frame = ttk.LabelFrame(concept_frame, text="Image prompt")
    image_prompt_frame.pack(side=tk.TOP, fill=tk.BOTH, expand=False, padx=(8,8), pady=(0,8))
    image_prompt_container = ttk.Frame(image_prompt_frame)
    image_prompt_container.pack(fill=tk.BOTH, expand=True)
    try:
        image_prompt_container.rowconfigure(0, weight=1)
        image_prompt_container.columnconfigure(0, weight=1)
        image_prompt_container.columnconfigure(1, minsize=14)
    except Exception:
        pass
    # Created DISABLED; _set_image_prompt_text() enables it temporarily to write.
    self.image_prompt_text = tk.Text(image_prompt_container, height=6, wrap=tk.WORD, state=tk.DISABLED)
    try:
        self.image_prompt_text.grid(row=0, column=0, sticky='nsew')
    except Exception:
        self.image_prompt_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    self.image_prompt_vsb = ttk.Scrollbar(image_prompt_container, orient=tk.VERTICAL, command=self.image_prompt_text.yview)
    try:
        self.image_prompt_vsb.grid(row=0, column=1, sticky='ns')
    except Exception:
        self.image_prompt_vsb.pack(side=tk.RIGHT, fill=tk.Y)
    self.image_prompt_text.configure(yscrollcommand=self.image_prompt_vsb.set)
    self._set_image_prompt_text("Generated image prompt will appear here.")
    # Bottom bar
    bottom = ttk.Frame(root)
    bottom.pack(side=tk.BOTTOM, fill=tk.X, padx=8, pady=(0,8))
    # Ollama / model / remote / search inputs live in the bottom bar
    ttk.Label(bottom, text="Ollama host:").pack(side=tk.LEFT)
    ttk.Entry(bottom, textvariable=self.ollama_host, width=22).pack(side=tk.LEFT, padx=(4,10))
    ttk.Label(bottom, text="Model:").pack(side=tk.LEFT)
    self.model_combo = ttk.Combobox(bottom, textvariable=self.ollama_model, state="readonly", width=20, values=self._get_model_values())
    self.model_combo.pack(side=tk.LEFT, padx=(4,10))
    ttk.Label(bottom, text="Remote git repository:").pack(side=tk.LEFT)
    ttk.Entry(bottom, textvariable=self.git_remote_url, width=40).pack(side=tk.LEFT, padx=(4,10))
    ttk.Label(bottom, text="SearXNG URL:").pack(side=tk.LEFT)
    ttk.Entry(bottom, textvariable=self.searx_url, width=26).pack(side=tk.LEFT, padx=(4,0))
    # Title/description edits refresh the push button state and mark the
    # session dirty; text widgets report edits through <<Modified>>.
    self.title_var.trace_add('write', lambda *a: (self.update_push_state(), self._set_dirty(True)))
    self.desc_var.trace_add('write', lambda *a: (self.update_push_state(), self._set_dirty(True)))
    self.concept.bind('<<Modified>>', self._on_concept_modified)
    try:
        self.notes.bind('<<Modified>>', self._on_notes_modified)
    except Exception:
        pass
    # Persist settings on every change of the bottom-bar inputs
    self.ollama_host.trace_add('write', lambda *a: self._save_settings())
    self.ollama_model.trace_add('write', lambda *a: self._save_settings())
    self.git_remote_url.trace_add('write', lambda *a: self._save_settings())
    self.searx_url.trace_add('write', lambda *a: self._save_settings())
    # Optional tool overrides would be persisted if we expose them later
    self.update_push_state()
    # Show the "Select model..." placeholder when the current model is not
    # among the combobox values. Setting the variable also fires the
    # _save_settings trace registered above.
    try:
        values = list(self.model_combo["values"]) if hasattr(self, 'model_combo') else []
        if self.ollama_model.get() not in values:
            self.ollama_model.set("Select model...")
    except Exception:
        pass
def _extract_title_desc(self, concept_md: str, client: Optional[OllamaClient] = None) -> tuple[Optional[str], Optional[str]]:
    """Second-pass LLM call to extract a title and description as JSON.

    Args:
        concept_md: The generated concept text to summarise.
        client: Ollama client to reuse; when omitted a new one is created
            for the currently configured host.

    Returns:
        ``(title, description)``. Each element is ``None`` when the model
        did not supply a non-empty value. Any failure (request error,
        unparsable reply, unexpected JSON shape) yields ``(None, None)``;
        this step is best effort and never raises.
    """
    try:
        client = client or OllamaClient(host=self.ollama_host.get())
        prompt = (
            "Extract a concise title and a one-sentence description from the following concept.\n"
            "- Title: <= 50 chars (3-5 words).\n- Description: <= 120 chars, (one sentence) no trailing period.\n"
            "Return ONLY strict JSON with keys 'title' and 'description'.\n\nCONCEPT:\n" + concept_md
        )
        raw = client.generate(self.ollama_model.get(), prompt)
        obj = _parse_json_strict(raw) or {}
        # Empty strings collapse to None so callers can test for "missing".
        title = strip_wrapping_quotes(str(obj.get('title') or '').strip()) or None
        desc = strip_wrapping_quotes(str(obj.get('description') or '').strip()) or None
        return title, desc
    except Exception:
        return None, None
def _maybe_enable_dnd(self):
    """Register the files table as a drop target when tkinterdnd2 is usable.

    The status line reports the outcome either way; when drag & drop is
    unavailable the add buttons are the way to add files.
    """
    if not _TKDND_AVAILABLE or DND_FILES is None:
        self._set_status("Drag & drop not available (fallback to buttons)")
        return
    try:
        self.tree.drop_target_register(DND_FILES)
        self.tree.dnd_bind('<<Drop>>', self._on_drop)
        self._set_status("Drag & drop enabled")
    except Exception:
        # Registration fails when the root window is not DnD-capable.
        self._set_status("Drag & drop not available (fallback to buttons)")
# --- File ops
def _on_drop(self, event):  # type: ignore
    """Handle a ``<<Drop>>`` event: parse ``event.data`` and add the paths."""
    self._add_paths(self._parse_dnd_paths(event.data))
@staticmethod
def _parse_dnd_paths(s: str) -> List[Path]:
    """Split drag-and-drop payload text into existing file paths.

    The payload is treated as a Tcl-style list: items are separated by
    spaces, and an item containing spaces is wrapped in ``{...}``.
    Backslash escapes are not interpreted.

    Args:
        s: Raw ``event.data`` string from the drop event.

    Returns:
        Paths that exist on disk, in payload order. A directory is
        replaced by every regular file found beneath it (recursively);
        entries that do not exist are dropped.
    """
    tokens: List[Path] = []
    current = ''
    in_brace = False

    def flush(piece: str) -> None:
        # An item that is empty once whitespace and braces are removed
        # must be skipped: Path('') is '.', which would otherwise expand
        # the whole working directory.
        name = piece.strip().strip('{}')
        if name:
            tokens.append(Path(name))

    for ch in s:
        if ch == '{' or ch == '}':
            in_brace = (ch == '{')
            # A brace at the start of an item is a delimiter and is
            # skipped; later ones are kept and trimmed by flush().
            if current:
                current += ch
        elif ch == ' ' and not in_brace:
            flush(current)
            current = ''
        else:
            current += ch
    flush(current)

    # Expand directories
    resolved: List[Path] = []
    for entry in tokens:
        if entry.is_dir():
            resolved.extend(child for child in entry.rglob('*') if child.is_file())
        elif entry.exists():
            resolved.append(entry)
    return resolved
def _add_paths(self, paths: List[Path]):
    """Add files (and the contents of folders) to the session's file table.

    Directories are expanded recursively into regular files. Files that
    are already listed are skipped. Each new file is appended to
    ``self.files``, marked as included in ``self.include_map`` and shown
    as a table row. When something was added, the status line is updated,
    the session is marked dirty and corpus ingestion for the new files is
    started on a daemon thread.

    Args:
        paths: Files and/or directories to add; other entries are ignored.
    """
    # Expand directories into files
    expanded: List[Path] = []
    for p in paths:
        if p.is_dir():
            expanded.extend(q for q in p.rglob('*') if q.is_file())
        elif p.is_file():
            expanded.append(p)

    # Set-based membership keeps adding a large folder linear instead of
    # scanning the self.files list once per candidate.
    known = set(self.files)
    new_files: List[Path] = []
    for p in expanded:
        if p in known or not p.is_file():
            continue
        known.add(p)
        self.files.append(p)
        self.include_map[str(p)] = True
        try:
            size = human_size(p.stat().st_size)
        except Exception:
            size = "?"
        self.tree.insert('', tk.END, values=(p.name, str(p), p.suffix.lower(), size, ''))
        new_files.append(p)

    if not new_files:
        return
    self._set_status(f"Added {len(new_files)} item(s)")
    self._set_dirty(True)
    # Kick off background ingest for any new files not in corpus
    threading.Thread(target=self._ensure_corpus_for_files, args=(new_files,), kwargs={"blocking": True}, daemon=True).start()
#
def on_add_files(self):
    """Ask the user for one or more files and add them to the table."""
    chosen = filedialog.askopenfilenames(title="Select files")
    if chosen:
        self._add_paths([Path(name) for name in chosen])
def on_add_folder(self):
    """Ask the user for a folder and add the files it contains."""
    folder = filedialog.askdirectory(title="Select folder")
    if folder:
        self._add_paths([Path(folder)])
def on_remove_selected(self):
    """Remove the selected table rows and their entries in the session.

    For every selected row the path stored in the second column is
    removed from ``self.files`` and ``self.include_map`` before the row
    itself is deleted. Nothing happens without a selection.
    """
    selected = self.tree.selection()
    if not selected:
        return
    for row_id in selected:
        row = self.tree.item(row_id, 'values')
        if row:
            # NOTE(review): website rows presumably carry a URL in this
            # column; wrapping it in Path may not match how it was
            # stored - confirm against on_add_website.
            target = Path(row[1])
            if target in self.files:
                self.files.remove(target)
            try:
                self.include_map.pop(str(target), None)
            except Exception:
                pass
        self.tree.delete(row_id)
    # Every selected row has been deleted once the loop completes.
    self._set_status(f"Removed {len(selected)} item(s)")
    self._set_dirty(True)
#
def on_clear_all(self):
    """Empty the files table and forget every file of the session."""
    rows = self.tree.get_children()
    self.tree.delete(*rows)
    self.files.clear()
    self.include_map.clear()
    self._set_status("Cleared")
    self._set_dirty(True)
#
def on_tree_click(self, event):
    """Toggle the include flag when the fifth table column is clicked.

    Clicks anywhere else are left to the default Treeview handling
    (returns ``None``). A handled click flips the row's entry in
    ``self.include_map`` (a missing entry counts as included), rewrites
    the row, marks the session dirty and returns ``'break'`` so the
    default binding does not run.
    """
    tree = self.tree
    if tree.identify_region(event.x, event.y) != 'cell':
        return
    # identify_column yields '#1' .. '#5'; '#5' is the include column.
    if tree.identify_column(event.x) != '#5':
        return
    row_id = tree.identify_row(event.y)
    if not row_id:
        return
    row = list(tree.item(row_id, 'values'))
    if len(row) < 5:
        return
    key = str(row[1])
    included = not self.include_map.get(key, True)
    self.include_map[key] = included
    # NOTE(review): both branches are the same empty string as this code
    # stands, so the cell never shows a marker; presumably distinct
    # checked/unchecked glyphs were intended - confirm.
    row[4] = '' if included else ''
    tree.item(row_id, values=tuple(row))
    self._set_dirty(True)
    return 'break'
# --- Unified storage helpers
def _init_storage(self):
    """Create the storage layout and index the hashes already in the corpus.

    Ensures the base and files directories exist and that the corpus and
    sessions files are present (created empty when missing). The corpus
    is then read line by line, and every ``file_hash`` found is collected
    into ``self._seen_hashes``.

    Blank lines, lines that are not valid JSON and JSON values that are
    not objects are skipped individually. If the storage cannot be set up
    or read at all, the index is left empty (best effort, never raises).
    """
    seen = set()
    try:
        self._base_dir.mkdir(parents=True, exist_ok=True)
        self._files_dir.mkdir(parents=True, exist_ok=True)
        for store in (self._corpus_file, self._sessions_file):
            if not store.exists():
                store.write_text("", encoding="utf-8")
        # Build in-memory index of seen file hashes
        with self._corpus_file.open("r", encoding="utf-8") as fh:
            for line in fh:
                if not line.strip():
                    continue
                try:
                    obj = json.loads(line)
                except Exception:
                    continue
                # A stray list/number/string line must not abort the
                # scan and discard the hashes collected so far.
                if not isinstance(obj, dict):
                    continue
                h = obj.get("file_hash")
                if h:
                    seen.add(str(h))
    except Exception:
        # best effort; the index stays empty
        seen = set()
    self._seen_hashes = seen
def _compute_file_hash(self, path: Path) -> str:
    """Return the SHA-256 hex digest identifying ``path``.

    The file is hashed in 1 MiB chunks. If it cannot be read, the text
    ``"<path>|<mtime>"`` is fed into the same hasher instead (mtime is
    ``0.0`` when the file cannot be stat'ed), so a digest is always
    returned and no exception escapes.
    """
    digest = hashlib.sha256()
    chunk_size = 1024 * 1024
    try:
        with path.open("rb") as stream:
            for chunk in iter(lambda: stream.read(chunk_size), b""):
                digest.update(chunk)
    except Exception:
        # Fall back to name+mtime if unreadable
        try:
            info = path.stat()
        except Exception:
            info = None
        mtime = getattr(info, 'st_mtime', 0.0)
        digest.update(f"{path}|{mtime}".encode("utf-8", "ignore"))
    return digest.hexdigest()
def _ensure_file_symlink(self, src: Path, file_hash: str) -> Path:
    """Make sure ``src`` is present in the files directory.

    Returns the expected location ``<files_dir>/<hash>__<basename>``
    whether or not the copy/link succeeded; errors are ignored.

    NOTE(review): the copy/link call receives the directory, not the
    hash-prefixed target, so the resulting file presumably keeps the
    plain basename and the returned path may never exist - confirm
    against copy_or_link.
    """
    target = self._files_dir / f"{file_hash}__{src.name}"
    try:
        already_present = target.exists()
        if not already_present:
            copy_or_link(src, self._files_dir)
    except Exception:
        pass
    return target
def _ingest_single_file(self, src: Path, file_hash: str, *, verbose: bool = False) -> bool:
    """Build corpus records for one file and append them to the unified corpus.

    The file is copied or linked into a fresh scratch directory
    (``<cwd>/.idea-hole/ingest_tmp/<hash>``). ``corpus_builder.py`` next
    to this script is tried first when it exists; if it is missing or
    reports failure, the built-in simple builder is used. Each produced
    JSON object is tagged with ``file_hash``, the resolved
    ``source_path`` and an ``added_at`` timestamp before being appended
    to the corpus file. The scratch directory is always removed.

    Args:
        src: The file to ingest.
        file_hash: Content hash used to tag the records.
        verbose: Forwarded to the external builder.

    Returns:
        ``True`` when builder output was merged into the corpus (the hash
        is then added to ``self._seen_hashes``), ``False`` otherwise.
    """
    # Computed once so cleanup removes exactly the directory created here.
    tmp_dir = Path.cwd() / ".idea-hole" / "ingest_tmp" / file_hash
    try:
        # Start from an empty scratch directory.
        try:
            if tmp_dir.exists():
                shutil.rmtree(tmp_dir)
        except Exception:
            pass
        tmp_dir.mkdir(parents=True, exist_ok=True)
        # Put a copy or symlink inside tmp_dir
        copy_or_link(src, tmp_dir)

        # Try external builder first
        script = Path(__file__).parent / "corpus_builder.py"
        external = ExternalCorpusBuilder(script) if script.exists() else None
        tmp_out = tmp_dir / "out.jsonl"
        ok = False
        if external is not None:
            self._set_status(f"Indexing {src.name} (external)…")
            ok = external.build(tmp_dir, tmp_out, workers=2, verbose=verbose)
        if not ok:
            self._set_status(f"Indexing {src.name} (simple)…")
            try:
                recs = SimpleCorpusBuilder().build(tmp_dir, tmp_out)
                ok = bool(recs)
            except Exception:
                ok = False
        if not (ok and tmp_out.exists()):
            return False

        # Append to unified corpus with added metadata
        ts = int(time.time())
        with tmp_out.open("r", encoding="utf-8") as fh_in, self._corpus_file.open("a", encoding="utf-8") as fh_out:
            for line in fh_in:
                if not line.strip():
                    continue
                try:
                    obj = json.loads(line)
                except Exception:
                    continue
                # Only JSON objects can carry the metadata; skip anything
                # else instead of aborting the merge half-way through.
                if not isinstance(obj, dict):
                    continue
                obj["file_hash"] = file_hash
                # Ensure source_path is the real source, not the scratch copy
                obj["source_path"] = str(src.resolve())
                obj.setdefault("mime", None)
                obj["added_at"] = ts
                try:
                    fh_out.write(json.dumps(obj, ensure_ascii=False) + "\n")
                except Exception:
                    # As last resort, write ascii-only
                    fh_out.write(json.dumps(obj) + "\n")
        # Update in-memory index
        self._seen_hashes.add(file_hash)
        return True
    finally:
        # Cleanup tmp_dir
        try:
            shutil.rmtree(tmp_dir)
        except Exception:
            pass
def _ensure_corpus_for_files(self, paths: List[Path], *, blocking: bool = True):
    """Hash the given files and ingest those whose content is not in the corpus.

    Every path gets its hash recorded in ``self.file_hashes`` and is
    handed to ``_ensure_file_symlink``. A file is queued for ingestion
    only if its hash is neither indexed, nor currently being ingested,
    nor already queued in this call, so identical files in one batch are
    ingested once.

    Args:
        paths: Files to check.
        blocking: Run the ingestion in the calling thread when true,
            otherwise on a new daemon thread.
    """
    if not paths:
        return
    # Compute hashes and create symlinks; ingest missing
    to_ingest: List[tuple[Path, str]] = []
    queued = set()
    for p in paths:
        try:
            h = self._compute_file_hash(p)
        except Exception:
            continue
        self.file_hashes[str(p)] = h
        self._ensure_file_symlink(p, h)
        if h in self._seen_hashes or h in self._ingesting or h in queued:
            continue
        queued.add(h)
        to_ingest.append((p, h))
    if not to_ingest:
        return

    def _run():
        try:
            for src, h in to_ingest:
                self._ingesting.add(h)
                try:
                    self._ingest_single_file(src, h, verbose=False)
                finally:
                    self._ingesting.discard(h)
            self._set_status("Corpus up to date")
        except Exception:
            self._set_status("Corpus ingest failed (see logs)")

    if blocking:
        _run()
    else:
        threading.Thread(target=_run, daemon=True).start()
def _load_records_for_hashes(self, hashes: Set[str]) -> List[Record]:
    """Load the corpus records whose ``file_hash`` is in ``hashes``.

    Blank lines, invalid JSON and JSON values that are not objects are
    skipped. If the corpus cannot be read, the records collected up to
    that point are returned (best effort, never raises).

    Args:
        hashes: File hashes to select; an empty set returns ``[]``
            without touching the corpus file.

    Returns:
        Matching records in corpus order.
    """
    out: List[Record] = []
    if not hashes:
        return out
    try:
        with self._corpus_file.open("r", encoding="utf-8") as fh:
            for line in fh:
                if not line.strip():
                    continue
                try:
                    obj = json.loads(line)
                except Exception:
                    continue
                # A non-object line must not end the scan early.
                if not isinstance(obj, dict):
                    continue
                if str(obj.get("file_hash") or "") not in hashes:
                    continue
                out.append(Record(
                    id=str(obj.get("id", "")),
                    title=str(obj.get("title", "")),
                    text=str(obj.get("text", "")),
                    source_path=str(obj.get("source_path", "")) if obj.get("source_path") else None,
                    mime=str(obj.get("mime", "")) if obj.get("mime") else None,
                ))
    except Exception:
        pass
    return out
def ensure_and_load_kb_for_current(self) -> List[Record]:
    """Bring the corpus up to date for the session's files and load their records.

    Ingestion runs synchronously in the calling thread. The loaded
    records are stored in ``self.records``, ``self.corpus_path`` is set
    to the unified corpus file, and the records are returned.
    """
    self._set_status("Checking corpus…")
    self._ensure_corpus_for_files(self.files, blocking=True)
    # Collect the known hashes of the current files; files without a
    # recorded hash are left out.
    wanted = set()
    for p in self.files:
        h = self.file_hashes.get(str(p))
        if h:
            wanted.add(h)
    loaded = self._load_records_for_hashes(wanted)
    self.records = loaded
    self.corpus_path = self._corpus_file
    self._set_status(f"KB ready with {len(loaded)} records")
    return loaded
# --- Notes rephrase / extend
def on_rephrase(self):
    """Start rephrasing the notes on a background thread.

    Shows an info dialog and does nothing else when no model is selected
    or the notes box is empty.
    """
    model = (self.ollama_model.get() or "").strip()
    if model in ("", "Select model..."):
        self._ui(lambda: messagebox.showinfo("Select model", "Please select a model first."))
        return
    note = self.notes.get("1.0", tk.END).strip()
    if not note:
        self._ui(lambda: messagebox.showinfo("No notes", "Add some notes to rephrase."))
        return
    self._set_status("Rephrasing…")
    worker = threading.Thread(target=self._rephrase_thread, args=(note, model), daemon=True)
    worker.start()
def _rephrase_thread(self, note: str, model: str):
    """Worker: produce one rephrased variant of ``note`` per configured lens.

    The unmodified note is always the first variant (key ``"original"``).
    When every lens succeeded, the variant list is handed to the UI with
    the original selected. If any lens fails, the whole run is abandoned
    and an error dialog is shown.
    """
    try:
        client = OllamaClient(host=self.ollama_host.get())
        original = {
            "key": "original",
            "label": "Original Note",
            "text": note,
        }
        results: List[Dict[str, str]] = [original]
        lens_count = len(REPHRASE_LENSES)
        for position, lens in enumerate(REPHRASE_LENSES, start=1):
            self._set_status(f"Rephrasing ({position}/{lens_count})…")
            prompt = (lens.get("prompt") or "").replace("{USER_NOTE}", note)
            try:
                raw = client.generate(model=model, prompt=prompt)
            except Exception as e:
                # Name the failing lens in the message shown to the user.
                raise RuntimeError(f"{lens.get('label','Variant')} failed: {e}")
            results.append({
                "key": lens.get("key") or f"lens_{position}",
                "label": lens.get("label") or f"Variant {position}",
                "text": sanitize_llm_text_simple(raw),
            })
        self._set_status("Rephrase complete")
        self._ui(lambda v=results: self._apply_rephrase_results(v, select_key="original"))
    except Exception as e:
        self._set_status("Rephrase failed")
        msg = f"Rephrase failed:\n{e}"
        self._ui(lambda m=msg: messagebox.showerror("Error", m))
def _apply_rephrase_results(self, variants: List[Dict[str, str]], *, select_key: Optional[str] = None, mark_dirty: bool = True):
    """Store a new variant list and refresh the rephrase panel.

    Args:
        variants: Variant dicts to show; an empty list hides the panel.
        select_key: Key of the variant to select; when not given, the
            first variant's key is used (``None`` for an empty list).
        mark_dirty: Whether to flag the session as modified.
    """
    self.rephrase_variants = variants
    if select_key:
        chosen = select_key
    elif variants:
        chosen = variants[0].get("key")
    else:
        chosen = None
    self.rephrase_selected_key = chosen
    self._refresh_rephrase_tree()
    self._update_rephrase_visibility()
    if mark_dirty:
        self._set_dirty(True)
def _refresh_rephrase_tree(self):
    """Rebuild the rephrase list from ``self.rephrase_variants``.

    Safe to call before the widget exists. Each variant becomes a row
    whose item id is the variant key, so a selection can be mapped back
    to its variant. The remembered key (or the first variant) is then
    re-selected with the select handler suppressed.
    """
    if not hasattr(self, "rephrase_tree"):
        return
    tree = self.rephrase_tree
    try:
        tree.delete(*tree.get_children())
    except Exception:
        return
    variants = self.rephrase_variants
    if not variants:
        return
    for variant in variants:
        key = variant.get("key") or ""
        row = (variant.get("label") or key, self._preview_rephrase_text(variant.get("text", "")))
        try:
            tree.insert('', tk.END, iid=key or str(id(variant)), values=row)
        except Exception:
            # if the iid already exists, fall back to an autogenerated one
            try:
                tree.insert('', tk.END, values=row)
            except Exception:
                pass
    target = self.rephrase_selected_key or variants[0].get("key")
    if not target:
        return
    # Keep the programmatic selection from being handled like a user click.
    self._suppress_rephrase_select = True
    try:
        tree.selection_set(target)
        tree.see(target)
    except Exception:
        pass
    finally:
        self._suppress_rephrase_select = False
def _update_rephrase_visibility(self):
    """Show the rephrase panel when variants exist, hide it otherwise.

    The panel is packed with the stored pack options, directly above the
    notes action buttons when those exist. Any error leaves the current
    visibility untouched.
    """
    try:
        should_show = bool(self.rephrase_variants)
        shown = self._rephrase_visible
        if should_show and not shown:
            options = getattr(self, "_rephrase_pack_kwargs", None) or {}
            anchor = getattr(self, "notes_actions", None)
            if anchor is None:
                self.rephrase_frame.pack(**options)
            else:
                self.rephrase_frame.pack(before=anchor, **options)
            self._rephrase_visible = True
        elif shown and not should_show:
            self.rephrase_frame.pack_forget()
            self._rephrase_visible = False
    except Exception:
        # If anything fails, leave visibility unchanged
        pass
@staticmethod
def _preview_rephrase_text(text: str, limit: int = 160) -> str:
    """Return a single-line preview of ``text``.

    Runs of whitespace collapse to one space and the ends are trimmed.
    Text longer than ``limit`` characters is cut there, stripped of
    trailing whitespace and suffixed with ``"..."``.
    """
    flattened = re.sub(r"\s+", " ", text or "").strip()
    if len(flattened) > limit:
        return flattened[:limit].rstrip() + "..."
    return flattened
def _on_rephrase_select(self, _evt=None):
    """Replace the notes with the variant selected in the rephrase list.

    Ignored while a programmatic selection is in progress, when nothing
    is selected, or when the selected row has no matching variant. The
    notes are left alone if they already equal the variant text (ignoring
    surrounding whitespace); otherwise they are overwritten and the
    session is marked dirty.
    """
    if self._suppress_rephrase_select:
        return
    try:
        picked = self.rephrase_tree.selection()
    except Exception:
        picked = None
    if not picked:
        return
    key = picked[0]
    # Row ids are variant keys; find the first variant with this key.
    variant = next((v for v in self.rephrase_variants if (v.get("key") or "") == key), None)
    if not variant:
        return
    text = variant.get("text", "")
    self.rephrase_selected_key = key
    if self.notes.get("1.0", tk.END).strip() == text.strip():
        return
    self.notes.delete("1.0", tk.END)
    self.notes.insert("1.0", text)
    try:
        self.notes.see("1.0")
    except Exception:
        pass
    self._set_dirty(True)
def on_extend(self):
    """Start extending the notes on a background thread.

    Shows an info dialog and does nothing else when no model is selected
    or the notes box is empty.
    """
    model = (self.ollama_model.get() or "").strip()
    if model in ("", "Select model..."):
        self._ui(lambda: messagebox.showinfo("Select model", "Please select a model first."))
        return
    note = self.notes.get("1.0", tk.END).strip()
    if not note:
        self._ui(lambda: messagebox.showinfo("No notes", "Add some notes to extend."))
        return
    self._set_status("Extending…")
    worker = threading.Thread(target=self._extend_thread, args=(note, model), daemon=True)
    worker.start()
def _extend_thread(self, note: str, model: str):
    """Worker: ask the model to extend ``note`` and add the result to the notes.

    The generated text replaces the notes when they are blank and is
    otherwise appended after a newline. The widget update is handed to
    ``self._ui``. An empty reply or any other failure ends in an error
    dialog.
    """
    try:
        client = OllamaClient(host=self.ollama_host.get())
        raw = client.generate(model=model, prompt=EXTEND_PROMPT.replace("{USER_NOTE}", note))
        extension = sanitize_llm_text_simple(raw)
        if not extension.strip():
            raise RuntimeError("Empty response from model")
        self._set_status("Extension ready")

        def _apply():
            current = self.notes.get("1.0", tk.END)
            if current.strip():
                # A single newline separates existing notes from the extension.
                self.notes.insert(tk.END, "\n" + extension)
            else:
                self.notes.delete("1.0", tk.END)
                self.notes.insert("1.0", extension)
            try:
                self.notes.see(tk.END)
            except Exception:
                pass
            self._set_dirty(True)

        self._ui(_apply)
    except Exception as e:
        self._set_status("Extend failed")
        msg = f"Extend failed:\n{e}"
        self._ui(lambda m=msg: messagebox.showerror("Error", m))
# --- Concept generation
def on_generate(self):
    """Start concept generation on a background thread.

    Shows an info dialog and does nothing else when no model is selected,
    or when there are neither files nor notes to work from.
    """
    # Ensure a model is selected
    model = (self.ollama_model.get() or "").strip()
    if model in ("", "Select model..."):
        self._ui(lambda: messagebox.showinfo("Select model", "Please select a model first."))
        return
    has_input = bool(self.files) or bool(self.notes.get("1.0", tk.END).strip())
    if not has_input:
        self._ui(lambda: messagebox.showinfo("Nothing to do", "Add files or write some notes first."))
        return
    worker = threading.Thread(target=self._generate_concept_thread, daemon=True)
    worker.start()
def _generate_concept_thread(self):
    """Worker: build the prompt, query the model and show the generated concept.

    Steps: bring the knowledge base up to date, assemble the prompt from
    the notes, the knowledge base and the names of the files flagged for
    inclusion, query Ollama, extract a title/description in a second
    call, then hand the result to the UI. On success an autosave is
    scheduled; on any failure an error dialog is shown instead.

    NOTE(review): this method reads Tk widgets and variables and calls
    ``self.after`` from the worker thread - confirm that is safe for the
    Tk build in use.
    """
    try:
        self._set_status("Preparing knowledge base…")
        records = self.ensure_and_load_kb_for_current()
        notes = self.notes.get("1.0", tk.END).strip()
        kb = build_kb_string(records)
        assets = [p for p in self.files if self.include_map.get(str(p), True)]
        assets_str = "\n".join(f"- {Path(p).name}" for p in assets) or "(none)"
        prompt = (
            PROMPT_TEMPLATE
            .replace("{NOTES}", notes or "(none)")
            .replace("{KB}", kb or "(empty)")
            .replace("{ASSETS}", assets_str)
        )
        self._set_status("Querying Ollama…")
        client = OllamaClient(host=self.ollama_host.get())
        concept_md = client.generate(model=self.ollama_model.get(), prompt=prompt)
        concept_md = sanitize_llm_text_simple(concept_md)
        # Reject an empty reply before spending a second model call on it
        # and before a heading could be inserted and mask the emptiness.
        if not concept_md.strip():
            raise RuntimeError("Empty response from model")
        # Extract title/description via second structured call
        title, desc = self._extract_title_desc(concept_md, client)
        if not desc:
            desc = ""
        # Ensure top heading matches title
        concept_md = md_heading_replace_or_insert(concept_md, title)
        if not concept_md.strip():
            raise RuntimeError("Empty response from model")
        self._set_status("Concept generated")

        # schedule UI update (concept, title, desc)
        def _apply():
            self.concept.delete("1.0", tk.END)
            self.concept.insert("1.0", concept_md)
            # Extraction may yield no title; show an empty field rather
            # than the text "None".
            self.title_var.set(title or "")
            self.desc_var.set(strip_wrapping_quotes(desc)[:120])
            self.update_push_state()
            self._set_dirty(True)

        self._ui(_apply)
    except Exception as e:
        self._set_status("Generation failed")
        msg = f"Failed to generate concept:\n{e}"
        self._ui(lambda m=msg: messagebox.showerror("Error", m))
        return
    # After a successful apply, attempt autosave shortly after
    try:
        self.after(150, self._autosave_after_generation)
    except Exception:
        pass
# --- Image prompt generation
def _get_active_idea_text(self) -> str:
    """Assemble the text that describes the current idea.

    The result consists of the title and description (when set) followed
    by the concept text, or by the notes when there is no concept text.
    Sections are separated by blank lines. Returns ``""`` when
    everything is empty.
    """
    concept_text = self.concept.get("1.0", tk.END).strip()
    notes_text = self.notes.get("1.0", tk.END).strip()
    title = (self.title_var.get() or "").strip()
    desc = (self.desc_var.get() or "").strip()

    # The concept takes precedence over the raw notes.
    if concept_text:
        body = "Concept:\n" + concept_text
    elif notes_text:
        body = "Notes:\n" + notes_text
    else:
        body = ""
    sections = [
        f"Title: {title}" if title else "",
        f"Description: {desc}" if desc else "",
        body,
    ]
    return "\n\n".join(section for section in sections if section).strip()
def _set_image_prompt_text(self, text: str):
    """Replace the content of the read-only image-prompt box with ``text``.

    Safe to call before the widget exists. The widget is made editable
    only for the duration of the update; errors are ignored.
    """
    if not hasattr(self, "image_prompt_text"):
        return
    try:
        box = self.image_prompt_text
        box.configure(state=tk.NORMAL)
        box.delete("1.0", tk.END)
        if text:
            box.insert("1.0", text.strip())
        box.configure(state=tk.DISABLED)
    except Exception:
        pass
def _set_image_prompt_loading(self, flag: bool, *, interim_text: Optional[str] = None):
    """Switch the image-prompt button between its busy and idle state.

    Args:
        flag: True disables the button and labels it as generating;
            False restores the normal button.
        interim_text: Text to put into the image-prompt box; only used
            when ``flag`` is true.
    """
    try:
        if flag:
            state, label = tk.DISABLED, "Generating…"
        else:
            state, label = tk.NORMAL, "Generate image prompt"
        self.image_prompt_btn.configure(state=state, text=label)
    except Exception:
        pass
    if flag and interim_text:
        self._set_image_prompt_text(interim_text)
def _reset_image_prompt_area(self):
    """Return the image-prompt button and box to their initial state."""
    placeholder = "Generated image prompt will appear here."
    self._set_image_prompt_loading(False)
    self._set_image_prompt_text(placeholder)
def on_generate_image_prompt(self):
    """Start image-prompt generation for the current idea on a background thread.

    Shows an info dialog and does nothing else when no model is selected
    or when there is no idea text to work from. Otherwise the button is
    put into its busy state before the worker starts.
    """
    model = (self.ollama_model.get() or "").strip()
    if model in ("", "Select model..."):
        self._ui(lambda: messagebox.showinfo("Select model", "Please select a model first."))
        return
    idea_text = self._get_active_idea_text()
    if not idea_text:
        self._ui(lambda: messagebox.showinfo("No idea", "Add concept text or notes first."))
        return
    self._set_status("Generating image prompt…")
    self._set_image_prompt_loading(True, interim_text="Generating image prompt…")
    worker = threading.Thread(target=self._generate_image_prompt_thread, args=(idea_text, model), daemon=True)
    worker.start()
def _generate_image_prompt_thread(self, idea_text: str, model: str):
    """Worker: generate an image prompt for ``idea_text`` and display it.

    An empty reply or any other failure ends in an error dialog. The
    button's busy state is cleared in every case.
    """
    try:
        ollama = OllamaClient(host=self.ollama_host.get())
        result = generate_image_prompt_for_idea(idea_text, client=ollama, model=model)
        if not result.strip():
            raise RuntimeError("Empty response from model")
        self._set_status("Image prompt ready")
        self._ui(lambda p=result: self._apply_image_prompt_result(p))
    except Exception as e:
        self._set_status("Image prompt failed")
        msg = f"Failed to generate image prompt:\n{e}"
        self._ui(lambda m=msg: messagebox.showerror("Error", m))
    finally:
        # Re-enable the button whether generation succeeded or not.
        self._ui(lambda: self._set_image_prompt_loading(False))
def _apply_image_prompt_result(self, prompt_text: str):
    """Show the generated prompt in the image-prompt box, scrolled to the top."""
    cleaned = prompt_text.strip()
    self._set_image_prompt_text(cleaned)
    try:
        self.image_prompt_text.see("1.0")
    except Exception:
        pass
# --- Rebuild KB only
def on_rebuild_kb(self):
    """Refresh the knowledge base for the current files on a daemon thread."""
    worker = threading.Thread(target=self._rebuild_kb_thread, daemon=True)
    worker.start()
def _rebuild_kb_thread(self):
    """Worker: refresh the knowledge base; report a failure in a dialog."""
    try:
        self.ensure_and_load_kb_for_current()
    except Exception as e:
        self._set_status("KB refresh failed")
        details = f"KB refresh failed:\n{e}"
        self._ui(lambda m=details: messagebox.showerror("Error", m))
# --- Sessions: New / Open / Save
def on_new_session(self):
    """Reset the window to an empty session.

    The user is first offered to save unsaved changes; if that step
    reports that the action should not proceed, nothing is reset. Errors
    during the reset are ignored.
    """
    # Prompt to save if there are unsaved changes
    proceed = self._maybe_save_if_dirty()
    if not proceed:
        return
    try:
        self.on_clear_all()
        for field in (self.title_var, self.desc_var):
            field.set("")
        self.notes.delete("1.0", tk.END)
        self.concept.delete("1.0", tk.END)
        self._apply_rephrase_results([], mark_dirty=False)
        self._reset_image_prompt_area()
        self._set_status("New session")
        # The resets above mark the session dirty; clear that again.
        self._set_dirty(False)
        # Reset last-saved snapshot to clean baseline
        self._last_saved = self._snapshot_session_state()
        self.update_push_state()
    except Exception:
        pass
def on_save_session(self):
    """Save the session (asking before overwriting) and clear the dirty flag on success."""
    if self._save_session(confirm_overwrite=True, autosave=False):
        self._set_dirty(False)
def on_open_session(self):
    """Let the user pick a stored session in a modal dialog and load it.

    Flow:
    1. Bail out if ``_maybe_save_if_dirty`` returns a falsy value or no
       sessions are stored.
    2. Show a modal chooser (title + saved time) with Open / Cancel / Rename /
       Delete. Rename and Delete rewrite the session store and refresh the list.
    3. After the dialog closes, apply the chosen session: text fields,
       rephrase variants, files and per-file include flags; then mark the
       state clean and record it as the last-saved snapshot.

    Any error while applying the session is shown in an error dialog.
    """
    # Guard unsaved changes
    if not self._maybe_save_if_dirty():
        return
    sessions = self._load_all_sessions()
    if not sessions:
        self._ui(lambda: messagebox.showinfo("No sessions", "No sessions found. Save one first."))
        return
    # Simple chooser dialog
    dlg = tk.Toplevel(self)
    dlg.title("Open Session")
    dlg.geometry("600x360")
    frm = ttk.Frame(dlg)
    frm.pack(fill=tk.BOTH, expand=True, padx=8, pady=8)
    cols = ("title", "saved")
    tv = ttk.Treeview(frm, columns=cols, show="headings")
    tv.heading("title", text="Title")
    tv.heading("saved", text="Saved")
    tv.column("title", width=360)
    tv.column("saved", width=180)
    vs = ttk.Scrollbar(frm, orient=tk.VERTICAL, command=tv.yview)
    tv.configure(yscrollcommand=vs.set)
    tv.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    vs.pack(side=tk.RIGHT, fill=tk.Y)

    def fmt_ts(ts: int) -> str:
        # Format a timestamp as local time; fall back to its plain string form.
        try:
            return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(ts))
        except Exception:
            return str(ts)

    def refresh(selected_saved_at: Optional[int] = None):
        # Reload the sessions from storage and rebuild the rows. Row iids are
        # the indices into the freshly loaded ``sessions`` list, so every other
        # closure can map a selected iid back to its session.
        nonlocal sessions
        tv.delete(*tv.get_children())
        sessions = self._load_all_sessions()
        for idx, s in enumerate(sessions):
            tv.insert('', tk.END, iid=str(idx), values=(s.get("title",""), fmt_ts(int(s.get("saved_at", 0)))))
        # Try to reselect
        if selected_saved_at is not None:
            for iid in tv.get_children(''):
                i = int(iid)
                try:
                    if int(sessions[i].get("saved_at", -1)) == int(selected_saved_at):
                        tv.selection_set(iid)
                        tv.see(iid)
                        break
                except Exception:
                    pass

    refresh()
    # Mutable holder so the nested callback can report the chosen index.
    sel_idx = {"i": None}

    def _choose_and_close():
        # Record the selected row index (if any) and close the chooser.
        sel = tv.selection()
        if not sel:
            return
        try:
            sel_idx["i"] = int(sel[0])
        except Exception:
            sel_idx["i"] = None
        dlg.destroy()

    tv.bind('<Double-1>', lambda _e: _choose_and_close())
    btns = ttk.Frame(dlg)
    btns.pack(fill=tk.X, padx=8, pady=8)

    def _rename():
        # Ask for a new title for the selected session in a nested modal dialog.
        sel = tv.selection()
        if not sel:
            return
        i = int(sel[0]); s = sessions[i]
        # Prompt for new title
        ttl = tk.Toplevel(dlg); ttl.title("Rename Session"); ttl.geometry("420x120")
        f = ttk.Frame(ttl); f.pack(fill=tk.BOTH, expand=True, padx=8, pady=8)
        ttk.Label(f, text="New title:").pack(anchor=tk.W)
        var = tk.StringVar(value=s.get("title",""))
        ent = ttk.Entry(f, textvariable=var)
        ent.pack(fill=tk.X)
        ent.focus_set()
        btnf = ttk.Frame(f); btnf.pack(fill=tk.X, pady=(8,0))

        def _ok():
            # An empty title just closes the rename dialog without changes.
            new_title = (var.get() or "").strip()
            if not new_title:
                ttl.destroy(); return
            # If another session with new_title exists (not this one), confirm overwrite
            exists = any((x.get("title") == new_title and int(x.get("saved_at",-1)) != int(s.get("saved_at",-1))) for x in sessions)
            if exists:
                if not messagebox.askyesno("Overwrite", f"A session titled '{new_title}' exists. Overwrite it?"):
                    return
            # Rewrite sessions: remove all with new_title (except current); then update current title
            all_s = self._load_all_sessions()
            cur_sa = int(s.get("saved_at", -1))
            # Sessions are identified by their saved_at value here; the current
            # one is dropped too and re-appended below under the new title.
            filtered = [x for x in all_s if x.get("title") != new_title and int(x.get("saved_at",-1)) != cur_sa]
            s2 = dict(s); s2["title"] = new_title
            filtered.append(s2)
            self._write_all_sessions(filtered)
            ttl.destroy()
            refresh(selected_saved_at=cur_sa)

        ttk.Button(btnf, text="OK", command=_ok).pack(side=tk.RIGHT)
        ttk.Button(btnf, text="Cancel", command=ttl.destroy).pack(side=tk.RIGHT, padx=(0,8))
        ttl.transient(dlg); ttl.grab_set(); self.wait_window(ttl)

    def _delete():
        # Remove the selected session (matched by saved_at) after confirmation.
        sel = tv.selection()
        if not sel:
            return
        i = int(sel[0]); s = sessions[i]
        if not messagebox.askyesno("Delete Session", f"Delete '{s.get('title','')}'?"):
            return
        all_s = self._load_all_sessions()
        sa = int(s.get("saved_at", -1))
        kept = [x for x in all_s if int(x.get("saved_at", -1)) != sa]
        self._write_all_sessions(kept)
        refresh()

    ttk.Button(btns, text="Delete", command=_delete).pack(side=tk.LEFT)
    ttk.Button(btns, text="Rename", command=_rename).pack(side=tk.LEFT, padx=(6,0))
    ttk.Button(btns, text="Open", command=_choose_and_close).pack(side=tk.RIGHT)
    ttk.Button(btns, text="Cancel", command=dlg.destroy).pack(side=tk.RIGHT, padx=(0,8))
    # Make the chooser modal and block here until it is closed.
    dlg.transient(self)
    dlg.grab_set()
    self.wait_window(dlg)
    i = sel_idx.get("i")
    if i is None:
        return
    # ``sessions`` is the list from the most recent refresh(), which is what
    # the row iids index into.
    s = sessions[i]
    # Apply session
    try:
        self.on_clear_all()
        self.title_var.set(s.get("title", ""))
        self.desc_var.set(s.get("description", ""))
        self.notes.delete("1.0", tk.END)
        self.notes.insert("1.0", s.get("notes", ""))
        self.concept.delete("1.0", tk.END)
        self.concept.insert("1.0", s.get("concept", ""))
        self._reset_image_prompt_area()
        # Normalise stored rephrase variants to plain string fields; entries
        # that are not dicts are dropped.
        rephrases_raw = s.get("rephrase_variants") or []
        cleaned_rephrases: List[Dict[str, str]] = []
        for v in rephrases_raw:
            if isinstance(v, dict):
                cleaned_rephrases.append({
                    "key": str(v.get("key") or ""),
                    "label": str(v.get("label") or ""),
                    "text": str(v.get("text") or ""),
                })
        self._apply_rephrase_results(cleaned_rephrases, select_key=s.get("rephrase_selected_key"), mark_dirty=False)
        files = s.get("files") or []
        # Re-add files; prefer original path, fallback to symlink by hash
        resolved: List[Path] = []
        for f in files:
            # NOTE(review): an empty/missing "path" yields Path(""), i.e. the
            # current directory, which exists — confirm such entries should
            # really be re-added.
            p = Path(f.get("path") or "")
            h = str(f.get("file_hash") or "")
            if not p.exists() and h:
                # try to locate symlink with hash prefix
                try:
                    for q in self._files_dir.glob(f"{h}__*"):
                        p = q
                        break
                except Exception:
                    pass
            if p.exists():
                resolved.append(p)
        self._add_paths(resolved)
        # Restore include flags
        # NOTE(review): flags are keyed by the saved path string; a file that
        # was re-added through the hash fallback has a different path, so its
        # flag presumably is not picked up below — verify against _add_paths.
        for f in files:
            p = str(f.get("path") or "")
            inc = bool(f.get("include", True))
            if p:
                self.include_map[p] = inc
        # Reflect include flags in UI rows
        for item in self.tree.get_children(''):
            vals = list(self.tree.item(item, 'values'))
            if vals and len(vals) >= 5:
                path = str(vals[1])
                # NOTE(review): both branches produce the same empty string as
                # written; the checked/unchecked glyphs look like they were
                # lost — confirm the intended characters.
                vals[4] = '' if self.include_map.get(path, True) else ''
                self.tree.item(item, values=tuple(vals))
        self.update_push_state()
        self._set_status("Session loaded")
        self._set_dirty(False)
        # Save snapshot of the loaded state for accurate dirty detection
        self._last_saved = self._snapshot_session_state()
    except Exception as e:
        self._ui(lambda m=f"Failed to open session:\n{e}": messagebox.showerror("Error", m))
# (Description generation now runs automatically after concept generation.)
# --- Push to Repo
def update_push_state(self):
    """Enable the push button only when title, description and concept are all non-blank."""
    required_texts = (
        self.title_var.get(),
        self.desc_var.get(),
        self.concept.get("1.0", tk.END),
    )
    if all(text.strip() for text in required_texts):
        wanted = tk.NORMAL
    else:
        wanted = tk.DISABLED
    try:
        self.push_btn.configure(state=wanted)
    except Exception:
        # The button may not exist yet (or any more); nothing to update then.
        pass
def _on_concept_modified(self, _evt=None):
try:
if self.concept.edit_modified():
self.update_push_state()
self._set_dirty(True)
self.concept.edit_modified(False)
except Exception:
pass
def _on_notes_modified(self, _evt=None):
try:
if self.notes.edit_modified():
self._set_dirty(True)
self.notes.edit_modified(False)
except Exception:
pass
def _open_path_default(self, path: Path) -> bool:
    """Open ``path`` with the operating system's default application.

    Uses ``open`` on macOS, ``os.startfile`` on Windows and ``xdg-open``
    elsewhere. Returns True when the launcher was started, False when
    starting it raised.
    """
    try:
        if sys.platform.startswith('darwin'):
            subprocess.Popen(['open', str(path)])
        elif os.name == 'nt':
            os.startfile(str(path))  # type: ignore[attr-defined]
        else:
            subprocess.Popen(['xdg-open', str(path)])
    except Exception:
        return False
    return True
def on_preview(self):
    """Start a background PDF preview of the current concept text.

    Shows an info dialog and does nothing else when the concept text is
    blank; otherwise runs ``_preview_thread`` on a daemon thread.
    """
    has_concept = bool(self.concept.get("1.0", tk.END).strip())
    if has_concept:
        worker = threading.Thread(target=self._preview_thread, daemon=True)
        worker.start()
        return
    self._ui(lambda: messagebox.showinfo("No concept", "Please generate or paste the concept text to preview."))
def _preview_thread(self):
    """Worker body: build a throw-away workspace, export the concept to PDF and open it.

    The workspace is ``.idea-hole/preview/<slug>-preview`` under the current
    directory and is recreated on every run. It receives the concept text as
    README.md plus all included files, then the PDF is produced next to them.
    On conversion failure the tail of the export log is shown; other errors
    are reported in an error dialog.
    """
    try:
        raw_title = (self.title_var.get() or "").strip()
        slug = self._slug(raw_title or "preview")
        # Prepare preview workspace
        workspace = Path.cwd() / ".idea-hole" / "preview" / f"{slug}-preview"
        try:
            if workspace.exists():
                shutil.rmtree(workspace)
        except Exception:
            pass
        workspace.mkdir(parents=True, exist_ok=True)
        # Write Markdown
        readme = workspace / "README.md"
        readme.write_text(self.concept.get("1.0", tk.END).strip(), encoding="utf-8")
        # Copy the included assets; names that would collide with generated
        # files get an "asset-" prefix.
        generated_names = {"readme.md", f"{slug}-concept.pdf".lower(), f"{slug}-preview.pdf".lower()}
        included = [p for p in self.files if self.include_map.get(str(p), True)]
        for asset in included:
            try:
                target = workspace / asset.name
                if target.name.lower() in generated_names:
                    target = workspace / f"asset-{asset.name}"
                shutil.copy2(asset, target)
            except Exception:
                pass
        # Export PDF in the same workspace folder
        pdf_path = workspace / f"{slug}-preview.pdf"
        self._set_status("Exporting PDF preview…")
        if not self._convert_markdown_to_pdf(readme, pdf_path):
            log_path = Path.cwd() / ".idea-hole" / "logs" / f"pdf_export_{workspace.name}.log"

            def _show_pdf_error():
                msg = "PDF preview failed."
                if log_path.exists():
                    try:
                        logged = log_path.read_text(encoding='utf-8')
                        # Only the last 2000 characters are shown.
                        snippet = logged[-2000:]
                        rel = Path('.idea-hole') / 'logs' / log_path.name
                        msg = f"PDF preview failed. See {rel} for details.\n\nLast output:\n{snippet}"
                    except Exception:
                        pass
                messagebox.showerror("PDF Preview", msg)

            self._ui(_show_pdf_error)
            self._set_status("Preview failed")
            return
        # Try to open the generated PDF; tell the user where it is otherwise.
        if not self._open_path_default(pdf_path):
            self._ui(lambda: messagebox.showinfo("Preview ready", f"Preview PDF saved to:\n{pdf_path}"))
        self._set_status("Preview ready")
    except Exception as exc:
        self._set_status("Preview failed")
        self._ui(lambda m=f"Failed to preview PDF:\n{exc}": messagebox.showerror("Error", m))
def on_push(self):
    """Validate the form and start the background push.

    Title, description and concept text must all be non-blank; otherwise an
    info dialog explains what is missing. On success ``_push_thread`` runs on
    a daemon thread.
    """
    title_text = self.title_var.get().strip()
    desc_text = self.desc_var.get().strip()
    if not (title_text and desc_text):
        self._ui(lambda: messagebox.showinfo("Missing fields", "Please fill Title and Description."))
        return
    concept_text = self.concept.get("1.0", tk.END).strip()
    if not concept_text:
        self._ui(lambda: messagebox.showinfo("No concept", "Please generate or paste the concept text."))
        return
    worker = threading.Thread(target=self._push_thread, daemon=True)
    worker.start()
# --- Git helpers
def _run_git(self, repo_dir: Path, *args: str) -> subprocess.CompletedProcess:
    """Run ``git <args>`` inside ``repo_dir`` and return the completed process.

    stderr is merged into stdout and the output is decoded as text; the exit
    code is not checked here.
    """
    command = ["git"]
    command.extend(args)
    return subprocess.run(
        command,
        cwd=str(repo_dir),
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )
def _ensure_repo_initialized(self, repo_dir: Path):
repo_dir.mkdir(parents=True, exist_ok=True)
if not (repo_dir / ".git").exists():
_ = subprocess.run(["git", "init"], cwd=str(repo_dir), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
def _ensure_remote_origin(self, repo_dir: Path, remote_url: str):
if not remote_url:
return
res = self._run_git(repo_dir, "remote", "get-url", "origin")
if res.returncode == 0:
current = (res.stdout or "").strip()
if current != remote_url:
_ = self._run_git(repo_dir, "remote", "set-url", "origin", remote_url)
else:
_ = self._run_git(repo_dir, "remote", "add", "origin", remote_url)
def _ensure_branch_master(self, repo_dir: Path):
"""Ensure the default branch is master even on unborn HEAD."""
# Try to read symbolic HEAD (works even if unborn)
res = self._run_git(repo_dir, "symbolic-ref", "-q", "HEAD")
headref = (res.stdout or "").strip() if res.returncode == 0 else ""
if not headref:
# Unborn or detached; set HEAD to refs/heads/master
self._run_git(repo_dir, "symbolic-ref", "HEAD", "refs/heads/master")
return
if headref.endswith("/master"):
return
# Otherwise rename current branch to master
self._run_git(repo_dir, "branch", "-M", "master")
def _slug(self, s: str) -> str:
s = re.sub(r"[\s]+", "-", s.strip())
s = re.sub(r"[^a-zA-Z0-9._-]", "-", s)
return re.sub(r"-+", "-", s).strip("-_")
# --- Concepts index helpers
def _build_slug_map_from_sessions(self) -> Dict[str, Dict[str, str]]:
"""Return mapping: slug -> {title, description}; prefer most recent saved_at per slug."""
entries = self._load_all_sessions()
best: Dict[str, Dict[str, str]] = {}
best_ts: Dict[str, int] = {}
for e in entries:
title = (e.get("title") or "").strip()
if not title:
continue
slug = self._slug(title)
ts = 0
try:
ts = int(e.get("saved_at") or 0)
except Exception:
ts = 0
if slug not in best or ts >= best_ts.get(slug, 0):
best[slug] = {"title": title, "description": (e.get("description") or "").strip()}
best_ts[slug] = ts
return best
def _write_concepts_index(self, repo_dir: Path):
try:
# Map available descriptions from sessions.jsonl by slug
slug_map = self._build_slug_map_from_sessions()
items = []
seen: Set[str] = set()
for child in sorted(repo_dir.iterdir(), key=lambda p: p.name.lower()):
if not child.is_dir():
continue
name = child.name
if name.startswith('.') or name in {".git", "node_modules"}:
continue
slug = name
if slug in seen:
continue
seen.add(slug)
title = slug_map.get(slug, {}).get("title") or re.sub(r"[-_]+", " ", slug).strip().title()
desc = slug_map.get(slug, {}).get("description") or ""
items.append((slug, title, desc))
intro = (
"This folder contains a library of project concepts created with the Idea → Concept tool. "
"Each entry links to its folder with the original concept README and related assets."
)
lines: List[str] = []
lines.append("# Concepts Index")
lines.append("")
lines.append(intro)
lines.append("")
for slug, title, desc in items:
if desc:
lines.append(f"- [{title}](./{slug}/) — {desc}")
else:
lines.append(f"- [{title}](./{slug}/)")
(repo_dir / "README.md").write_text("\n".join(lines).rstrip() + "\n", encoding="utf-8")
except Exception:
# Non-fatal; skip index update on error
pass
def _convert_markdown_to_pdf(self, md_file: Path, out_pdf: Path) -> bool:
"""Single-path conversion: pandoc + tectonic with image compatibility.
- Converts unsupported image formats to PNG in a temp folder
- Uses a temporary Markdown copy for PDF only (original README.md untouched)
- Sans-serif font, 20mm margins
- Logs to .idea-hole/logs (never the concepts repo)
"""
concept_dir = out_pdf.parent
concept_dir.mkdir(parents=True, exist_ok=True)
logs_dir = Path.cwd() / ".idea-hole" / "logs"
logs_dir.mkdir(parents=True, exist_ok=True)
log_path = logs_dir / f"pdf_export_{concept_dir.name}.log"
def _resolve(name: str) -> Optional[str]:
for base in [None, "/opt/homebrew/bin", "/usr/local/bin", "/usr/bin", "/bin"]:
p = shutil.which(name) if base is None else os.path.join(base, name)
if p and os.path.exists(p):
return p
return None
pandoc = _resolve("pandoc")
tectonic = _resolve("tectonic")
lines: List[str] = []
lines.append(f"PATH={os.environ.get('PATH','')}")
lines.append(f"md_file={md_file}")
lines.append(f"resolved pandoc={pandoc}")
lines.append(f"resolved tectonic={tectonic}")
if not pandoc or not tectonic:
lines.append("Missing required tools: pandoc and/or tectonic.")
try:
log_path.write_text("\n".join(lines), encoding="utf-8")
except Exception:
pass
return False
# Build temp workspace for PDF (images + markdown copy)
tmp_base = Path.cwd() / ".idea-hole" / "tmp_pdf" / concept_dir.name
try:
if tmp_base.exists():
shutil.rmtree(tmp_base)
except Exception:
pass
tmp_base.mkdir(parents=True, exist_ok=True)
# Rewrite Markdown image refs to PDF-compatible formats
try:
text = md_file.read_text(encoding="utf-8")
except Exception as e:
lines.append(f"read error: {e}")
try: log_path.write_text("\n".join(lines), encoding="utf-8")
except Exception: pass
return False
# Regex for Markdown images ![alt](path "title")
img_rgx = re.compile(r"!\[[^\]]*\]\(([^\s)]+)(?:\s+\"[^\"]*\")?\)")
allowed_ext = {".png", ".jpg", ".jpeg", ".pdf", ".eps"}
def ensure_image_available(src: str) -> str:
"""Copy or convert image into tmp_base and return the tmp filename to use in Markdown."""
p = Path(src)
if not p.is_absolute():
p = (concept_dir / p).resolve()
if not p.exists():
# Fallback: if src had subdirs, try by basename in concept_dir (assets were copied flat)
alt = (concept_dir / Path(src).name).resolve()
if alt.exists():
p = alt
else:
lines.append(f"missing image: {src}")
return src # leave as-is; LaTeX will likely show caption only
ext = p.suffix.lower()
# Destination filename in tmp
if ext in allowed_ext:
out_name = p.name
out_path = tmp_base / out_name
try:
if not out_path.exists():
shutil.copy2(str(p), str(out_path))
return out_name
except Exception as e:
lines.append(f"copy fail: {src} -> {out_name} ({e})")
return src
# Special-case SVG: try CairoSVG or rsvg-convert/ImageMagick
if ext == ".svg":
out_name = p.stem + ".png"
out_path = tmp_base / out_name
# Try CairoSVG (python lib)
try:
from cairosvg import svg2png # type: ignore
svg2png(url=str(p), write_to=str(out_path))
return out_name
except Exception as e_svg_py:
lines.append(f"cairosvg unavailable or failed: {e_svg_py}")
# Try rsvg-convert CLI
try:
tool = shutil.which("rsvg-convert")
if tool:
res = subprocess.run([tool, "-f", "png", "-o", str(out_path), str(p)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
if res.returncode == 0 and out_path.exists():
return out_name
lines.append(f"rsvg-convert failed: exit {res.returncode}, {res.stdout}")
except Exception as e_svg_cli:
lines.append(f"rsvg-convert error: {e_svg_cli}")
# Try ImageMagick convert
try:
tool = shutil.which("magick") or shutil.which("convert")
if tool:
res = subprocess.run([tool, str(p), str(out_path)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
if res.returncode == 0 and out_path.exists():
return out_name
lines.append(f"imagemagick failed: exit {res.returncode}, {res.stdout}")
except Exception as e_im:
lines.append(f"imagemagick error: {e_im}")
# fallthrough to Pillow/general path
# Try to convert unsupported formats to PNG via Pillow
try:
from PIL import Image
img = Image.open(str(p))
try:
img.seek(0)
except Exception:
pass
out_name = p.stem + ".png"
out_path = tmp_base / out_name
img.convert("RGBA" if img.mode in ("P", "LA") else "RGB").save(str(out_path), format="PNG")
return out_name
except Exception as e:
lines.append(f"convert fail: {src} -> png ({e})")
# Last resort: copy original and hope engine supports it
out_name = p.name
out_path = tmp_base / out_name
try:
shutil.copy2(str(p), str(out_path))
return out_name
except Exception as e2:
lines.append(f"final copy fail: {src} ({e2})")
return src
# Build a modified markdown string with tmp-local image paths
def _repl(m: re.Match) -> str:
orig = m.group(0)
path = m.group(1)
rep = ensure_image_available(path)
# Replace the path with the tmp filename (no directories)
return orig.replace(path, rep)
mod_text = img_rgx.sub(_repl, text)
# Preserve multiple consecutive blank lines by inserting raw TeX vspace.
# Pandoc collapses multiple blank lines into a single paragraph break in LaTeX.
# We translate extra blank lines into additional vertical space so visual spacing matches editing.
def _preserve_extra_blank_lines(s: str) -> str:
s = s.replace("\r\n", "\n").replace("\r", "\n")
lines_in = s.split("\n")
out_lines: List[str] = []
in_fence = False
blank_run = 0
for ln in lines_in:
stripped = ln.lstrip()
# Toggle code fence state on ``` or ~~~ lines
if stripped.startswith("```") or stripped.startswith("~~~"):
# flush any pending blanks before fence delimiter
if blank_run > 0:
out_lines.append("")
for _ in range(blank_run - 1):
out_lines.append("\\vspace{1em}")
blank_run = 0
out_lines.append(ln)
in_fence = not in_fence
continue
if in_fence:
# pass through exactly
if blank_run > 0:
out_lines.append("")
for _ in range(blank_run - 1):
out_lines.append("\\vspace{1em}")
blank_run = 0
out_lines.append(ln)
continue
# Outside fences: track blank runs
if stripped == "":
blank_run += 1
continue
# Non-blank line: flush blanks first
if blank_run > 0:
out_lines.append("")
for _ in range(blank_run - 1):
out_lines.append("\\vspace{1em}")
blank_run = 0
out_lines.append(ln)
# flush at EOF
if blank_run > 0:
out_lines.append("")
for _ in range(blank_run - 1):
out_lines.append("\\vspace{1em}")
return "\n".join(out_lines)
mod_text = _preserve_extra_blank_lines(mod_text)
tmp_md = tmp_base / "README_pdf.md"
tmp_md.write_text(mod_text, encoding="utf-8")
# Run pandoc (tectonic engine) in tmp_base with resource-path pointing to concept_dir and tmp_base
cmd = [
pandoc,
str(tmp_md),
"-f", "markdown+hard_line_breaks+raw_tex",
"-s",
"--pdf-engine=tectonic",
"-V", "mainfont=Helvetica",
"-V", "monofont=Menlo",
"-V", "geometry:margin=20mm",
"-V", "fontsize=11pt",
"--resource-path", f"{str(tmp_base)}:{str(concept_dir)}",
"-o", str(out_pdf),
]
res = subprocess.run(cmd, cwd=str(tmp_base), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
lines.append("$ " + " ".join(cmd))
lines.append(f"(exit {res.returncode})")
lines.append(res.stdout or "")
ok = (res.returncode == 0 and out_pdf.exists())
# Fallback: retry without hard_line_breaks if first attempt failed (older pandoc versions)
if not ok:
try:
cmd_fallback = [
pandoc,
str(tmp_md),
"-f", "markdown+hard_line_breaks",
"-s",
"--pdf-engine=tectonic",
"-V", "mainfont=Helvetica",
"-V", "monofont=Menlo",
"-V", "geometry:margin=20mm",
"-V", "fontsize=11pt",
"--resource-path", f"{str(tmp_base)}:{str(concept_dir)}",
"-o", str(out_pdf),
]
res2 = subprocess.run(cmd_fallback, cwd=str(tmp_base), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
lines.append("$ " + " ".join(cmd_fallback))
lines.append(f"(exit {res2.returncode})")
lines.append(res2.stdout or "")
ok = (res2.returncode == 0 and out_pdf.exists())
except Exception as _e_fallback:
lines.append(f"fallback error: {_e_fallback}")
if not ok:
try:
log_path.write_text("\n".join(lines), encoding="utf-8")
except Exception:
pass
# Cleanup temp workspace
try:
shutil.rmtree(tmp_base)
except Exception:
pass
return ok
def _push_thread(self):
    """Worker body: write the concept into the local ``concepts`` repo, commit and push.

    Steps: ensure the repository exists, write ``<slug>/README.md`` with the
    concept text, copy the included files next to it, export a PDF, refresh
    the root index README, commit everything, force the branch name to
    ``master`` and, when a remote URL is configured, push to ``origin``.
    A failed PDF export is reported but does not stop the commit. Any other
    failure ends in an error dialog.
    """
    try:
        self._set_status("Preparing repo…")
        repo_dir = Path.cwd() / "concepts"
        self._ensure_repo_initialized(repo_dir)
        # Create concept folder and files
        title = self.title_var.get().strip()
        slug = self._slug(title)
        # An empty slug, "." or ".." would make concept_dir resolve to the
        # repository root or its parent, so files would be written outside a
        # concept folder. Refuse such titles.
        if not slug.strip("."):
            raise RuntimeError(f"Title '{title}' does not yield a usable folder name.")
        concept_dir = repo_dir / slug
        concept_dir.mkdir(parents=True, exist_ok=True)
        md_path = concept_dir / "README.md"
        md_text = self.concept.get("1.0", tk.END).strip()
        md_path.write_text(md_text, encoding="utf-8")
        # (No meta.json written; index derives data from sessions.jsonl)
        # Copy selected assets into concept folder BEFORE PDF generation
        generated_names = {"readme.md", f"{slug}-concept.pdf".lower()}
        assets = [p for p in self.files if self.include_map.get(str(p), True)]
        for src in assets:
            try:
                dst = concept_dir / src.name
                if dst.name.lower() in generated_names:
                    # avoid clobbering the generated files
                    dst = concept_dir / f"asset-{src.name}"
                shutil.copy2(src, dst)
            except Exception:
                # ignore individual copy failures but continue
                pass
        # Now export PDF (images are present in concept_dir)
        pdf_path = concept_dir / f"{slug}-concept.pdf"
        self._set_status("Exporting PDF…")
        ok_pdf = self._convert_markdown_to_pdf(md_path, pdf_path)
        if not ok_pdf:
            logs_dir = Path.cwd() / ".idea-hole" / "logs"
            log_path = logs_dir / f"pdf_export_{concept_dir.name}.log"

            def _show_pdf_error():
                msg = "PDF export failed."
                if log_path.exists():
                    try:
                        data = log_path.read_text(encoding='utf-8')
                        # Only the last 2000 characters are shown.
                        snippet = data[-2000:]
                        rel = Path('.idea-hole') / 'logs' / log_path.name
                        msg = f"PDF export failed. See {rel} for details.\n\nLast output:\n{snippet}"
                    except Exception:
                        pass
                messagebox.showerror("PDF Export", msg)

            self._ui(_show_pdf_error)
        # Update concepts root README index before committing
        try:
            self._write_concepts_index(repo_dir)
        except Exception:
            pass
        # Git add/commit
        self._set_status("Committing…")
        add_res = self._run_git(repo_dir, "add", ".")
        if add_res.returncode != 0:
            raise RuntimeError(add_res.stdout)
        commit_msg = f"{title} - {self.desc_var.get().strip()}"
        commit_res = self._run_git(repo_dir, "commit", "-m", commit_msg)
        if commit_res.returncode != 0:
            # A commit with nothing staged is not an error here.
            if "nothing to commit" not in (commit_res.stdout or "").lower():
                raise RuntimeError(commit_res.stdout)
        # Ensure master branch and remote
        remote = (self.git_remote_url.get() or "").strip()
        self._ensure_branch_master(repo_dir)
        if remote:
            self._set_status("Pushing…")
            self._ensure_remote_origin(repo_dir, remote)
            push_res = self._run_git(repo_dir, "push", "-u", "origin", "master")
            if push_res.returncode != 0:
                raise RuntimeError(push_res.stdout)
            self._set_status("Pushed to remote")
        else:
            self._set_status("Committed locally (no remote set)")
        self._ui(lambda: messagebox.showinfo("Success", "Concept saved and repository updated."))
    except Exception as e:
        self._set_status("Push failed")
        msg = f"Failed to push to repo:\n{e}"
        self._ui(lambda m=msg: messagebox.showerror("Error", m))
# --- Status helper
def _set_status(self, s: str):
self._ui(lambda: (self.status.configure(text=s), self.status.update_idletasks()))
def _ui(self, fn):
try:
self.after(0, fn)
except Exception:
# best-effort fallback
try:
fn()
except Exception:
pass
# --- Tk helpers
def _auto_hide_scrollbar(self, sb: ttk.Scrollbar, lo, hi):
"""Auto-hide a ttk.Scrollbar when content fits; show when it overflows.
Works with both pack and grid-managed scrollbars.
"""
try:
flo, fhi = float(lo), float(hi)
except Exception:
try:
flo, fhi = float(str(lo)), float(str(hi))
except Exception:
flo, fhi = 0.0, 1.0
# Update the scrollbar range
try:
sb.set(flo, fhi)
except Exception:
pass
needs = not (flo <= 0.0 and fhi >= 1.0)
try:
mgr = sb.winfo_manager()
except Exception:
mgr = ''
try:
if needs:
# show
if mgr == 'grid':
if not sb.winfo_ismapped():
sb.grid()
elif mgr == 'pack':
try:
sb.pack_info()
except Exception:
sb.pack(side=tk.RIGHT, fill=tk.Y)
else:
# default to pack
sb.pack(side=tk.RIGHT, fill=tk.Y)
else:
# hide
if mgr == 'grid':
sb.grid_remove()
elif mgr == 'pack':
sb.pack_forget()
except Exception:
pass
# --- Ollama model listing
def _list_models(self) -> List[str]:
try:
res = subprocess.run(["ollama", "list"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, timeout=8)
if res.returncode != 0:
return []
lines = [ln.strip() for ln in (res.stdout or "").splitlines()]
out: List[str] = []
for ln in lines:
if not ln or ln.lower().startswith("name"):
continue
# first column is model name
name = ln.split()[0]
if name and name not in out:
out.append(name)
return out
except Exception:
return []
# --- Dirty snapshot helpers
def _snapshot_session_state(self) -> Dict:
try:
files_list = [{
"path": str(p),
"include": bool(self.include_map.get(str(p), True))
} for p in self.files]
# stable order
files_list.sort(key=lambda x: (x["path"], not x["include"]))
variants: List[Dict[str, str]] = []
for v in (self.rephrase_variants or []):
if not isinstance(v, dict):
continue
variants.append({
"key": str(v.get("key") or ""),
"label": str(v.get("label") or ""),
"text": str(v.get("text") or ""),
})
return {
"title": (self.title_var.get() or "").strip(),
"description": (self.desc_var.get() or "").strip(),
"notes": self.notes.get("1.0", tk.END).strip(),
"concept": self.concept.get("1.0", tk.END).strip(),
"files": files_list,
"rephrase_variants": variants,
"rephrase_selected_key": self.rephrase_selected_key,
}
except Exception:
return {"title":"","description":"","notes":"","concept":"","files":[],"rephrase_variants":[],"rephrase_selected_key":None}
def _is_effectively_dirty(self) -> bool:
try:
if not getattr(self, "_dirty", False):
return False
now = self._snapshot_session_state()
last = getattr(self, "_last_saved", None)
if last is None:
return any([now.get("title"), now.get("description"), now.get("notes"), now.get("concept"), now.get("files")])
return now != last
except Exception:
# On any error, be conservative and assume not dirty to avoid noisy prompts
return False
def _get_model_values(self) -> List[str]:
models = self._list_models()
return ["Select model..."] + models
# --- Settings persistence
def _config_path(self) -> Path:
base = Path.cwd() / ".idea-hole"
base.mkdir(parents=True, exist_ok=True)
return base / "settings.json"
def _load_settings(self):
try:
p = self._config_path()
if p.exists():
obj = json.loads(p.read_text(encoding='utf-8'))
if isinstance(obj, dict):
if obj.get('ollama_host'):
self.ollama_host.set(obj['ollama_host'])
if obj.get('ollama_model'):
self.ollama_model.set(obj['ollama_model'])
if obj.get('git_remote_url'):
self.git_remote_url.set(obj['git_remote_url'])
if obj.get('searx_url'):
self.searx_url.set(obj['searx_url'])
if obj.get('pandoc_path'):
# Only set if attribute exists
try: self.pandoc_path.set(obj['pandoc_path'])
except Exception: pass
if obj.get('wkhtmltopdf_path'):
try: self.wkhtmltopdf_path.set(obj['wkhtmltopdf_path'])
except Exception: pass
except Exception:
pass
# --- Dirty/session helpers
def _set_dirty(self, flag: bool = True):
try:
self._dirty = bool(flag)
except Exception:
self._dirty = True
def _load_all_sessions(self) -> List[Dict]:
entries: List[Dict] = []
try:
with self._sessions_file.open("r", encoding="utf-8") as fh:
for line in fh:
if not line.strip():
continue
try:
obj = json.loads(line)
if isinstance(obj, dict) and obj.get("title"):
entries.append(obj)
except Exception:
continue
except Exception:
pass
return entries
def _write_all_sessions(self, entries: List[Dict]):
tmp = self._sessions_file.with_suffix(".tmp")
try:
with tmp.open("w", encoding="utf-8") as fh:
for obj in entries:
fh.write(json.dumps(obj, ensure_ascii=False) + "\n")
tmp.replace(self._sessions_file)
except Exception:
# fallback non-atomic
with self._sessions_file.open("w", encoding="utf-8") as fh:
for obj in entries:
fh.write(json.dumps(obj, ensure_ascii=False) + "\n")
def _session_title_exists(self, title: str) -> bool:
t = (title or "").strip()
if not t:
return False
try:
with self._sessions_file.open("r", encoding="utf-8") as fh:
for line in fh:
if not line.strip():
continue
try:
obj = json.loads(line)
if isinstance(obj, dict) and (obj.get("title") or "").strip() == t:
return True
except Exception:
continue
except Exception:
pass
return False
def _build_session_payload(self) -> Dict:
# Ensure corpus coverage before saving
self._ensure_corpus_for_files(self.files, blocking=True)
files_meta = []
for p in self.files:
h = self.file_hashes.get(str(p)) or self._compute_file_hash(p)
self.file_hashes[str(p)] = h
files_meta.append({
"path": str(p),
"file_hash": h,
"include": bool(self.include_map.get(str(p), True)),
})
return {
"title": (self.title_var.get() or "").strip(),
"description": (self.desc_var.get() or "").strip(),
"notes": self.notes.get("1.0", tk.END).strip(),
"concept": self.concept.get("1.0", tk.END).strip(),
"files": files_meta,
"saved_at": int(time.time()),
"rephrase_variants": [{
"key": str(v.get("key") or ""),
"label": str(v.get("label") or ""),
"text": str(v.get("text") or ""),
} for v in (self.rephrase_variants or []) if isinstance(v, dict)],
"rephrase_selected_key": self.rephrase_selected_key,
}
def _save_session(self, *, confirm_overwrite: bool, autosave: bool) -> bool:
title = (self.title_var.get() or "").strip()
if not title:
self._ui(lambda: messagebox.showinfo("Title required", "Please enter a Title before saving the session."))
return False
exists = self._session_title_exists(title)
if exists and not confirm_overwrite and autosave:
# On autosave, do not overwrite; inform user once
self._ui(lambda: messagebox.showinfo("Autosave skipped", f"Autosave not performed: a session titled '{title}' already exists."))
return False
if exists and confirm_overwrite:
ok = messagebox.askyesno("Overwrite", f"A session titled '{title}' exists. Overwrite it?")
if not ok:
return False
# Prepare payload and rewrite file
payload = self._build_session_payload()
try:
all_entries = self._load_all_sessions()
if exists:
all_entries = [e for e in all_entries if (e.get("title") or "") != title]
all_entries.append(payload)
self._write_all_sessions(all_entries)
self._set_status("Session saved")
# Update last-saved snapshot for clean state detection
try:
self._last_saved = self._snapshot_session_state()
except Exception:
self._last_saved = {
"title": (self.title_var.get() or "").strip(),
"description": (self.desc_var.get() or "").strip(),
"notes": self.notes.get("1.0", tk.END).strip(),
"concept": self.concept.get("1.0", tk.END).strip(),
"files": [{"path": str(p), "include": bool(self.include_map.get(str(p), True))} for p in self.files],
"rephrase_variants": [{
"key": str(v.get("key") or ""),
"label": str(v.get("label") or ""),
"text": str(v.get("text") or ""),
} for v in (self.rephrase_variants or []) if isinstance(v, dict)],
"rephrase_selected_key": self.rephrase_selected_key,
}
return True
except Exception as e:
self._ui(lambda m=f"Failed to save session:\n{e}": messagebox.showerror("Error", m))
return False
# --- Prior-art search (SearXNG + embeddings)
def on_prior_art(self):
    """Validate the inputs and start the prior-art search on a daemon thread.

    Shows an info dialog and does nothing else when no model is selected
    or when there are neither files nor notes to search with.
    """
    chosen_model = (self.ollama_model.get() or "").strip()
    if chosen_model in ("", "Select model..."):
        self._ui(lambda: messagebox.showinfo("Select model", "Please select a model first."))
        return

    # The notes widget is only consulted when the file list is empty.
    has_input = bool(self.files) or bool(self.notes.get("1.0", tk.END).strip())
    if not has_input:
        self._ui(lambda: messagebox.showinfo("Nothing to search", "Add files or write some notes first."))
        return

    worker = threading.Thread(target=self._prior_art_thread, daemon=True)
    worker.start()
def _prior_art_thread(self):
    """Run the prior-art search and hand the outcome to the UI.

    Used as the thread target by ``on_prior_art``. Progress is reported via
    the status line; the result window or an error dialog is scheduled
    through ``self._ui``.
    """
    def _status_cb(s: str):
        # Progress callback given to the search; forwards to the status line.
        self._set_status(s)

    try:
        self._set_status("Preparing knowledge base…")
        kb_records = self.ensure_and_load_kb_for_current()
        note_text = self.notes.get("1.0", tk.END).strip()
        kb_text = build_kb_string(kb_records)
        # Only files whose include flag is set (default: included) are passed on.
        included_paths = [
            str(entry) for entry in self.files
            if self.include_map.get(str(entry), True)
        ]
        # An empty SearXNG URL is passed as None.
        searx_endpoint = (self.searx_url.get() or "").strip() or None
        outcome = websearch.prior_art_search(
            ollama_host=self.ollama_host.get(),
            model=self.ollama_model.get(),
            notes=note_text,
            kb=kb_text,
            assets=included_paths,
            searx_url=searx_endpoint,
            status_cb=_status_cb,
        )
        self._set_status("Prior-art search complete")
        self._ui(lambda r=outcome: self._show_prior_art_results(r))
    except Exception as exc:
        self._set_status("Prior-art search failed")
        failure_text = f"Prior-art search failed:\n{exc}"
        self._ui(lambda m=failure_text: messagebox.showerror("Error", m))
def _show_prior_art_results(self, result: Dict[str, Any]):
    """Display prior-art search results in a separate window.

    The window shows up to three of the queries, a table of hits (score and
    URL) and a read-only pane with the snippet of the selected hit.
    Double-clicking a row opens its URL in the web browser, but only for
    http/https links. If the window cannot be built, it is torn down again
    and the first ten URLs are shown in a message box instead.

    Args:
        result: Mapping read via the keys "queries" (joined with "; ") and
            "results" (a list of dicts providing "score", "url", "snippet").
    """
    def _fmt_score(value: Any) -> str:
        # Scores come from an external search step; a missing or
        # non-numeric score must not abort building the whole window.
        try:
            return f"{float(value):.1f}"
        except (TypeError, ValueError, OverflowError):
            return ""

    def _is_web_url(candidate: str) -> bool:
        # Result URLs are untrusted. webbrowser.open() may start the OS
        # handler for local files or other schemes, so allow http(s) only.
        try:
            return urlparse(candidate).scheme.lower() in ("http", "https")
        except ValueError:
            return False

    win = None
    try:
        # Ignore malformed (non-dict) rows so row indices stay consistent
        # between the table and the selection handlers.
        rows = [r for r in (result.get("results") or []) if isinstance(r, dict)]
        queries = [str(q) for q in (result.get("queries") or [])[:3]]

        win = tk.Toplevel(self)
        win.title("Prior Art Results")
        try:
            win.geometry("980x640")
        except Exception:
            pass

        top = ttk.Frame(win)
        top.pack(side=tk.TOP, fill=tk.X, padx=8, pady=8)
        ttk.Label(top, text="Queries:").pack(side=tk.LEFT)
        q_entry = ttk.Entry(top)
        q_entry.insert(0, "; ".join(queries))
        q_entry.configure(state="readonly")
        q_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(6, 0))

        body = ttk.Panedwindow(win, orient=tk.VERTICAL)
        body.pack(fill=tk.BOTH, expand=True, padx=8, pady=(0, 8))
        upper = ttk.Frame(body)
        lower = ttk.Frame(body)
        body.add(upper, weight=3)
        body.add(lower, weight=2)

        cols = ("score", "url")
        tv = ttk.Treeview(upper, columns=cols, show="headings")
        tv.heading("score", text="Score")
        tv.heading("url", text="URL")
        tv.column("score", width=60, anchor=tk.W)
        tv.column("url", width=860, anchor=tk.W)
        vs = ttk.Scrollbar(upper, orient=tk.VERTICAL, command=tv.yview)
        tv.configure(yscrollcommand=vs.set)
        tv.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        vs.pack(side=tk.RIGHT, fill=tk.Y)

        # Snippet area
        snip = tk.Text(lower, wrap=tk.WORD)
        snip.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        snip_sb = ttk.Scrollbar(lower, orient=tk.VERTICAL, command=snip.yview)
        snip.configure(yscrollcommand=snip_sb.set)
        snip_sb.pack(side=tk.RIGHT, fill=tk.Y)
        # Read-only from the start, also when there is no row to select.
        snip.configure(state="disabled")

        # The row index doubles as the tree item id so handlers can map back to rows.
        for i, item in enumerate(rows):
            tv.insert(
                '',
                tk.END,
                iid=str(i),
                values=(_fmt_score(item.get("score", 0)), item.get("url") or ""),
            )

        def _on_sel(_e=None):
            sel = tv.selection()
            if not sel:
                return
            try:
                i = int(sel[0])
            except Exception:
                return
            try:
                data = rows[i]
            except Exception:
                data = {}
            snippet = (data.get("snippet", "") or "").strip()
            # The widget must be editable while its content is replaced.
            snip.configure(state="normal")
            snip.delete("1.0", tk.END)
            if snippet:
                snip.insert("1.0", snippet)
            snip.configure(state="disabled")

        tv.bind('<<TreeviewSelect>>', _on_sel)

        def _open_selected(_e=None):
            sel = tv.selection()
            if not sel:
                return
            try:
                url = str(rows[int(sel[0])].get("url") or "").strip()
                if _is_web_url(url):
                    webbrowser.open(url)
            except Exception:
                pass

        tv.bind('<Double-1>', _open_selected)

        # Initial selection
        try:
            first = tv.get_children('')[:1]
            if first:
                tv.selection_set(first)
                _on_sel()
        except Exception:
            pass
    except Exception:
        # Do not leave a half-built window behind.
        if win is not None:
            try:
                win.destroy()
            except Exception:
                pass
        # fallback: messagebox
        try:
            urls = [
                str(x.get("url") or "")
                for x in (result.get("results") or [])
                if isinstance(x, dict)
            ][:10]
            messagebox.showinfo("Results", "\n".join(urls) or "No results")
        except Exception:
            pass
def _maybe_save_if_dirty(self) -> bool:
    """Offer to save unsaved changes; return whether the caller may proceed.

    Returns:
        True when nothing differs from the last saved snapshot, when the
        user chose not to save, when the save succeeded, or when any step
        raised. False when the user cancelled or the save was not performed.
    """
    try:
        # Only prompt if content truly differs from last saved snapshot
        if self._is_effectively_dirty():
            choice = messagebox.askyesnocancel("Unsaved changes", "Save changes to the current session?")
            if choice is None:
                # Cancel -> block the pending action
                return False
            if choice is False:
                # "No" -> proceed without saving
                return True
            # "Yes" -> attempt to save; confirm overwrite if needed
            if not self._save_session(confirm_overwrite=True, autosave=False):
                # Save not performed (likely declined overwrite) -> block
                return False
            self._set_dirty(False)
        return True
    except Exception:
        return True
def _autosave_after_generation(self):
    """Autosave the session if it has a title; never raises.

    The save is requested with ``confirm_overwrite=False, autosave=True``.
    The dirty flag is cleared only when the save reports success.
    """
    try:
        has_title = bool((self.title_var.get() or "").strip())
        if has_title and self._save_session(confirm_overwrite=False, autosave=True):
            self._set_dirty(False)
    except Exception:
        pass
def on_close(self):
    """Destroy the window unless the unsaved-changes check blocks it."""
    may_close = self._maybe_save_if_dirty()
    if not may_close:
        return
    try:
        self.destroy()
    except Exception:
        # Destroying the window failed: end the process immediately instead.
        os._exit(0)
def _save_settings(self):
    """Write the current settings to the config file as pretty-printed JSON.

    Saved keys: ollama_host, ollama_model, git_remote_url, searx_url,
    pandoc_path and wkhtmltopdf_path. The last two are optional attributes
    and are stored as '' when the instance does not have them. Saving is
    best-effort: any failure is swallowed and nothing is raised.
    """
    def _optional(attr: str) -> str:
        # Read an optional variable without building a throwaway
        # tk.StringVar as a getattr default (that default was evaluated on
        # every call and could itself fail, silently aborting the save).
        var = getattr(self, attr, None)
        return var.get() if var is not None else ''

    try:
        obj = {
            'ollama_host': self.ollama_host.get(),
            'ollama_model': self.ollama_model.get(),
            'git_remote_url': self.git_remote_url.get(),
            'searx_url': self.searx_url.get(),
            'pandoc_path': _optional('pandoc_path'),
            'wkhtmltopdf_path': _optional('wkhtmltopdf_path'),
        }
        self._config_path().write_text(json.dumps(obj, ensure_ascii=False, indent=2), encoding='utf-8')
    except Exception:
        # Deliberately silent: failing to persist settings must not disturb the UI.
        pass
def main():
    """Create the application and run its main loop.

    Any exception raised while constructing or running the app is printed
    as a traceback instead of propagating.
    """
    try:
        App().mainloop()
    except Exception:
        traceback.print_exc()


# Start the GUI only when this file is executed as a script.
if __name__ == "__main__":
    main()