2025-09-12 21:45:11 +02:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Idea → Concept GUI
What this provides
- Drag / drop or add files and folders into a table
- Freeform notes box for thoughts and fragments
- Builds a JSONL knowledge base using corpus_builder . py if present ( fallback included )
- Generates a polished " Concept " using a local Ollama model
- Lets you edit , then save into a local Git repo and push to a remote
Notes
- Requires a local Ollama daemon ( default http : / / localhost : 11434 )
- Default model is the one you requested : " mistral3.2-small:24b " ( editable in UI )
Dependencies
- Standard library only for HTTP ( urllib ) ; requests is NOT required .
- Optional : tkinterdnd2 for native drag + drop ( falls back to buttons if missing )
Run
python3 concept_gui . py
"""
from __future__ import annotations
import os
import sys
import json
import time
import math
import shutil
import threading
import traceback
import hashlib
from dataclasses import dataclass
from pathlib import Path
2025-11-30 07:30:09 +01:00
from typing import List , Dict , Optional , Set , Any
2025-09-12 21:45:11 +02:00
# --- GUI imports
import tkinter as tk
from tkinter import ttk , filedialog , messagebox
from tkinter . scrolledtext import ScrolledText
# Optional native drag/drop support
try :
from tkinterdnd2 import DND_FILES , TkinterDnD # type: ignore
_TKDND_AVAILABLE = True
except Exception :
TkinterDnD = tk # type: ignore
DND_FILES = None # type: ignore
_TKDND_AVAILABLE = False
import subprocess
import tempfile
import re
import html
# --- HTTP (stdlib)
import urllib . request
import urllib . error
2025-09-13 03:50:53 +02:00
import webbrowser
import websearch
2025-09-12 21:45:11 +02:00
# -----------------------------
# Simple utilities
# -----------------------------
def human_size ( n : int ) - > str :
if n < = 0 :
return " 0 B "
units = [ " B " , " KB " , " MB " , " GB " , " TB " ]
k = 1024.0
i = int ( math . floor ( math . log ( n , k ) ) )
i = max ( 0 , min ( i , len ( units ) - 1 ) )
return f " { n / ( k * * i ) : .1f } { units [ i ] } "
def safe_symlink ( src : Path , dst : Path ) - > bool :
try :
if dst . exists ( ) or dst . is_symlink ( ) :
dst . unlink ( )
os . symlink ( src , dst )
return True
except Exception :
return False
def copy_or_link ( src : Path , dst_dir : Path ) - > Path :
dst_dir . mkdir ( parents = True , exist_ok = True )
dst = dst_dir / src . name
if safe_symlink ( src , dst ) :
return dst
# fallback to copy
shutil . copy2 ( src , dst )
return dst
def read_text_guess ( path : Path ) - > str :
try :
b = path . read_bytes ( )
for enc in ( " utf-8 " , " utf-16 " , " latin-1 " ) :
try :
return b . decode ( enc )
except Exception :
pass
return b . decode ( " utf-8 " , errors = " ignore " )
except Exception :
return " "
# -----------------------------
# Corpus building
# -----------------------------
@dataclass
class Record :
id : str
title : str
text : str
source_path : Optional [ str ] = None
mime : Optional [ str ] = None
class SimpleCorpusBuilder :
""" Very lightweight fallback if corpus_builder.py or deps are unavailable.
Supports : txt , md , rst , html ( strip tags naively ) , pdf ( if PyMuPDF installed ) .
"""
def __init__ ( self ) - > None :
self . _fitz = None
try :
import fitz # PyMuPDF
self . _fitz = fitz
except Exception :
self . _fitz = None
def build ( self , root : Path , out_jsonl : Path ) - > List [ Record ] :
out_jsonl . parent . mkdir ( parents = True , exist_ok = True )
records : List [ Record ] = [ ]
for p in root . rglob ( " * " ) :
if not p . is_file ( ) :
continue
suf = p . suffix . lower ( )
try :
if suf in { " .txt " , " .md " , " .rst " } :
text = read_text_guess ( p )
if text . strip ( ) :
records . append ( Record ( id = str ( p ) , title = p . stem , text = text , source_path = str ( p ) ) )
elif suf in { " .html " , " .htm " } :
raw = read_text_guess ( p )
text = self . _strip_html ( raw )
if text . strip ( ) :
records . append ( Record ( id = str ( p ) , title = p . stem , text = text , source_path = str ( p ) ) )
elif suf == " .pdf " and self . _fitz is not None :
text = self . _pdf_text ( p )
if text . strip ( ) :
records . append ( Record ( id = str ( p ) , title = p . stem , text = text , source_path = str ( p ) ) )
else :
# just record stub entry for unsupported types
records . append ( Record ( id = str ( p ) , title = p . stem , text = f " [Unsupported file type: { suf } ] " , source_path = str ( p ) ) )
except Exception :
records . append ( Record ( id = str ( p ) , title = p . stem , text = f " [Error reading file: { p . name } ] " , source_path = str ( p ) ) )
# write JSONL
with out_jsonl . open ( " w " , encoding = " utf-8 " ) as fh :
for r in records :
fh . write ( json . dumps ( r . __dict__ , ensure_ascii = False ) + " \n " )
return records
def _strip_html ( self , html : str ) - > str :
# very naive fallback without bs4
try :
from bs4 import BeautifulSoup # type: ignore
soup = BeautifulSoup ( html , " html.parser " )
for tag in soup ( [ " script " , " style " ] ) :
tag . decompose ( )
text = soup . get_text ( " \n " , strip = True )
return text
except Exception :
import re
txt = re . sub ( r " < \ s*(script|style)[^>]*>.*?< \ s*/ \ s* \ 1 \ s*> " , " " , html , flags = re . S | re . I )
txt = re . sub ( r " <[^>]+> " , " " , txt )
txt = re . sub ( r " \ s+ " , " " , txt )
return txt . strip ( )
def _pdf_text ( self , path : Path ) - > str :
try :
doc = self . _fitz . open ( str ( path ) )
out = [ ]
for i in range ( len ( doc ) ) :
page = doc . load_page ( i )
out . append ( page . get_text ( " text " ) )
return " \n \n " . join ( out )
except Exception :
return " "
class ExternalCorpusBuilder :
""" Invokes corpus_builder.py as a subprocess to build a JSONL corpus. """
def __init__ ( self , script_path : Path ) - > None :
self . script = script_path
def build ( self , root : Path , out_jsonl : Path , * , workers : int = 4 , verbose : bool = False ) - > bool :
cmd = [
sys . executable ,
str ( self . script ) ,
" --root " , str ( root ) ,
" --out " , str ( out_jsonl ) ,
" --emit " , " auto " ,
" --workers " , str ( max ( 1 , workers ) ) ,
" --llm-parallel " , " 1 " ,
]
if verbose :
cmd . append ( " --verbose " )
try :
import subprocess
proc = subprocess . run ( cmd , stdout = subprocess . PIPE , stderr = subprocess . STDOUT , text = True )
ok = proc . returncode == 0 and out_jsonl . exists ( ) and out_jsonl . stat ( ) . st_size > 0
return ok
except Exception :
return False
# -----------------------------
# Ollama client (stdlib HTTP)
# -----------------------------
class OllamaClient :
def __init__ ( self , host : str = " http://localhost:11434 " , timeout : int = 600 ) :
self . host = host . rstrip ( " / " )
self . timeout = timeout
def generate ( self , model : str , prompt : str ) - > str :
url = f " { self . host } /api/generate "
payload = {
" model " : model ,
" prompt " : prompt ,
" stream " : False ,
}
data = json . dumps ( payload ) . encode ( " utf-8 " )
req = urllib . request . Request ( url , data = data , headers = { " Content-Type " : " application/json " } , method = " POST " )
try :
with urllib . request . urlopen ( req , timeout = self . timeout ) as resp :
body = resp . read ( )
except urllib . error . HTTPError as e :
raise RuntimeError ( f " Ollama HTTP error { e . code } : { e . read ( ) . decode ( ' utf-8 ' , ' ignore ' ) } " )
except Exception as e :
raise RuntimeError ( f " Ollama request failed: { e } " )
try :
obj = json . loads ( body . decode ( " utf-8 " , " ignore " ) )
except Exception :
raise RuntimeError ( " Invalid JSON from Ollama " )
return ( obj . get ( " response " ) or " " ) . strip ( )
def _parse_json_strict ( s : str ) - > Optional [ Dict [ str , str ] ] :
try :
s = sanitize_llm_text_simple ( s )
return json . loads ( s )
except Exception :
m = re . search ( r " \ { [ \ s \ S]* \ } " , s )
if m :
try :
return json . loads ( m . group ( 0 ) )
except Exception :
return None
return None
# (Discord webhook removed; now using Git push flow)
# -----------------------------
# Prompting
# -----------------------------
2025-09-13 02:54:57 +02:00
'''
2025-09-12 21:45:11 +02:00
PROMPT_TEMPLATE = """
You are an experienced product strategist and technical writer .
Write a clear , compelling PROJECT CONCEPT based only on the information in Notes and Knowledge Base below .
Output in Markdown with these sections :
- Overview & Problem
- Vision & Goals
- Target Users & Use Cases
- Key Features
- Non - Goals / Out of Scope
- Constraints & Risks
- Rough Architecture / Approach
- Milestones & Next Steps
- Success Criteria & Metrics
Guidelines :
- Be pragmatic , specific , and actionable . Do not fabricate details not present in the sources .
- Pull concrete facts , quotes , and constraints from the knowledge base .
- If something is unknown or ambiguous , add a TODO with a short question .
- Use short paragraphs and bullets ; avoid marketing fluff .
Assets to include :
- The following files will be committed alongside README . md in the same folder .
- When relevant , embed images using Markdown ( e . g . , ` ! [ Label ] ( . / FILENAME ) ` ) , and link documents / other files using ` [ Label ] ( . / FILENAME ) ` .
- Place links or images in the appropriate sections where they add clarity .
Assets Provided :
{ ASSETS }
Notes ( from user ) :
{ NOTES }
2025-09-13 02:54:57 +02:00
Knowledge Base ( source excerpts ) :
{ KB }
""" .strip()
'''
PROMPT_TEMPLATE = """
You are a cross - domain concept developer ( product strategist , creative producer , research lead , grant writer ) .
Turn the sources into a concise , presentable CONCEPT document . Adapt to the domain .
INSTRUCTIONS
1 ) Detect IDEA TYPE ( pick one primary ; if unclear , choose closest and add a TODO ) :
{ Product / Software , Service , Research / Study , Policy / Proposal , Art / Exhibition / Performance , Event / Program ,
Education / Curriculum , Media / Film / Publication , Campaign / Nonprofit , Data / ML / Infrastructure , Game / Interactive ,
Writing / Book / Article , Other }
2 ) Tone & register :
- Product / Software → pragmatic PM / tech brief
- Research → neutral academic project brief
- Policy → policy memo
- Art / Exhibition / Performance → curator / producer note ( clear , not flowery )
- Event → producer ’ s run - of - show style
- Education → syllabus brief
- Media / Publication → one - sheet
- Campaign / Nonprofit → strategy brief
- Data / ML / Infrastructure → engineering design note
- Game / Interactive → design doc overview
- Writing / Book / Article → proposal overview
3 ) Output Markdown using these core sections ( use these exact headings ; include only relevant ones ) :
- Overview & Intent
- Context / Problem ( or Opportunity )
- Audience / Stakeholders
- Deliverables / Outputs & Scope
- Approach / Method ( rename to “ Methodology ” , “ Implementation Plan ” , “ Format & Installation Plan ” , etc . , to fit the idea type )
- Resources / Budget / Tools ( only if present ; else add a short TODO )
- Timeline & Milestones
- Risks , Ethics & Constraints
- Success Criteria / Evaluation
- Open Questions ( TODOs )
Add one domain - specific block ( only if relevant and supported by sources ) :
- Product / Software : Key Features ; Non - Goals ; Rough Architecture ; Dependencies & Integration ; License .
- Research / Study : Research Questions ; Methodology & Data ; Expected Contributions ; References / Citations .
- Policy / Proposal : Policy Mechanism ; Legal / Standards ; Impact Assessment ; Implementation Steps .
- Art / Exhibition / Performance : Conceptual Frame & References ; Medium / Materials ; Venue / Spatial Requirements ; Tech / AV ; Rights / Permissions .
- Event / Program : Programme Outline / Run - of - Show ; Roles & Staffing ; Logistics & Venue .
- Education / Curriculum : Learning Objectives ; Syllabus Outline ; Assessment & Materials .
- Media / Film / Publication : Logline & Synopsis ; Format ; Production Plan ; Distribution .
- Campaign / Nonprofit : Theory of Change ; Channels & Tactics ; KPIs ; Partnerships .
- Data / ML / Infrastructure : Data Sources ; Models ; Architecture Diagram ( describe ) ; Privacy & Compliance ; Ops / Monitoring .
- Game / Interactive : Core Loop ; Mechanics ; Narrative ; Tech ; Monetization ( if relevant ) .
- Writing / Book / Article : Thesis ; Outline / Chapters ; Sources ; Target Readers .
4 ) Evidence use :
- Use only facts in Notes / KB . If missing , add short TODOs instead of inventing .
- Where a claim relies on a specific source , include a short inline blockquote with “ Source : < Path or Title > ” .
5 ) Assets :
- These files are committed alongside README . md . Embed images with Markdown and link documents where they help clarity .
STYLE
- Short paragraphs and bullets ; concrete , specific , and actionable . Avoid marketing fluff .
- If dates / budget / ownership are uncertain , show ranges or TODOs .
- Keep a neutral , professional tone adapted to the idea type .
TITLE
2025-09-13 03:12:40 +02:00
- Generate a neutral 2 - 4 words working title .
2025-09-13 02:54:57 +02:00
- Begin the document with “ # {Title}”.
Assets Provided :
{ ASSETS }
Notes ( from user ) :
{ NOTES }
2025-09-12 21:45:11 +02:00
Knowledge Base ( source excerpts ) :
{ KB }
""" .strip()
2025-11-30 07:30:09 +01:00
REPHRASE_LENSES = [
{
" key " : " neutral " ,
" label " : " Neutral Clarification / Expansion " ,
" prompt " : """ Take the following rough note and turn it into a single clear, concise paragraph that captures the main idea.
- Keep a neutral , explanatory tone .
- Don ' t add new features or speculation, only clarify and connect what is already there.
- Output exactly one paragraph .
Note :
{ USER_NOTE }
""" ,
} ,
{
" key " : " problem_solution " ,
" label " : " Problem-Solution Framing " ,
" prompt " : """ Rewrite the following note as a single paragraph that clearly describes:
1. What problem or frustration exists ,
2. For whom ,
3. How the idea could solve it in principle .
Keep it concrete but high - level , no implementation details .
Output exactly one paragraph .
Note :
{ USER_NOTE }
""" ,
} ,
{
" key " : " user_story " ,
" label " : " User Story / Scenario " ,
" prompt " : """ Rewrite the following note as a single paragraph that describes a short scenario from a user ' s point of view.
Show how a specific person encounters the situation and how this idea helps them .
Keep it realistic and simple , not hype - y .
Output exactly one paragraph .
Note :
{ USER_NOTE }
""" ,
} ,
{
" key " : " value_prop " ,
" label " : " Value Proposition / Pitch " ,
" prompt " : """ Rewrite the following note as a single paragraph that sounds like a clear, simple pitch of the idea.
Explain what it is , who it ' s for, and why it ' s valuable or interesting .
Avoid buzzwords ; keep it grounded and concrete .
Output exactly one paragraph .
Note :
{ USER_NOTE }
""" ,
} ,
{
" key " : " implementation " ,
" label " : " Implementation / Next Steps " ,
" prompt " : """ Rewrite the following note as a single paragraph that keeps the original idea but focuses on how one might start implementing or exploring it.
Mention 2 - 3 plausible first steps or components without going into deep technical detail .
Output exactly one paragraph .
Note :
{ USER_NOTE }
""" ,
} ,
]
EXTEND_PROMPT = """
You are continuing the user ' s own note. Keep writing in the same language, tone, and formatting style they used.
Instructions :
- Extend the idea with additional possibilities , use cases , angles , or problems to consider .
- Preserve the author ' s voice: match their formality, punctuation habits, and quirks (e.g., all lowercase, terse bullets, or formal sentences).
- Do not summarize or rewrite the original ; add new material that flows naturally after it .
- Keep it concise ( 2 - 5 sentences or a few short bullet points ) .
- If the input is in bullet form , continue the bullets ; otherwise , continue the paragraph .
Original note :
{ USER_NOTE }
""" .strip()
2025-09-12 21:45:11 +02:00
def build_kb_string ( records : List [ Record ] , * , max_chars : int = 80000 , per_record_cap : int = 4000 ) - > str :
""" Format records into a compact KB string with caps to avoid overlong prompts. """
parts : List [ str ] = [ ]
budget = max_chars
for r in records :
if budget < = 0 :
break
text = ( r . text or " " ) . strip ( )
if not text :
continue
if len ( text ) > per_record_cap :
text = text [ : per_record_cap ] + " \n ...[truncated] "
title = r . title or ( Path ( r . source_path ) . name if r . source_path else r . id )
header = f " \n --- \n Source: { title } \n Path: { r . source_path or ' ' } \n \n "
chunk = header + text . strip ( ) + " \n "
if len ( chunk ) > budget :
chunk = chunk [ : budget ]
parts . append ( chunk )
budget - = len ( chunk )
return ( " \n " . join ( parts ) ) . strip ( )
def sanitize_llm_text_simple ( s : str ) - > str :
""" Remove <think> blocks and surrounding code fences from LLM responses. """
try :
s = re . sub ( r " <think>.*?</think> " , " " , s , flags = re . S | re . I )
s = re . sub ( r " ^ \ s*```(?: \ w+)? \ s* " , " " , s )
s = re . sub ( r " \ s*``` \ s*$ " , " " , s )
return s . strip ( )
except Exception :
return ( s or " " ) . strip ( )
def md_heading_replace_or_insert ( md : str , title : str ) - > str :
""" Ensure the first-level heading is the provided title, replacing ' # PROJECT CONCEPT ' if present. """
if not md :
return f " # { title } \n \n "
lines = md . splitlines ( )
# Replace '# PROJECT CONCEPT' (case-insensitive)
if lines and re . match ( r " ^ \ s*# \ s+project \ s+concept \ s*$ " , lines [ 0 ] , flags = re . I ) :
lines [ 0 ] = f " # { title } "
return " \n " . join ( lines )
# If first line is already a H1, leave it as-is; else insert
if lines and re . match ( r " ^ \ s*# \ s+ " , lines [ 0 ] ) :
return md
return f " # { title } \n \n " + md
def strip_wrapping_quotes ( s : str ) - > str :
s = s . strip ( )
s = re . sub ( r " ^[ \" ' “”‘’]+ " , " " , s )
s = re . sub ( r " [ \" ' “”‘’]+$ " , " " , s )
return s
# -----------------------------
# GUI Application
# -----------------------------
class App ( TkinterDnD . Tk ) : # type: ignore
def __init__ ( self ) :
super ( ) . __init__ ( )
self . title ( " Idea → Concept " )
self . geometry ( " 1080x720 " )
# Enforce a sensible minimum window size (as before)
try :
self . minsize ( 900 , 600 )
except Exception :
pass
# Try to set application/window icon from icon.png
try :
self . _set_app_icon ( )
except Exception :
# Non-fatal: keep running with default icon
pass
# State
self . files : List [ Path ] = [ ]
self . records : List [ Record ] = [ ]
# Legacy fields kept but no longer used for per-session staging
self . staging_dir : Optional [ Path ] = None
self . corpus_path : Optional [ Path ] = None
self . include_map : Dict [ str , bool ] = { }
self . file_hashes : Dict [ str , str ] = { } # path -> sha256
self . _seen_hashes : Set [ str ] = set ( ) # hashes present in corpus.jsonl
self . _ingesting : Set [ str ] = set ( ) # hashes currently being ingested
self . _base_dir : Path = Path . cwd ( ) / " .idea-hole "
self . _files_dir : Path = self . _base_dir / " files "
self . _corpus_file : Path = self . _base_dir / " corpus.jsonl "
self . _sessions_file : Path = self . _base_dir / " sessions.jsonl "
2025-11-30 07:30:09 +01:00
self . rephrase_variants : List [ Dict [ str , str ] ] = [ ]
self . rephrase_selected_key : Optional [ str ] = None
self . _suppress_rephrase_select : bool = False
2025-09-12 21:45:11 +02:00
# Defaults
self . ollama_host = tk . StringVar ( value = os . environ . get ( " OLLAMA_HOST " , " http://localhost:11434 " ) )
# Default model prompt suggests explicit selection
self . ollama_model = tk . StringVar ( value = os . environ . get ( " IDEA_HOLE_MODEL " , " Select model... " ) )
self . git_remote_url = tk . StringVar ( value = os . environ . get ( " IDEA_HOLE_REMOTE " , " " ) )
2025-09-13 03:50:53 +02:00
# SearXNG URL (for prior-art search)
try :
_searx_default = websearch . SEARX_DEFAULT_URL
except Exception :
_searx_default = " http://localhost:8888 "
self . searx_url = tk . StringVar ( value = os . environ . get ( " SEARX_URL " , _searx_default ) )
2025-09-12 21:45:11 +02:00
# Concept metadata
self . title_var = tk . StringVar ( value = " " )
self . desc_var = tk . StringVar ( value = " " )
# Load persisted settings (if any)
self . _load_settings ( )
# Prepare unified storage and indexes
self . _init_storage ( )
self . _build_ui ( )
2025-11-30 07:30:09 +01:00
self . _refresh_rephrase_tree ( )
2025-09-12 21:45:11 +02:00
self . _maybe_enable_dnd ( )
# Dirty tracking and close handler
self . _dirty = False
try :
self . protocol ( ' WM_DELETE_WINDOW ' , self . on_close )
except Exception :
pass
# Initialize last-saved snapshot to current clean state
try :
self . _last_saved = self . _snapshot_session_state ( )
except Exception :
self . _last_saved = {
" title " : " " ,
" description " : " " ,
" notes " : " " ,
" concept " : " " ,
" files " : [ ] ,
2025-11-30 07:30:09 +01:00
" rephrase_variants " : [ ] ,
" rephrase_selected_key " : None ,
2025-09-12 21:45:11 +02:00
}
def _set_app_icon ( self ) - > None :
""" Set the window/taskbar/dock icon from local icon.png when possible.
- Uses Tk ' s iconphoto on all platforms (PNG supported on Tk 8.6+).
- On macOS , also tries to set the dock icon via AppKit if available .
"""
icon_path = Path ( __file__ ) . parent / " icon.png "
if not icon_path . exists ( ) :
return
# Keep a reference to avoid GC
try :
self . _icon_photoimage = tk . PhotoImage ( file = str ( icon_path ) )
# Set for this window and as the default for future toplevels
try :
self . iconphoto ( True , self . _icon_photoimage )
except Exception :
# Older Tk variants
self . tk . call ( ' wm ' , ' iconphoto ' , self . _w , self . _icon_photoimage )
except Exception :
# PhotoImage may fail if Tk lacks PNG support
self . _icon_photoimage = None
# On macOS, set the dock icon using AppKit if available (optional)
if sys . platform == ' darwin ' :
try :
from AppKit import NSImage , NSApplication
app = NSApplication . sharedApplication ( )
nsimg = NSImage . alloc ( ) . initWithContentsOfFile_ ( str ( icon_path ) )
if nsimg is not None :
app . setApplicationIconImage_ ( nsimg )
except Exception :
# AppKit (pyobjc) not available; ignore
pass
# --- UI construction
def _build_ui ( self ) :
root = self
# Top controls
top = ttk . Frame ( root )
top . pack ( side = tk . TOP , fill = tk . X , padx = 8 , pady = 6 )
# Session file actions at top-left
ttk . Button ( top , text = " New " , command = self . on_new_session ) . pack ( side = tk . LEFT )
ttk . Button ( top , text = " Open " , command = self . on_open_session ) . pack ( side = tk . LEFT , padx = ( 6 , 0 ) )
ttk . Button ( top , text = " Save " , command = self . on_save_session ) . pack ( side = tk . LEFT , padx = ( 6 , 0 ) )
# Status moved to top-right
self . status = ttk . Label ( top , text = " Ready " , anchor = tk . E )
self . status . pack ( side = tk . RIGHT )
# Paned layout: left (files + notes), right (concept)
paned = ttk . Panedwindow ( root , orient = tk . HORIZONTAL )
paned . pack ( fill = tk . BOTH , expand = True , padx = 8 , pady = ( 0 , 8 ) )
left = ttk . Frame ( paned )
right = ttk . Frame ( paned )
paned . add ( left , weight = 1 )
paned . add ( right , weight = 1 )
# Files table (with controls inside the same frame)
files_frame = ttk . LabelFrame ( left , text = " Files (drag & drop or use buttons) " )
files_frame . pack ( side = tk . TOP , fill = tk . BOTH , expand = False , pady = ( 0 , 8 ) )
# Inner container for tree + scrollbar to allow controls below
files_inner = ttk . Frame ( files_frame )
files_inner . pack ( side = tk . TOP , fill = tk . BOTH , expand = True )
# Prevent scrollbar squashing by using grid with a fixed minsize column
try :
files_inner . rowconfigure ( 0 , weight = 1 )
files_inner . columnconfigure ( 0 , weight = 1 )
files_inner . columnconfigure ( 1 , minsize = 14 )
except Exception :
pass
cols = ( " name " , " path " , " type " , " size " , " include " )
self . tree = ttk . Treeview ( files_inner , columns = cols , show = " headings " , height = 8 )
for c , w in ( " name " , 80 ) , ( " path " , 100 ) , ( " type " , 20 ) , ( " size " , 30 ) , ( " include " , 30 ) :
heading = " Add to Repo " if c == " include " else c . capitalize ( )
self . tree . heading ( c , text = heading )
self . tree . column ( c , width = w , anchor = tk . W )
try :
self . tree . grid ( row = 0 , column = 0 , sticky = ' nsew ' )
except Exception :
self . tree . pack ( side = tk . LEFT , fill = tk . BOTH , expand = True )
self . tree_vsb = ttk . Scrollbar ( files_inner , orient = tk . VERTICAL , command = self . tree . yview )
try :
self . tree_vsb . grid ( row = 0 , column = 1 , sticky = ' ns ' )
except Exception :
# Fallback if grid not available
self . tree_vsb . pack ( side = tk . RIGHT , fill = tk . Y )
# Connect scrollbar normally (always visible; resilient to squashing)
self . tree . configure ( yscrollcommand = self . tree_vsb . set )
# Click to toggle include checkbox-like column
self . tree . bind ( ' <Button-1> ' , self . on_tree_click )
# No auto-hide for reliability
# File table controls inside the files frame border
file_controls = ttk . Frame ( files_frame )
# Slightly more padding at bottom than top
file_controls . pack ( side = tk . TOP , fill = tk . X , padx = ( 8 , 0 ) , pady = ( 4 , 8 ) )
ttk . Button ( file_controls , text = " Add Files " , command = self . on_add_files ) . pack ( side = tk . LEFT )
ttk . Button ( file_controls , text = " Add Folder " , command = self . on_add_folder ) . pack ( side = tk . LEFT , padx = ( 6 , 0 ) )
ttk . Button ( file_controls , text = " Remove Selected " , command = self . on_remove_selected ) . pack ( side = tk . LEFT , padx = ( 6 , 0 ) )
ttk . Button ( file_controls , text = " Clear All " , command = self . on_clear_all ) . pack ( side = tk . LEFT , padx = ( 6 , 0 ) )
# Notes (with Generate Concept button inside the same frame)
notes_frame = ttk . LabelFrame ( left , text = " Notes / Thoughts " )
notes_frame . pack ( side = tk . TOP , fill = tk . BOTH , expand = True , pady = ( 8 , 0 ) )
# Notes text area with ttk scrollbar that auto-hides
notes_container = ttk . Frame ( notes_frame )
notes_container . pack ( fill = tk . BOTH , expand = True )
try :
notes_container . rowconfigure ( 0 , weight = 1 )
notes_container . columnconfigure ( 0 , weight = 1 )
notes_container . columnconfigure ( 1 , minsize = 14 )
except Exception :
pass
self . notes = tk . Text ( notes_container , height = 12 , wrap = tk . WORD , undo = True )
try :
self . notes . grid ( row = 0 , column = 0 , sticky = ' nsew ' )
except Exception :
self . notes . pack ( side = tk . LEFT , fill = tk . BOTH , expand = True )
self . notes_vsb = ttk . Scrollbar ( notes_container , orient = tk . VERTICAL , command = self . notes . yview )
try :
self . notes_vsb . grid ( row = 0 , column = 1 , sticky = ' ns ' )
except Exception :
# Fallback
self . notes_vsb . pack ( side = tk . RIGHT , fill = tk . Y )
# Connect scrollbar normally (always visible; resilient to squashing)
self . notes . configure ( yscrollcommand = self . notes_vsb . set )
# No auto-hide for reliability
2025-11-30 07:30:09 +01:00
# Rephrase/variant list sits between the notes box and action buttons
self . rephrase_frame = ttk . Frame ( notes_frame )
2025-11-30 07:40:31 +01:00
self . _rephrase_pack_kwargs = { " side " : tk . TOP , " fill " : tk . BOTH , " expand " : False , " padx " : ( 8 , 0 ) , " pady " : ( 6 , 0 ) }
2025-11-30 07:30:09 +01:00
ttk . Label ( self . rephrase_frame , text = " Rephrase variants: " ) . pack ( anchor = tk . W )
rephrase_list_container = ttk . Frame ( self . rephrase_frame )
rephrase_list_container . pack ( fill = tk . BOTH , expand = True )
try :
rephrase_list_container . rowconfigure ( 0 , weight = 1 )
rephrase_list_container . columnconfigure ( 0 , weight = 1 )
rephrase_list_container . columnconfigure ( 1 , minsize = 14 )
except Exception :
pass
self . rephrase_tree = ttk . Treeview ( rephrase_list_container , columns = ( " variant " , " preview " ) , show = " headings " , height = 6 , selectmode = " browse " )
self . rephrase_tree . heading ( " variant " , text = " Variant " )
self . rephrase_tree . heading ( " preview " , text = " Preview " )
self . rephrase_tree . column ( " variant " , width = 140 , anchor = tk . W )
self . rephrase_tree . column ( " preview " , width = 360 , anchor = tk . W )
try :
self . rephrase_tree . grid ( row = 0 , column = 0 , sticky = " nsew " )
except Exception :
self . rephrase_tree . pack ( side = tk . LEFT , fill = tk . BOTH , expand = True )
self . rephrase_vsb = ttk . Scrollbar ( rephrase_list_container , orient = tk . VERTICAL , command = self . rephrase_tree . yview )
try :
self . rephrase_vsb . grid ( row = 0 , column = 1 , sticky = " ns " )
except Exception :
self . rephrase_vsb . pack ( side = tk . RIGHT , fill = tk . Y )
self . rephrase_tree . configure ( yscrollcommand = self . rephrase_vsb . set )
self . rephrase_tree . bind ( " <<TreeviewSelect>> " , self . _on_rephrase_select )
2025-09-12 21:45:11 +02:00
notes_actions = ttk . Frame ( notes_frame )
notes_actions . pack ( side = tk . TOP , fill = tk . X , padx = ( 8 , 0 ) , pady = ( 6 , 6 ) )
2025-11-30 07:30:09 +01:00
ttk . Button ( notes_actions , text = " Rephrase " , command = self . on_rephrase ) . pack ( side = tk . LEFT )
ttk . Button ( notes_actions , text = " Extend " , command = self . on_extend ) . pack ( side = tk . LEFT , padx = ( 6 , 0 ) )
ttk . Button ( notes_actions , text = " Generate Concept " , command = self . on_generate ) . pack ( side = tk . LEFT , padx = ( 6 , 0 ) )
2025-09-13 03:50:53 +02:00
ttk . Button ( notes_actions , text = " Find Prior Art " , command = self . on_prior_art ) . pack ( side = tk . LEFT , padx = ( 6 , 0 ) )
2025-09-12 21:45:11 +02:00
# Concept editor + metadata
concept_frame = ttk . LabelFrame ( right , text = " Concept (editable) " )
concept_frame . pack ( side = tk . TOP , fill = tk . BOTH , expand = True )
meta = ttk . Frame ( concept_frame )
# Stack Title and Description vertically; align with section inset
meta . pack ( side = tk . TOP , fill = tk . X , padx = ( 8 , 8 ) , pady = ( 6 , 6 ) )
ttk . Label ( meta , text = " Title: " ) . pack ( anchor = tk . W )
self . title_entry = tk . Entry ( meta , textvariable = self . title_var )
self . title_entry . pack ( fill = tk . X , expand = True )
ttk . Label ( meta , text = " Description: " ) . pack ( anchor = tk . W , pady = ( 6 , 0 ) )
self . desc_entry = tk . Entry ( meta , textvariable = self . desc_var )
self . desc_entry . pack ( fill = tk . X , expand = True )
# Concept text area with ttk scrollbar that auto-hides
concept_container = ttk . Frame ( concept_frame )
concept_container . pack ( fill = tk . BOTH , expand = True )
try :
concept_container . rowconfigure ( 0 , weight = 1 )
concept_container . columnconfigure ( 0 , weight = 1 )
concept_container . columnconfigure ( 1 , minsize = 14 )
except Exception :
pass
self . concept = tk . Text ( concept_container , wrap = tk . WORD , undo = True )
try :
self . concept . grid ( row = 0 , column = 0 , sticky = ' nsew ' )
except Exception :
self . concept . pack ( side = tk . LEFT , fill = tk . BOTH , expand = True )
self . concept_vsb = ttk . Scrollbar ( concept_container , orient = tk . VERTICAL , command = self . concept . yview )
try :
self . concept_vsb . grid ( row = 0 , column = 1 , sticky = ' ns ' )
except Exception :
# Fallback
self . concept_vsb . pack ( side = tk . RIGHT , fill = tk . Y )
# Connect scrollbar normally (always visible; resilient to squashing)
self . concept . configure ( yscrollcommand = self . concept_vsb . set )
# No auto-hide for reliability
#
# Match Title/Description entry backgrounds to textarea background
try :
_bg = self . concept . cget ( ' background ' )
self . title_entry . configure ( bg = _bg )
self . desc_entry . configure ( bg = _bg )
except Exception :
pass
# Push to Repo under the concept editor (aligned horizontally with Generate Concept)
concept_actions = ttk . Frame ( concept_frame )
concept_actions . pack ( side = tk . TOP , fill = tk . X , padx = ( 8 , 0 ) , pady = ( 6 , 6 ) )
# PDF preview button
self . preview_btn = ttk . Button ( concept_actions , text = " Preview PDF " , command = self . on_preview )
self . preview_btn . pack ( side = tk . LEFT )
self . push_btn = ttk . Button ( concept_actions , text = " Push to Repo " , command = self . on_push )
self . push_btn . pack ( side = tk . LEFT , padx = ( 6 , 0 ) )
# Bottom bar
bottom = ttk . Frame ( root )
bottom . pack ( side = tk . BOTTOM , fill = tk . X , padx = 8 , pady = ( 0 , 8 ) )
# Moved Ollama/Model/Remote inputs to bottom bar
ttk . Label ( bottom , text = " Ollama host: " ) . pack ( side = tk . LEFT )
ttk . Entry ( bottom , textvariable = self . ollama_host , width = 22 ) . pack ( side = tk . LEFT , padx = ( 4 , 10 ) )
ttk . Label ( bottom , text = " Model: " ) . pack ( side = tk . LEFT )
self . model_combo = ttk . Combobox ( bottom , textvariable = self . ollama_model , state = " readonly " , width = 20 , values = self . _get_model_values ( ) )
self . model_combo . pack ( side = tk . LEFT , padx = ( 4 , 10 ) )
ttk . Label ( bottom , text = " Remote git repository: " ) . pack ( side = tk . LEFT )
ttk . Entry ( bottom , textvariable = self . git_remote_url , width = 40 ) . pack ( side = tk . LEFT , padx = ( 4 , 10 ) )
2025-09-13 03:50:53 +02:00
ttk . Label ( bottom , text = " SearXNG URL: " ) . pack ( side = tk . LEFT )
ttk . Entry ( bottom , textvariable = self . searx_url , width = 26 ) . pack ( side = tk . LEFT , padx = ( 4 , 0 ) )
2025-09-12 21:45:11 +02:00
# Enable/disable push button based on fields
self . title_var . trace_add ( ' write ' , lambda * a : ( self . update_push_state ( ) , self . _set_dirty ( True ) ) )
self . desc_var . trace_add ( ' write ' , lambda * a : ( self . update_push_state ( ) , self . _set_dirty ( True ) ) )
self . concept . bind ( ' <<Modified>> ' , self . _on_concept_modified )
try :
self . notes . bind ( ' <<Modified>> ' , self . _on_notes_modified )
except Exception :
pass
# Persist settings
self . ollama_host . trace_add ( ' write ' , lambda * a : self . _save_settings ( ) )
self . ollama_model . trace_add ( ' write ' , lambda * a : self . _save_settings ( ) )
self . git_remote_url . trace_add ( ' write ' , lambda * a : self . _save_settings ( ) )
2025-09-13 03:50:53 +02:00
self . searx_url . trace_add ( ' write ' , lambda * a : self . _save_settings ( ) )
2025-09-12 21:45:11 +02:00
# Optional tool overrides would be persisted if we expose them later
self . update_push_state ( )
# Ensure placeholder if saved model not present
try :
values = list ( self . model_combo [ " values " ] ) if hasattr ( self , ' model_combo ' ) else [ ]
if self . ollama_model . get ( ) not in values :
self . ollama_model . set ( " Select model... " )
except Exception :
pass
def _extract_title_desc ( self , concept_md : str , client : Optional [ OllamaClient ] = None ) - > ( Optional [ str ] , Optional [ str ] ) :
""" Second-pass LLM call to extract title and description as JSON. """
try :
client = client or OllamaClient ( host = self . ollama_host . get ( ) )
prompt = (
" Extract a concise title and a one-sentence description from the following concept. \n "
" - Title: <= 50 chars (3-5 words). \n - Description: <= 120 chars, (one sentence) no trailing period. \n "
" Return ONLY strict JSON with keys ' title ' and ' description ' . \n \n CONCEPT: \n " + concept_md
)
raw = client . generate ( self . ollama_model . get ( ) , prompt )
obj = _parse_json_strict ( raw ) or { }
title = strip_wrapping_quotes ( str ( obj . get ( ' title ' ) or ' ' ) . strip ( ) ) or None
desc = strip_wrapping_quotes ( str ( obj . get ( ' description ' ) or ' ' ) . strip ( ) ) or None
return title , desc
except Exception :
return None , None
def _maybe_enable_dnd ( self ) :
if _TKDND_AVAILABLE and DND_FILES is not None :
try :
self . tree . drop_target_register ( DND_FILES )
self . tree . dnd_bind ( ' <<Drop>> ' , self . _on_drop )
self . _set_status ( " Drag & drop enabled " )
except Exception :
self . _set_status ( " Drag & drop not available (fallback to buttons) " )
else :
self . _set_status ( " Drag & drop not available (fallback to buttons) " )
# --- File ops
def _on_drop ( self , event ) : # type: ignore
raw = event . data
paths = self . _parse_dnd_paths ( raw )
self . _add_paths ( paths )
@staticmethod
def _parse_dnd_paths ( s : str ) - > List [ Path ] :
# Handles Tcl-style space-separated paths with braces
out : List [ Path ] = [ ]
buf = ' '
in_brace = False
for ch in s :
if ch == ' { ' :
in_brace = True
if buf :
buf + = ch
continue
if ch == ' } ' :
in_brace = False
if buf :
buf + = ch
continue
if ch == ' ' and not in_brace :
p = buf . strip ( ) . strip ( ' {} ' )
if p :
out . append ( Path ( p ) )
buf = ' '
else :
buf + = ch
if buf . strip ( ) :
out . append ( Path ( buf . strip ( ) . strip ( ' {} ' ) ) )
# Expand directories
final : List [ Path ] = [ ]
for p in out :
if p . is_dir ( ) :
for q in p . rglob ( ' * ' ) :
if q . is_file ( ) :
final . append ( q )
elif p . exists ( ) :
final . append ( p )
return final
def _add_paths ( self , paths : List [ Path ] ) :
# Expand directories into files
expanded : List [ Path ] = [ ]
for p in paths :
if p . is_dir ( ) :
for q in p . rglob ( ' * ' ) :
if q . is_file ( ) :
expanded . append ( q )
elif p . is_file ( ) :
expanded . append ( p )
added = 0
new_files : List [ Path ] = [ ]
for p in expanded :
if p . is_file ( ) and p not in self . files :
self . files . append ( p )
self . include_map [ str ( p ) ] = True
try :
size = human_size ( p . stat ( ) . st_size )
except Exception :
size = " ? "
self . tree . insert ( ' ' , tk . END , values = ( p . name , str ( p ) , p . suffix . lower ( ) , size , ' ✓ ' ) )
added + = 1
new_files . append ( p )
if added :
self . _set_status ( f " Added { added } item(s) " )
self . _set_dirty ( True )
# Kick off background ingest for any new files not in corpus
threading . Thread ( target = self . _ensure_corpus_for_files , args = ( new_files , ) , kwargs = { " blocking " : True } , daemon = True ) . start ( )
#
def on_add_files ( self ) :
paths = filedialog . askopenfilenames ( title = " Select files " )
if not paths :
return
self . _add_paths ( [ Path ( p ) for p in paths ] )
def on_add_folder ( self ) :
path = filedialog . askdirectory ( title = " Select folder " )
if not path :
return
self . _add_paths ( [ Path ( path ) ] )
def on_remove_selected ( self ) :
sels = self . tree . selection ( )
if not sels :
return
removed = 0
for item in sels :
vals = self . tree . item ( item , ' values ' )
if vals :
p = Path ( vals [ 1 ] )
if p in self . files :
self . files . remove ( p )
try :
self . include_map . pop ( str ( p ) , None )
except Exception :
pass
self . tree . delete ( item )
removed + = 1
if removed :
self . _set_status ( f " Removed { removed } item(s) " )
self . _set_dirty ( True )
#
def on_clear_all ( self ) :
self . tree . delete ( * self . tree . get_children ( ) )
self . files . clear ( )
self . include_map . clear ( )
self . _set_status ( " Cleared " )
self . _set_dirty ( True )
#
def on_tree_click ( self , event ) :
# Toggle include column when clicked
region = self . tree . identify_region ( event . x , event . y )
if region != ' cell ' :
return
column = self . tree . identify_column ( event . x ) # e.g., '#1' .. '#5'
if column != ' #5 ' : # include column index
return
item = self . tree . identify_row ( event . y )
if not item :
return
vals = list ( self . tree . item ( item , ' values ' ) )
if not vals or len ( vals ) < 5 :
return
path = str ( vals [ 1 ] )
current = self . include_map . get ( path , True )
new = not current
self . include_map [ path ] = new
vals [ 4 ] = ' ✓ ' if new else ' '
self . tree . item ( item , values = tuple ( vals ) )
self . _set_dirty ( True )
return ' break '
# --- Unified storage helpers
def _init_storage ( self ) :
try :
self . _base_dir . mkdir ( parents = True , exist_ok = True )
self . _files_dir . mkdir ( parents = True , exist_ok = True )
if not self . _corpus_file . exists ( ) :
self . _corpus_file . write_text ( " " , encoding = " utf-8 " )
if not self . _sessions_file . exists ( ) :
self . _sessions_file . write_text ( " " , encoding = " utf-8 " )
# Build in-memory index of seen file hashes
self . _seen_hashes = set ( )
with self . _corpus_file . open ( " r " , encoding = " utf-8 " ) as fh :
for line in fh :
if not line or not line . strip ( ) :
continue
try :
obj = json . loads ( line )
except Exception :
continue
h = obj . get ( " file_hash " )
if h :
self . _seen_hashes . add ( str ( h ) )
except Exception :
# best effort; index stays possibly empty
self . _seen_hashes = set ( )
def _compute_file_hash ( self , path : Path ) - > str :
h = hashlib . sha256 ( )
try :
with path . open ( " rb " ) as fh :
while True :
b = fh . read ( 1024 * 1024 )
if not b :
break
h . update ( b )
except Exception :
# Fall back to name+mtime if unreadable
st = None
try :
st = path . stat ( )
except Exception :
pass
h . update ( ( str ( path ) + " | " + str ( getattr ( st , ' st_mtime ' , 0.0 ) ) ) . encode ( " utf-8 " , " ignore " ) )
return h . hexdigest ( )
def _ensure_file_symlink ( self , src : Path , file_hash : str ) - > Path :
# name pattern: {hash}__basename
dst = self . _files_dir / f " { file_hash } __ { src . name } "
try :
if not dst . exists ( ) :
copy_or_link ( src , self . _files_dir )
except Exception :
pass
return dst
def _ingest_single_file ( self , src : Path , file_hash : str , * , verbose : bool = False ) - > bool :
# Build corpus entries for a single file into the unified corpus
try :
tmp_dir = Path . cwd ( ) / " .idea-hole " / " ingest_tmp " / file_hash
try :
if tmp_dir . exists ( ) :
shutil . rmtree ( tmp_dir )
except Exception :
pass
tmp_dir . mkdir ( parents = True , exist_ok = True )
# Put a copy or symlink inside tmp_dir
local_file = copy_or_link ( src , tmp_dir )
# Try external builder first
external = None
script = Path ( __file__ ) . parent / " corpus_builder.py "
if script . exists ( ) :
external = ExternalCorpusBuilder ( script )
tmp_out = tmp_dir / " out.jsonl "
ok = False
if external is not None :
self . _set_status ( f " Indexing { src . name } (external)… " )
ok = external . build ( tmp_dir , tmp_out , workers = 2 , verbose = verbose )
if not ok :
self . _set_status ( f " Indexing { src . name } (simple)… " )
try :
simple = SimpleCorpusBuilder ( )
recs = simple . build ( tmp_dir , tmp_out )
ok = bool ( recs )
except Exception :
ok = False
# Append to unified corpus with added metadata
if ok and tmp_out . exists ( ) :
ts = int ( time . time ( ) )
with tmp_out . open ( " r " , encoding = " utf-8 " ) as fh_in , self . _corpus_file . open ( " a " , encoding = " utf-8 " ) as fh_out :
for line in fh_in :
if not line . strip ( ) :
continue
try :
obj = json . loads ( line )
except Exception :
continue
obj [ " file_hash " ] = file_hash
# Ensure source_path is the real source
obj [ " source_path " ] = str ( src . resolve ( ) )
obj . setdefault ( " mime " , obj . get ( " mime " ) or None )
obj [ " added_at " ] = ts
try :
fh_out . write ( json . dumps ( obj , ensure_ascii = False ) + " \n " )
except Exception :
# As last resort, write ascii-only
fh_out . write ( json . dumps ( obj ) + " \n " )
# Update in-memory index
self . _seen_hashes . add ( file_hash )
return True
return False
finally :
# Cleanup tmp_dir
try :
shutil . rmtree ( Path . cwd ( ) / " .idea-hole " / " ingest_tmp " / file_hash )
except Exception :
pass
def _ensure_corpus_for_files ( self , paths : List [ Path ] , * , blocking : bool = True ) :
if not paths :
return
# Compute hashes and create symlinks; ingest missing
to_ingest : List [ tuple [ Path , str ] ] = [ ]
for p in paths :
try :
h = self . _compute_file_hash ( p )
except Exception :
continue
self . file_hashes [ str ( p ) ] = h
self . _ensure_file_symlink ( p , h )
if h not in self . _seen_hashes and h not in self . _ingesting :
to_ingest . append ( ( p , h ) )
if not to_ingest :
return
def _run ( ) :
try :
for src , h in to_ingest :
self . _ingesting . add ( h )
try :
self . _ingest_single_file ( src , h , verbose = False )
finally :
try :
self . _ingesting . remove ( h )
except Exception :
pass
self . _set_status ( " Corpus up to date " )
except Exception :
self . _set_status ( " Corpus ingest failed (see logs) " )
if blocking :
_run ( )
else :
threading . Thread ( target = _run , daemon = True ) . start ( )
def _load_records_for_hashes ( self , hashes : Set [ str ] ) - > List [ Record ] :
out : List [ Record ] = [ ]
if not hashes :
return out
try :
with self . _corpus_file . open ( " r " , encoding = " utf-8 " ) as fh :
for line in fh :
if not line . strip ( ) :
continue
try :
obj = json . loads ( line )
except Exception :
continue
if str ( obj . get ( " file_hash " ) or " " ) not in hashes :
continue
out . append ( Record (
id = str ( obj . get ( " id " , " " ) ) ,
title = str ( obj . get ( " title " , " " ) ) ,
text = str ( obj . get ( " text " , " " ) ) ,
source_path = str ( obj . get ( " source_path " , " " ) ) if obj . get ( " source_path " ) else None ,
mime = str ( obj . get ( " mime " , " " ) ) if obj . get ( " mime " ) else None ,
) )
except Exception :
pass
return out
def ensure_and_load_kb_for_current ( self ) - > List [ Record ] :
# Ensure corpus contains all current files, then load those records
self . _set_status ( " Checking corpus… " )
self . _ensure_corpus_for_files ( self . files , blocking = True )
hashes = { self . file_hashes . get ( str ( p ) ) for p in self . files }
hashes = { h for h in hashes if h }
recs = self . _load_records_for_hashes ( hashes )
self . records = recs
self . corpus_path = self . _corpus_file
self . _set_status ( f " KB ready with { len ( recs ) } records " )
return recs
2025-11-30 07:30:09 +01:00
# --- Notes rephrase / extend
def on_rephrase ( self ) :
model = ( self . ollama_model . get ( ) or " " ) . strip ( )
if not model or model == " Select model... " :
self . _ui ( lambda : messagebox . showinfo ( " Select model " , " Please select a model first. " ) )
return
note = self . notes . get ( " 1.0 " , tk . END ) . strip ( )
if not note :
self . _ui ( lambda : messagebox . showinfo ( " No notes " , " Add some notes to rephrase. " ) )
return
self . _set_status ( " Rephrasing… " )
threading . Thread ( target = self . _rephrase_thread , args = ( note , model ) , daemon = True ) . start ( )
def _rephrase_thread ( self , note : str , model : str ) :
try :
client = OllamaClient ( host = self . ollama_host . get ( ) )
variants : List [ Dict [ str , str ] ] = [ {
" key " : " original " ,
" label " : " Original Note " ,
" text " : note ,
} ]
total = len ( REPHRASE_LENSES )
for idx , lens in enumerate ( REPHRASE_LENSES , start = 1 ) :
self . _set_status ( f " Rephrasing ( { idx } / { total } )… " )
prompt = ( lens . get ( " prompt " ) or " " ) . replace ( " {USER_NOTE} " , note )
try :
raw = client . generate ( model = model , prompt = prompt )
except Exception as e :
raise RuntimeError ( f " { lens . get ( ' label ' , ' Variant ' ) } failed: { e } " )
text = sanitize_llm_text_simple ( raw )
variants . append ( {
" key " : lens . get ( " key " ) or f " lens_ { idx } " ,
" label " : lens . get ( " label " ) or f " Variant { idx } " ,
" text " : text ,
} )
self . _set_status ( " Rephrase complete " )
self . _ui ( lambda v = variants : self . _apply_rephrase_results ( v , select_key = " original " ) )
except Exception as e :
self . _set_status ( " Rephrase failed " )
msg = f " Rephrase failed: \n { e } "
self . _ui ( lambda m = msg : messagebox . showerror ( " Error " , m ) )
def _apply_rephrase_results ( self , variants : List [ Dict [ str , str ] ] , * , select_key : Optional [ str ] = None , mark_dirty : bool = True ) :
self . rephrase_variants = variants
self . rephrase_selected_key = select_key or ( variants [ 0 ] . get ( " key " ) if variants else None )
self . _refresh_rephrase_tree ( )
if mark_dirty :
self . _set_dirty ( True )
def _refresh_rephrase_tree ( self ) :
if not hasattr ( self , " rephrase_tree " ) :
return
try :
self . rephrase_tree . delete ( * self . rephrase_tree . get_children ( ) )
except Exception :
return
if not self . rephrase_variants :
try :
self . rephrase_tree . insert ( ' ' , tk . END , iid = " placeholder " , values = ( " No variants yet " , " Click Rephrase to generate " ) )
except Exception :
pass
return
for v in self . rephrase_variants :
key = v . get ( " key " ) or " "
label = v . get ( " label " ) or key
preview = self . _preview_rephrase_text ( v . get ( " text " , " " ) )
iid = key or str ( id ( v ) )
try :
self . rephrase_tree . insert ( ' ' , tk . END , iid = iid , values = ( label , preview ) )
except Exception :
# if iid already exists, fall back to autogenerated
try :
self . rephrase_tree . insert ( ' ' , tk . END , values = ( label , preview ) )
except Exception :
pass
key_to_select = self . rephrase_selected_key or ( self . rephrase_variants [ 0 ] . get ( " key " ) if self . rephrase_variants else None )
if key_to_select :
self . _suppress_rephrase_select = True
try :
self . rephrase_tree . selection_set ( key_to_select )
self . rephrase_tree . see ( key_to_select )
except Exception :
pass
finally :
self . _suppress_rephrase_select = False
@staticmethod
def _preview_rephrase_text ( text : str , limit : int = 160 ) - > str :
clean = re . sub ( r " \ s+ " , " " , text or " " ) . strip ( )
if len ( clean ) < = limit :
return clean
return clean [ : limit ] . rstrip ( ) + " ... "
def _on_rephrase_select ( self , _evt = None ) :
if self . _suppress_rephrase_select :
return
sel = None
try :
sel = self . rephrase_tree . selection ( )
except Exception :
sel = None
if not sel :
return
key = sel [ 0 ]
if key == " placeholder " :
return
variant = None
for v in self . rephrase_variants :
if ( v . get ( " key " ) or " " ) == key :
variant = v
break
if not variant :
return
text = variant . get ( " text " , " " )
self . rephrase_selected_key = key
current = self . notes . get ( " 1.0 " , tk . END ) . strip ( )
if current == text . strip ( ) :
return
self . notes . delete ( " 1.0 " , tk . END )
self . notes . insert ( " 1.0 " , text )
try :
self . notes . see ( " 1.0 " )
except Exception :
pass
self . _set_dirty ( True )
def on_extend ( self ) :
model = ( self . ollama_model . get ( ) or " " ) . strip ( )
if not model or model == " Select model... " :
self . _ui ( lambda : messagebox . showinfo ( " Select model " , " Please select a model first. " ) )
return
note = self . notes . get ( " 1.0 " , tk . END ) . strip ( )
if not note :
self . _ui ( lambda : messagebox . showinfo ( " No notes " , " Add some notes to extend. " ) )
return
self . _set_status ( " Extending… " )
threading . Thread ( target = self . _extend_thread , args = ( note , model ) , daemon = True ) . start ( )
def _extend_thread ( self , note : str , model : str ) :
try :
client = OllamaClient ( host = self . ollama_host . get ( ) )
prompt = EXTEND_PROMPT . replace ( " {USER_NOTE} " , note )
raw = client . generate ( model = model , prompt = prompt )
text = sanitize_llm_text_simple ( raw )
if not text . strip ( ) :
raise RuntimeError ( " Empty response from model " )
self . _set_status ( " Extension ready " )
def _apply ( ) :
existing = self . notes . get ( " 1.0 " , tk . END )
if not existing . strip ( ) :
self . notes . delete ( " 1.0 " , tk . END )
self . notes . insert ( " 1.0 " , text )
else :
prefix = " \n " if existing . endswith ( " \n " ) else " \n "
self . notes . insert ( tk . END , prefix + text )
try :
self . notes . see ( tk . END )
except Exception :
pass
self . _set_dirty ( True )
self . _ui ( _apply )
except Exception as e :
self . _set_status ( " Extend failed " )
msg = f " Extend failed: \n { e } "
self . _ui ( lambda m = msg : messagebox . showerror ( " Error " , m ) )
2025-09-12 21:45:11 +02:00
# --- Concept generation
def on_generate ( self ) :
# Ensure a model is selected
model = ( self . ollama_model . get ( ) or " " ) . strip ( )
if not model or model == " Select model... " :
self . _ui ( lambda : messagebox . showinfo ( " Select model " , " Please select a model first. " ) )
return
if not self . files and not self . notes . get ( " 1.0 " , tk . END ) . strip ( ) :
self . _ui ( lambda : messagebox . showinfo ( " Nothing to do " , " Add files or write some notes first. " ) )
return
threading . Thread ( target = self . _generate_concept_thread , daemon = True ) . start ( )
def _generate_concept_thread ( self ) :
try :
self . _set_status ( " Preparing knowledge base… " )
records = self . ensure_and_load_kb_for_current ( )
notes = self . notes . get ( " 1.0 " , tk . END ) . strip ( )
kb = build_kb_string ( records )
assets = [ p for p in self . files if self . include_map . get ( str ( p ) , True ) ]
assets_str = " \n " . join ( f " - { Path ( p ) . name } " for p in assets ) or " (none) "
prompt = (
PROMPT_TEMPLATE
. replace ( " {NOTES} " , notes or " (none) " )
. replace ( " {KB} " , kb or " (empty) " )
. replace ( " {ASSETS} " , assets_str )
)
self . _set_status ( " Querying Ollama… " )
client = OllamaClient ( host = self . ollama_host . get ( ) )
concept_md = client . generate ( model = self . ollama_model . get ( ) , prompt = prompt )
concept_md = sanitize_llm_text_simple ( concept_md )
# Extract title/description via second structured call
title , desc = self . _extract_title_desc ( concept_md , client )
if not desc :
desc = " "
# Ensure top heading matches title
concept_md = md_heading_replace_or_insert ( concept_md , title )
if not concept_md . strip ( ) :
raise RuntimeError ( " Empty response from model " )
self . _set_status ( " Concept generated " )
# schedule UI update (concept, title, desc)
def _apply ( ) :
self . concept . delete ( " 1.0 " , tk . END )
self . concept . insert ( " 1.0 " , concept_md )
self . title_var . set ( title )
self . desc_var . set ( strip_wrapping_quotes ( desc ) [ : 120 ] )
self . update_push_state ( )
self . _set_dirty ( True )
self . _ui ( _apply )
except Exception as e :
self . _set_status ( " Generation failed " )
msg = f " Failed to generate concept: \n { e } "
self . _ui ( lambda m = msg : messagebox . showerror ( " Error " , m ) )
return
# After a successful apply, attempt autosave shortly after
try :
self . after ( 150 , self . _autosave_after_generation )
except Exception :
pass
# --- Rebuild KB only
def on_rebuild_kb ( self ) :
threading . Thread ( target = self . _rebuild_kb_thread , daemon = True ) . start ( )
def _rebuild_kb_thread ( self ) :
try :
self . ensure_and_load_kb_for_current ( )
except Exception as e :
self . _set_status ( " KB refresh failed " )
msg = f " KB refresh failed: \n { e } "
self . _ui ( lambda m = msg : messagebox . showerror ( " Error " , m ) )
# --- Sessions: New / Open / Save
def on_new_session ( self ) :
# Prompt to save if there are unsaved changes
if not self . _maybe_save_if_dirty ( ) :
return
try :
self . on_clear_all ( )
self . title_var . set ( " " )
self . desc_var . set ( " " )
self . notes . delete ( " 1.0 " , tk . END )
self . concept . delete ( " 1.0 " , tk . END )
2025-11-30 07:30:09 +01:00
self . _apply_rephrase_results ( [ ] , mark_dirty = False )
2025-09-12 21:45:11 +02:00
self . _set_status ( " New session " )
self . _set_dirty ( False )
# Reset last-saved snapshot to clean baseline
self . _last_saved = self . _snapshot_session_state ( )
self . update_push_state ( )
except Exception :
pass
def on_save_session ( self ) :
ok = self . _save_session ( confirm_overwrite = True , autosave = False )
if ok :
self . _set_dirty ( False )
def on_open_session ( self ) :
# Guard unsaved changes
if not self . _maybe_save_if_dirty ( ) :
return
sessions = self . _load_all_sessions ( )
if not sessions :
self . _ui ( lambda : messagebox . showinfo ( " No sessions " , " No sessions found. Save one first. " ) )
return
# Simple chooser dialog
dlg = tk . Toplevel ( self )
dlg . title ( " Open Session " )
dlg . geometry ( " 600x360 " )
frm = ttk . Frame ( dlg )
frm . pack ( fill = tk . BOTH , expand = True , padx = 8 , pady = 8 )
cols = ( " title " , " saved " )
tv = ttk . Treeview ( frm , columns = cols , show = " headings " )
tv . heading ( " title " , text = " Title " )
tv . heading ( " saved " , text = " Saved " )
tv . column ( " title " , width = 360 )
tv . column ( " saved " , width = 180 )
vs = ttk . Scrollbar ( frm , orient = tk . VERTICAL , command = tv . yview )
tv . configure ( yscrollcommand = vs . set )
tv . pack ( side = tk . LEFT , fill = tk . BOTH , expand = True )
vs . pack ( side = tk . RIGHT , fill = tk . Y )
def fmt_ts ( ts : int ) - > str :
try :
return time . strftime ( " % Y- % m- %d % H: % M: % S " , time . localtime ( ts ) )
except Exception :
return str ( ts )
def refresh ( selected_saved_at : Optional [ int ] = None ) :
nonlocal sessions
tv . delete ( * tv . get_children ( ) )
sessions = self . _load_all_sessions ( )
for idx , s in enumerate ( sessions ) :
tv . insert ( ' ' , tk . END , iid = str ( idx ) , values = ( s . get ( " title " , " " ) , fmt_ts ( int ( s . get ( " saved_at " , 0 ) ) ) ) )
# Try to reselect
if selected_saved_at is not None :
for iid in tv . get_children ( ' ' ) :
i = int ( iid )
try :
if int ( sessions [ i ] . get ( " saved_at " , - 1 ) ) == int ( selected_saved_at ) :
tv . selection_set ( iid )
tv . see ( iid )
break
except Exception :
pass
refresh ( )
sel_idx = { " i " : None }
def _choose_and_close ( ) :
sel = tv . selection ( )
if not sel :
return
try :
sel_idx [ " i " ] = int ( sel [ 0 ] )
except Exception :
sel_idx [ " i " ] = None
dlg . destroy ( )
tv . bind ( ' <Double-1> ' , lambda _e : _choose_and_close ( ) )
btns = ttk . Frame ( dlg )
btns . pack ( fill = tk . X , padx = 8 , pady = 8 )
def _rename ( ) :
sel = tv . selection ( )
if not sel :
return
i = int ( sel [ 0 ] ) ; s = sessions [ i ]
# Prompt for new title
ttl = tk . Toplevel ( dlg ) ; ttl . title ( " Rename Session " ) ; ttl . geometry ( " 420x120 " )
f = ttk . Frame ( ttl ) ; f . pack ( fill = tk . BOTH , expand = True , padx = 8 , pady = 8 )
ttk . Label ( f , text = " New title: " ) . pack ( anchor = tk . W )
var = tk . StringVar ( value = s . get ( " title " , " " ) )
ent = ttk . Entry ( f , textvariable = var )
ent . pack ( fill = tk . X )
ent . focus_set ( )
btnf = ttk . Frame ( f ) ; btnf . pack ( fill = tk . X , pady = ( 8 , 0 ) )
def _ok ( ) :
new_title = ( var . get ( ) or " " ) . strip ( )
if not new_title :
ttl . destroy ( ) ; return
# If another session with new_title exists (not this one), confirm overwrite
exists = any ( ( x . get ( " title " ) == new_title and int ( x . get ( " saved_at " , - 1 ) ) != int ( s . get ( " saved_at " , - 1 ) ) ) for x in sessions )
if exists :
if not messagebox . askyesno ( " Overwrite " , f " A session titled ' { new_title } ' exists. Overwrite it? " ) :
return
# Rewrite sessions: remove all with new_title (except current); then update current title
all_s = self . _load_all_sessions ( )
cur_sa = int ( s . get ( " saved_at " , - 1 ) )
filtered = [ x for x in all_s if x . get ( " title " ) != new_title and int ( x . get ( " saved_at " , - 1 ) ) != cur_sa ]
s2 = dict ( s ) ; s2 [ " title " ] = new_title
filtered . append ( s2 )
self . _write_all_sessions ( filtered )
ttl . destroy ( )
refresh ( selected_saved_at = cur_sa )
ttk . Button ( btnf , text = " OK " , command = _ok ) . pack ( side = tk . RIGHT )
ttk . Button ( btnf , text = " Cancel " , command = ttl . destroy ) . pack ( side = tk . RIGHT , padx = ( 0 , 8 ) )
ttl . transient ( dlg ) ; ttl . grab_set ( ) ; self . wait_window ( ttl )
def _delete ( ) :
sel = tv . selection ( )
if not sel :
return
i = int ( sel [ 0 ] ) ; s = sessions [ i ]
if not messagebox . askyesno ( " Delete Session " , f " Delete ' { s . get ( ' title ' , ' ' ) } ' ? " ) :
return
all_s = self . _load_all_sessions ( )
sa = int ( s . get ( " saved_at " , - 1 ) )
kept = [ x for x in all_s if int ( x . get ( " saved_at " , - 1 ) ) != sa ]
self . _write_all_sessions ( kept )
refresh ( )
ttk . Button ( btns , text = " Delete " , command = _delete ) . pack ( side = tk . LEFT )
ttk . Button ( btns , text = " Rename " , command = _rename ) . pack ( side = tk . LEFT , padx = ( 6 , 0 ) )
ttk . Button ( btns , text = " Open " , command = _choose_and_close ) . pack ( side = tk . RIGHT )
ttk . Button ( btns , text = " Cancel " , command = dlg . destroy ) . pack ( side = tk . RIGHT , padx = ( 0 , 8 ) )
dlg . transient ( self )
dlg . grab_set ( )
self . wait_window ( dlg )
i = sel_idx . get ( " i " )
if i is None :
return
s = sessions [ i ]
# Apply session
try :
self . on_clear_all ( )
self . title_var . set ( s . get ( " title " , " " ) )
self . desc_var . set ( s . get ( " description " , " " ) )
self . notes . delete ( " 1.0 " , tk . END )
self . notes . insert ( " 1.0 " , s . get ( " notes " , " " ) )
self . concept . delete ( " 1.0 " , tk . END )
self . concept . insert ( " 1.0 " , s . get ( " concept " , " " ) )
2025-11-30 07:30:09 +01:00
rephrases_raw = s . get ( " rephrase_variants " ) or [ ]
cleaned_rephrases : List [ Dict [ str , str ] ] = [ ]
for v in rephrases_raw :
if isinstance ( v , dict ) :
cleaned_rephrases . append ( {
" key " : str ( v . get ( " key " ) or " " ) ,
" label " : str ( v . get ( " label " ) or " " ) ,
" text " : str ( v . get ( " text " ) or " " ) ,
} )
self . _apply_rephrase_results ( cleaned_rephrases , select_key = s . get ( " rephrase_selected_key " ) , mark_dirty = False )
2025-09-12 21:45:11 +02:00
files = s . get ( " files " ) or [ ]
# Re-add files; prefer original path, fallback to symlink by hash
resolved : List [ Path ] = [ ]
for f in files :
p = Path ( f . get ( " path " ) or " " )
h = str ( f . get ( " file_hash " ) or " " )
if not p . exists ( ) and h :
# try to locate symlink with hash prefix
try :
for q in self . _files_dir . glob ( f " { h } __* " ) :
p = q
break
except Exception :
pass
if p . exists ( ) :
resolved . append ( p )
self . _add_paths ( resolved )
# Restore include flags
for f in files :
p = str ( f . get ( " path " ) or " " )
inc = bool ( f . get ( " include " , True ) )
if p :
self . include_map [ p ] = inc
# Reflect include flags in UI rows
for item in self . tree . get_children ( ' ' ) :
vals = list ( self . tree . item ( item , ' values ' ) )
if vals and len ( vals ) > = 5 :
path = str ( vals [ 1 ] )
vals [ 4 ] = ' ✓ ' if self . include_map . get ( path , True ) else ' '
self . tree . item ( item , values = tuple ( vals ) )
self . update_push_state ( )
self . _set_status ( " Session loaded " )
self . _set_dirty ( False )
# Save snapshot of the loaded state for accurate dirty detection
self . _last_saved = self . _snapshot_session_state ( )
except Exception as e :
self . _ui ( lambda m = f " Failed to open session: \n { e } " : messagebox . showerror ( " Error " , m ) )
# (Description generation now runs automatically after concept generation.)
# --- Push to Repo
def update_push_state ( self ) :
title_ok = bool ( self . title_var . get ( ) . strip ( ) )
desc_ok = bool ( self . desc_var . get ( ) . strip ( ) )
concept_ok = bool ( self . concept . get ( " 1.0 " , tk . END ) . strip ( ) )
state = tk . NORMAL if ( title_ok and desc_ok and concept_ok ) else tk . DISABLED
try :
self . push_btn . configure ( state = state )
except Exception :
pass
def _on_concept_modified ( self , _evt = None ) :
try :
if self . concept . edit_modified ( ) :
self . update_push_state ( )
self . _set_dirty ( True )
self . concept . edit_modified ( False )
except Exception :
pass
def _on_notes_modified ( self , _evt = None ) :
try :
if self . notes . edit_modified ( ) :
self . _set_dirty ( True )
self . notes . edit_modified ( False )
except Exception :
pass
def _open_path_default ( self , path : Path ) - > bool :
try :
if sys . platform . startswith ( ' darwin ' ) :
subprocess . Popen ( [ ' open ' , str ( path ) ] )
return True
elif os . name == ' nt ' :
os . startfile ( str ( path ) ) # type: ignore[attr-defined]
return True
else :
subprocess . Popen ( [ ' xdg-open ' , str ( path ) ] )
return True
except Exception :
return False
def on_preview ( self ) :
""" Generate a PDF preview using the same conversion flow as Push, but in a temp folder.
- Writes a temporary README . md and copies selected assets into . idea - hole / preview / < slug > - preview
- Exports PDF into that same folder and opens it with the OS default viewer
"""
concept_text = self . concept . get ( " 1.0 " , tk . END ) . strip ( )
if not concept_text :
self . _ui ( lambda : messagebox . showinfo ( " No concept " , " Please generate or paste the concept text to preview. " ) )
return
threading . Thread ( target = self . _preview_thread , daemon = True ) . start ( )
def _preview_thread ( self ) :
try :
title = ( self . title_var . get ( ) or " " ) . strip ( )
slug = self . _slug ( title or " preview " )
# Prepare preview workspace
base = Path . cwd ( ) / " .idea-hole " / " preview " / f " { slug } -preview "
try :
if base . exists ( ) :
shutil . rmtree ( base )
except Exception :
pass
base . mkdir ( parents = True , exist_ok = True )
# Write Markdown
md_path = base / " README.md "
md_text = self . concept . get ( " 1.0 " , tk . END ) . strip ( )
md_path . write_text ( md_text , encoding = " utf-8 " )
# Copy selected assets into preview workspace
assets = [ p for p in self . files if self . include_map . get ( str ( p ) , True ) ]
for src in assets :
try :
dst = base / src . name
if dst . name . lower ( ) in { " readme.md " , f " { slug } -concept.pdf " . lower ( ) , f " { slug } -preview.pdf " . lower ( ) } :
dst = base / f " asset- { src . name } "
shutil . copy2 ( src , dst )
except Exception :
pass
# Export PDF in the same workspace folder
pdf_path = base / f " { slug } -preview.pdf "
self . _set_status ( " Exporting PDF preview… " )
ok_pdf = self . _convert_markdown_to_pdf ( md_path , pdf_path )
if not ok_pdf :
logs_dir = Path . cwd ( ) / " .idea-hole " / " logs "
log_path = logs_dir / f " pdf_export_ { base . name } .log "
def _show_pdf_error ( ) :
msg = " PDF preview failed. "
if log_path . exists ( ) :
try :
data = log_path . read_text ( encoding = ' utf-8 ' )
snippet = data [ - 2000 : ] if len ( data ) > 2000 else data
rel = Path ( ' .idea-hole ' ) / ' logs ' / log_path . name
msg = f " PDF preview failed. See { rel } for details. \n \n Last output: \n { snippet } "
except Exception :
pass
messagebox . showerror ( " PDF Preview " , msg )
self . _ui ( _show_pdf_error )
self . _set_status ( " Preview failed " )
return
# Try to open the generated PDF
opened = self . _open_path_default ( pdf_path )
if not opened :
self . _ui ( lambda : messagebox . showinfo ( " Preview ready " , f " Preview PDF saved to: \n { pdf_path } " ) )
self . _set_status ( " Preview ready " )
except Exception as e :
self . _set_status ( " Preview failed " )
self . _ui ( lambda m = f " Failed to preview PDF: \n { e } " : messagebox . showerror ( " Error " , m ) )
def on_push ( self ) :
title = self . title_var . get ( ) . strip ( )
desc = self . desc_var . get ( ) . strip ( )
if not title or not desc :
self . _ui ( lambda : messagebox . showinfo ( " Missing fields " , " Please fill Title and Description. " ) )
return
if not self . concept . get ( " 1.0 " , tk . END ) . strip ( ) :
self . _ui ( lambda : messagebox . showinfo ( " No concept " , " Please generate or paste the concept text. " ) )
return
threading . Thread ( target = self . _push_thread , daemon = True ) . start ( )
# --- Git helpers
def _run_git ( self , repo_dir : Path , * args : str ) - > subprocess . CompletedProcess :
return subprocess . run ( [ " git " , * args ] , cwd = str ( repo_dir ) , stdout = subprocess . PIPE , stderr = subprocess . STDOUT , text = True )
def _ensure_repo_initialized ( self , repo_dir : Path ) :
repo_dir . mkdir ( parents = True , exist_ok = True )
if not ( repo_dir / " .git " ) . exists ( ) :
_ = subprocess . run ( [ " git " , " init " ] , cwd = str ( repo_dir ) , stdout = subprocess . PIPE , stderr = subprocess . STDOUT , text = True )
def _ensure_remote_origin ( self , repo_dir : Path , remote_url : str ) :
if not remote_url :
return
res = self . _run_git ( repo_dir , " remote " , " get-url " , " origin " )
if res . returncode == 0 :
current = ( res . stdout or " " ) . strip ( )
if current != remote_url :
_ = self . _run_git ( repo_dir , " remote " , " set-url " , " origin " , remote_url )
else :
_ = self . _run_git ( repo_dir , " remote " , " add " , " origin " , remote_url )
def _ensure_branch_master ( self , repo_dir : Path ) :
""" Ensure the default branch is master even on unborn HEAD. """
# Try to read symbolic HEAD (works even if unborn)
res = self . _run_git ( repo_dir , " symbolic-ref " , " -q " , " HEAD " )
headref = ( res . stdout or " " ) . strip ( ) if res . returncode == 0 else " "
if not headref :
# Unborn or detached; set HEAD to refs/heads/master
self . _run_git ( repo_dir , " symbolic-ref " , " HEAD " , " refs/heads/master " )
return
if headref . endswith ( " /master " ) :
return
# Otherwise rename current branch to master
self . _run_git ( repo_dir , " branch " , " -M " , " master " )
def _slug ( self , s : str ) - > str :
s = re . sub ( r " [ \ s]+ " , " - " , s . strip ( ) )
s = re . sub ( r " [^a-zA-Z0-9._-] " , " - " , s )
return re . sub ( r " -+ " , " - " , s ) . strip ( " -_ " )
# --- Concepts index helpers
def _build_slug_map_from_sessions ( self ) - > Dict [ str , Dict [ str , str ] ] :
""" Return mapping: slug -> { title, description}; prefer most recent saved_at per slug. """
entries = self . _load_all_sessions ( )
best : Dict [ str , Dict [ str , str ] ] = { }
best_ts : Dict [ str , int ] = { }
for e in entries :
title = ( e . get ( " title " ) or " " ) . strip ( )
if not title :
continue
slug = self . _slug ( title )
ts = 0
try :
ts = int ( e . get ( " saved_at " ) or 0 )
except Exception :
ts = 0
if slug not in best or ts > = best_ts . get ( slug , 0 ) :
best [ slug ] = { " title " : title , " description " : ( e . get ( " description " ) or " " ) . strip ( ) }
best_ts [ slug ] = ts
return best
def _write_concepts_index ( self , repo_dir : Path ) :
try :
# Map available descriptions from sessions.jsonl by slug
slug_map = self . _build_slug_map_from_sessions ( )
items = [ ]
seen : Set [ str ] = set ( )
for child in sorted ( repo_dir . iterdir ( ) , key = lambda p : p . name . lower ( ) ) :
if not child . is_dir ( ) :
continue
name = child . name
if name . startswith ( ' . ' ) or name in { " .git " , " node_modules " } :
continue
slug = name
if slug in seen :
continue
seen . add ( slug )
title = slug_map . get ( slug , { } ) . get ( " title " ) or re . sub ( r " [-_]+ " , " " , slug ) . strip ( ) . title ( )
desc = slug_map . get ( slug , { } ) . get ( " description " ) or " "
items . append ( ( slug , title , desc ) )
intro = (
" This folder contains a library of project concepts created with the Idea → Concept tool. "
" Each entry links to its folder with the original concept README and related assets. "
)
lines : List [ str ] = [ ]
lines . append ( " # Concepts Index " )
lines . append ( " " )
lines . append ( intro )
lines . append ( " " )
for slug , title , desc in items :
if desc :
lines . append ( f " - [ { title } ](./ { slug } /) — { desc } " )
else :
lines . append ( f " - [ { title } ](./ { slug } /) " )
( repo_dir / " README.md " ) . write_text ( " \n " . join ( lines ) . rstrip ( ) + " \n " , encoding = " utf-8 " )
except Exception :
# Non-fatal; skip index update on error
pass
def _convert_markdown_to_pdf ( self , md_file : Path , out_pdf : Path ) - > bool :
""" Single-path conversion: pandoc + tectonic with image compatibility.
- Converts unsupported image formats to PNG in a temp folder
- Uses a temporary Markdown copy for PDF only ( original README . md untouched )
- Sans - serif font , 20 mm margins
- Logs to . idea - hole / logs ( never the concepts repo )
"""
concept_dir = out_pdf . parent
concept_dir . mkdir ( parents = True , exist_ok = True )
logs_dir = Path . cwd ( ) / " .idea-hole " / " logs "
logs_dir . mkdir ( parents = True , exist_ok = True )
log_path = logs_dir / f " pdf_export_ { concept_dir . name } .log "
def _resolve ( name : str ) - > Optional [ str ] :
for base in [ None , " /opt/homebrew/bin " , " /usr/local/bin " , " /usr/bin " , " /bin " ] :
p = shutil . which ( name ) if base is None else os . path . join ( base , name )
if p and os . path . exists ( p ) :
return p
return None
pandoc = _resolve ( " pandoc " )
tectonic = _resolve ( " tectonic " )
lines : List [ str ] = [ ]
lines . append ( f " PATH= { os . environ . get ( ' PATH ' , ' ' ) } " )
lines . append ( f " md_file= { md_file } " )
lines . append ( f " resolved pandoc= { pandoc } " )
lines . append ( f " resolved tectonic= { tectonic } " )
if not pandoc or not tectonic :
lines . append ( " Missing required tools: pandoc and/or tectonic. " )
try :
log_path . write_text ( " \n " . join ( lines ) , encoding = " utf-8 " )
except Exception :
pass
return False
# Build temp workspace for PDF (images + markdown copy)
tmp_base = Path . cwd ( ) / " .idea-hole " / " tmp_pdf " / concept_dir . name
try :
if tmp_base . exists ( ) :
shutil . rmtree ( tmp_base )
except Exception :
pass
tmp_base . mkdir ( parents = True , exist_ok = True )
# Rewrite Markdown image refs to PDF-compatible formats
try :
text = md_file . read_text ( encoding = " utf-8 " )
except Exception as e :
lines . append ( f " read error: { e } " )
try : log_path . write_text ( " \n " . join ( lines ) , encoding = " utf-8 " )
except Exception : pass
return False
# Regex for Markdown images 
img_rgx = re . compile ( r " ! \ [[^ \ ]]* \ ] \ (([^ \ s)]+)(?: \ s+ \" [^ \" ]* \" )? \ ) " )
allowed_ext = { " .png " , " .jpg " , " .jpeg " , " .pdf " , " .eps " }
def ensure_image_available ( src : str ) - > str :
""" Copy or convert image into tmp_base and return the tmp filename to use in Markdown. """
p = Path ( src )
if not p . is_absolute ( ) :
p = ( concept_dir / p ) . resolve ( )
if not p . exists ( ) :
# Fallback: if src had subdirs, try by basename in concept_dir (assets were copied flat)
alt = ( concept_dir / Path ( src ) . name ) . resolve ( )
if alt . exists ( ) :
p = alt
else :
lines . append ( f " missing image: { src } " )
return src # leave as-is; LaTeX will likely show caption only
ext = p . suffix . lower ( )
# Destination filename in tmp
if ext in allowed_ext :
out_name = p . name
out_path = tmp_base / out_name
try :
if not out_path . exists ( ) :
shutil . copy2 ( str ( p ) , str ( out_path ) )
return out_name
except Exception as e :
lines . append ( f " copy fail: { src } -> { out_name } ( { e } ) " )
return src
# Special-case SVG: try CairoSVG or rsvg-convert/ImageMagick
if ext == " .svg " :
out_name = p . stem + " .png "
out_path = tmp_base / out_name
# Try CairoSVG (python lib)
try :
from cairosvg import svg2png # type: ignore
svg2png ( url = str ( p ) , write_to = str ( out_path ) )
return out_name
except Exception as e_svg_py :
lines . append ( f " cairosvg unavailable or failed: { e_svg_py } " )
# Try rsvg-convert CLI
try :
tool = shutil . which ( " rsvg-convert " )
if tool :
res = subprocess . run ( [ tool , " -f " , " png " , " -o " , str ( out_path ) , str ( p ) ] , stdout = subprocess . PIPE , stderr = subprocess . STDOUT , text = True )
if res . returncode == 0 and out_path . exists ( ) :
return out_name
lines . append ( f " rsvg-convert failed: exit { res . returncode } , { res . stdout } " )
except Exception as e_svg_cli :
lines . append ( f " rsvg-convert error: { e_svg_cli } " )
# Try ImageMagick convert
try :
tool = shutil . which ( " magick " ) or shutil . which ( " convert " )
if tool :
res = subprocess . run ( [ tool , str ( p ) , str ( out_path ) ] , stdout = subprocess . PIPE , stderr = subprocess . STDOUT , text = True )
if res . returncode == 0 and out_path . exists ( ) :
return out_name
lines . append ( f " imagemagick failed: exit { res . returncode } , { res . stdout } " )
except Exception as e_im :
lines . append ( f " imagemagick error: { e_im } " )
# fallthrough to Pillow/general path
# Try to convert unsupported formats to PNG via Pillow
try :
from PIL import Image
img = Image . open ( str ( p ) )
try :
img . seek ( 0 )
except Exception :
pass
out_name = p . stem + " .png "
out_path = tmp_base / out_name
img . convert ( " RGBA " if img . mode in ( " P " , " LA " ) else " RGB " ) . save ( str ( out_path ) , format = " PNG " )
return out_name
except Exception as e :
lines . append ( f " convert fail: { src } -> png ( { e } ) " )
# Last resort: copy original and hope engine supports it
out_name = p . name
out_path = tmp_base / out_name
try :
shutil . copy2 ( str ( p ) , str ( out_path ) )
return out_name
except Exception as e2 :
lines . append ( f " final copy fail: { src } ( { e2 } ) " )
return src
# Build a modified markdown string with tmp-local image paths
def _repl ( m : re . Match ) - > str :
orig = m . group ( 0 )
path = m . group ( 1 )
rep = ensure_image_available ( path )
# Replace the path with the tmp filename (no directories)
return orig . replace ( path , rep )
mod_text = img_rgx . sub ( _repl , text )
# Preserve multiple consecutive blank lines by inserting raw TeX vspace.
# Pandoc collapses multiple blank lines into a single paragraph break in LaTeX.
# We translate extra blank lines into additional vertical space so visual spacing matches editing.
def _preserve_extra_blank_lines ( s : str ) - > str :
s = s . replace ( " \r \n " , " \n " ) . replace ( " \r " , " \n " )
lines_in = s . split ( " \n " )
out_lines : List [ str ] = [ ]
in_fence = False
blank_run = 0
for ln in lines_in :
stripped = ln . lstrip ( )
# Toggle code fence state on ``` or ~~~ lines
if stripped . startswith ( " ``` " ) or stripped . startswith ( " ~~~ " ) :
# flush any pending blanks before fence delimiter
if blank_run > 0 :
out_lines . append ( " " )
for _ in range ( blank_run - 1 ) :
out_lines . append ( " \\ vspace {1em} " )
blank_run = 0
out_lines . append ( ln )
in_fence = not in_fence
continue
if in_fence :
# pass through exactly
if blank_run > 0 :
out_lines . append ( " " )
for _ in range ( blank_run - 1 ) :
out_lines . append ( " \\ vspace {1em} " )
blank_run = 0
out_lines . append ( ln )
continue
# Outside fences: track blank runs
if stripped == " " :
blank_run + = 1
continue
# Non-blank line: flush blanks first
if blank_run > 0 :
out_lines . append ( " " )
for _ in range ( blank_run - 1 ) :
out_lines . append ( " \\ vspace {1em} " )
blank_run = 0
out_lines . append ( ln )
# flush at EOF
if blank_run > 0 :
out_lines . append ( " " )
for _ in range ( blank_run - 1 ) :
out_lines . append ( " \\ vspace {1em} " )
return " \n " . join ( out_lines )
mod_text = _preserve_extra_blank_lines ( mod_text )
tmp_md = tmp_base / " README_pdf.md "
tmp_md . write_text ( mod_text , encoding = " utf-8 " )
# Run pandoc (tectonic engine) in tmp_base with resource-path pointing to concept_dir and tmp_base
cmd = [
pandoc ,
str ( tmp_md ) ,
" -f " , " markdown+hard_line_breaks+raw_tex " ,
" -s " ,
" --pdf-engine=tectonic " ,
" -V " , " mainfont=Helvetica " ,
" -V " , " monofont=Menlo " ,
" -V " , " geometry:margin=20mm " ,
" -V " , " fontsize=11pt " ,
" --resource-path " , f " { str ( tmp_base ) } : { str ( concept_dir ) } " ,
" -o " , str ( out_pdf ) ,
]
res = subprocess . run ( cmd , cwd = str ( tmp_base ) , stdout = subprocess . PIPE , stderr = subprocess . STDOUT , text = True )
lines . append ( " $ " + " " . join ( cmd ) )
lines . append ( f " (exit { res . returncode } ) " )
lines . append ( res . stdout or " " )
ok = ( res . returncode == 0 and out_pdf . exists ( ) )
# Fallback: retry without hard_line_breaks if first attempt failed (older pandoc versions)
if not ok :
try :
cmd_fallback = [
pandoc ,
str ( tmp_md ) ,
" -f " , " markdown+hard_line_breaks " ,
" -s " ,
" --pdf-engine=tectonic " ,
" -V " , " mainfont=Helvetica " ,
" -V " , " monofont=Menlo " ,
" -V " , " geometry:margin=20mm " ,
" -V " , " fontsize=11pt " ,
" --resource-path " , f " { str ( tmp_base ) } : { str ( concept_dir ) } " ,
" -o " , str ( out_pdf ) ,
]
res2 = subprocess . run ( cmd_fallback , cwd = str ( tmp_base ) , stdout = subprocess . PIPE , stderr = subprocess . STDOUT , text = True )
lines . append ( " $ " + " " . join ( cmd_fallback ) )
lines . append ( f " (exit { res2 . returncode } ) " )
lines . append ( res2 . stdout or " " )
ok = ( res2 . returncode == 0 and out_pdf . exists ( ) )
except Exception as _e_fallback :
lines . append ( f " fallback error: { _e_fallback } " )
if not ok :
try :
log_path . write_text ( " \n " . join ( lines ) , encoding = " utf-8 " )
except Exception :
pass
# Cleanup temp workspace
try :
shutil . rmtree ( tmp_base )
except Exception :
pass
return ok
def _push_thread ( self ) :
try :
self . _set_status ( " Preparing repo… " )
repo_dir = Path . cwd ( ) / " concepts "
self . _ensure_repo_initialized ( repo_dir )
# Create concept folder and files
title = self . title_var . get ( ) . strip ( )
slug = self . _slug ( title )
concept_dir = repo_dir / slug
concept_dir . mkdir ( parents = True , exist_ok = True )
md_path = concept_dir / " README.md "
md_text = self . concept . get ( " 1.0 " , tk . END ) . strip ( )
md_path . write_text ( md_text , encoding = " utf-8 " )
# (No meta.json written; index derives data from sessions.jsonl)
# Copy selected assets into concept folder BEFORE PDF generation
assets = [ p for p in self . files if self . include_map . get ( str ( p ) , True ) ]
for src in assets :
try :
dst = concept_dir / src . name
if dst . name . lower ( ) in { " readme.md " , f " { slug } -concept.pdf " . lower ( ) } :
# avoid clobbering the generated files
dst = concept_dir / f " asset- { src . name } "
shutil . copy2 ( src , dst )
except Exception :
# ignore individual copy failures but continue
pass
# Now export PDF (images are present in concept_dir)
pdf_path = concept_dir / f " { slug } -concept.pdf "
self . _set_status ( " Exporting PDF… " )
ok_pdf = self . _convert_markdown_to_pdf ( md_path , pdf_path )
if not ok_pdf :
logs_dir = Path . cwd ( ) / " .idea-hole " / " logs "
log_path = logs_dir / f " pdf_export_ { concept_dir . name } .log "
def _show_pdf_error ( ) :
msg = " PDF export failed. "
if log_path . exists ( ) :
try :
data = log_path . read_text ( encoding = ' utf-8 ' )
snippet = data [ - 2000 : ] if len ( data ) > 2000 else data
rel = Path ( ' .idea-hole ' ) / ' logs ' / log_path . name
msg = f " PDF export failed. See { rel } for details. \n \n Last output: \n { snippet } "
except Exception :
pass
messagebox . showerror ( " PDF Export " , msg )
self . _ui ( _show_pdf_error )
# Update concepts root README index before committing
try :
self . _write_concepts_index ( repo_dir )
except Exception :
pass
# Git add/commit
self . _set_status ( " Committing… " )
add_res = self . _run_git ( repo_dir , " add " , " . " )
if add_res . returncode != 0 :
raise RuntimeError ( add_res . stdout )
commit_msg = f " { title } - { self . desc_var . get ( ) . strip ( ) } "
commit_res = self . _run_git ( repo_dir , " commit " , " -m " , commit_msg )
if commit_res . returncode != 0 :
# If nothing to commit
if " nothing to commit " not in ( commit_res . stdout or " " ) . lower ( ) :
raise RuntimeError ( commit_res . stdout )
# Ensure master branch and remote
remote = ( self . git_remote_url . get ( ) or " " ) . strip ( )
self . _ensure_branch_master ( repo_dir )
if remote :
self . _set_status ( " Pushing… " )
self . _ensure_remote_origin ( repo_dir , remote )
push_res = self . _run_git ( repo_dir , " push " , " -u " , " origin " , " master " )
if push_res . returncode != 0 :
raise RuntimeError ( push_res . stdout )
self . _set_status ( " Pushed to remote " )
else :
self . _set_status ( " Committed locally (no remote set) " )
self . _ui ( lambda : messagebox . showinfo ( " Success " , " Concept saved and repository updated. " ) )
except Exception as e :
self . _set_status ( " Push failed " )
msg = f " Failed to push to repo: \n { e } "
self . _ui ( lambda m = msg : messagebox . showerror ( " Error " , m ) )
# --- Status helper
def _set_status ( self , s : str ) :
self . _ui ( lambda : ( self . status . configure ( text = s ) , self . status . update_idletasks ( ) ) )
def _ui ( self , fn ) :
try :
self . after ( 0 , fn )
except Exception :
# best-effort fallback
try :
fn ( )
except Exception :
pass
# --- Tk helpers
def _auto_hide_scrollbar ( self , sb : ttk . Scrollbar , lo , hi ) :
""" Auto-hide a ttk.Scrollbar when content fits; show when it overflows.
Works with both pack and grid - managed scrollbars .
"""
try :
flo , fhi = float ( lo ) , float ( hi )
except Exception :
try :
flo , fhi = float ( str ( lo ) ) , float ( str ( hi ) )
except Exception :
flo , fhi = 0.0 , 1.0
# Update the scrollbar range
try :
sb . set ( flo , fhi )
except Exception :
pass
needs = not ( flo < = 0.0 and fhi > = 1.0 )
try :
mgr = sb . winfo_manager ( )
except Exception :
mgr = ' '
try :
if needs :
# show
if mgr == ' grid ' :
if not sb . winfo_ismapped ( ) :
sb . grid ( )
elif mgr == ' pack ' :
try :
sb . pack_info ( )
except Exception :
sb . pack ( side = tk . RIGHT , fill = tk . Y )
else :
# default to pack
sb . pack ( side = tk . RIGHT , fill = tk . Y )
else :
# hide
if mgr == ' grid ' :
sb . grid_remove ( )
elif mgr == ' pack ' :
sb . pack_forget ( )
except Exception :
pass
# --- Ollama model listing
def _list_models ( self ) - > List [ str ] :
try :
res = subprocess . run ( [ " ollama " , " list " ] , stdout = subprocess . PIPE , stderr = subprocess . STDOUT , text = True , timeout = 8 )
if res . returncode != 0 :
return [ ]
lines = [ ln . strip ( ) for ln in ( res . stdout or " " ) . splitlines ( ) ]
out : List [ str ] = [ ]
for ln in lines :
if not ln or ln . lower ( ) . startswith ( " name " ) :
continue
# first column is model name
name = ln . split ( ) [ 0 ]
if name and name not in out :
out . append ( name )
return out
except Exception :
return [ ]
# --- Dirty snapshot helpers
def _snapshot_session_state ( self ) - > Dict :
try :
files_list = [ {
" path " : str ( p ) ,
" include " : bool ( self . include_map . get ( str ( p ) , True ) )
} for p in self . files ]
# stable order
files_list . sort ( key = lambda x : ( x [ " path " ] , not x [ " include " ] ) )
2025-11-30 07:30:09 +01:00
variants : List [ Dict [ str , str ] ] = [ ]
for v in ( self . rephrase_variants or [ ] ) :
if not isinstance ( v , dict ) :
continue
variants . append ( {
" key " : str ( v . get ( " key " ) or " " ) ,
" label " : str ( v . get ( " label " ) or " " ) ,
" text " : str ( v . get ( " text " ) or " " ) ,
} )
2025-09-12 21:45:11 +02:00
return {
" title " : ( self . title_var . get ( ) or " " ) . strip ( ) ,
" description " : ( self . desc_var . get ( ) or " " ) . strip ( ) ,
" notes " : self . notes . get ( " 1.0 " , tk . END ) . strip ( ) ,
" concept " : self . concept . get ( " 1.0 " , tk . END ) . strip ( ) ,
" files " : files_list ,
2025-11-30 07:30:09 +01:00
" rephrase_variants " : variants ,
" rephrase_selected_key " : self . rephrase_selected_key ,
2025-09-12 21:45:11 +02:00
}
except Exception :
2025-11-30 07:30:09 +01:00
return { " title " : " " , " description " : " " , " notes " : " " , " concept " : " " , " files " : [ ] , " rephrase_variants " : [ ] , " rephrase_selected_key " : None }
2025-09-12 21:45:11 +02:00
def _is_effectively_dirty ( self ) - > bool :
try :
if not getattr ( self , " _dirty " , False ) :
return False
now = self . _snapshot_session_state ( )
last = getattr ( self , " _last_saved " , None )
if last is None :
return any ( [ now . get ( " title " ) , now . get ( " description " ) , now . get ( " notes " ) , now . get ( " concept " ) , now . get ( " files " ) ] )
return now != last
except Exception :
# On any error, be conservative and assume not dirty to avoid noisy prompts
return False
def _get_model_values ( self ) - > List [ str ] :
models = self . _list_models ( )
return [ " Select model... " ] + models
# --- Settings persistence
def _config_path ( self ) - > Path :
base = Path . cwd ( ) / " .idea-hole "
base . mkdir ( parents = True , exist_ok = True )
return base / " settings.json "
def _load_settings ( self ) :
try :
p = self . _config_path ( )
if p . exists ( ) :
obj = json . loads ( p . read_text ( encoding = ' utf-8 ' ) )
if isinstance ( obj , dict ) :
if obj . get ( ' ollama_host ' ) :
self . ollama_host . set ( obj [ ' ollama_host ' ] )
if obj . get ( ' ollama_model ' ) :
self . ollama_model . set ( obj [ ' ollama_model ' ] )
if obj . get ( ' git_remote_url ' ) :
self . git_remote_url . set ( obj [ ' git_remote_url ' ] )
2025-09-13 03:51:14 +02:00
if obj . get ( ' searx_url ' ) :
self . searx_url . set ( obj [ ' searx_url ' ] )
2025-09-12 21:45:11 +02:00
if obj . get ( ' pandoc_path ' ) :
# Only set if attribute exists
try : self . pandoc_path . set ( obj [ ' pandoc_path ' ] )
except Exception : pass
if obj . get ( ' wkhtmltopdf_path ' ) :
try : self . wkhtmltopdf_path . set ( obj [ ' wkhtmltopdf_path ' ] )
except Exception : pass
except Exception :
pass
# --- Dirty/session helpers
def _set_dirty ( self , flag : bool = True ) :
try :
self . _dirty = bool ( flag )
except Exception :
self . _dirty = True
def _load_all_sessions ( self ) - > List [ Dict ] :
entries : List [ Dict ] = [ ]
try :
with self . _sessions_file . open ( " r " , encoding = " utf-8 " ) as fh :
for line in fh :
if not line . strip ( ) :
continue
try :
obj = json . loads ( line )
if isinstance ( obj , dict ) and obj . get ( " title " ) :
entries . append ( obj )
except Exception :
continue
except Exception :
pass
return entries
def _write_all_sessions ( self , entries : List [ Dict ] ) :
tmp = self . _sessions_file . with_suffix ( " .tmp " )
try :
with tmp . open ( " w " , encoding = " utf-8 " ) as fh :
for obj in entries :
fh . write ( json . dumps ( obj , ensure_ascii = False ) + " \n " )
tmp . replace ( self . _sessions_file )
except Exception :
# fallback non-atomic
with self . _sessions_file . open ( " w " , encoding = " utf-8 " ) as fh :
for obj in entries :
fh . write ( json . dumps ( obj , ensure_ascii = False ) + " \n " )
def _session_title_exists ( self , title : str ) - > bool :
t = ( title or " " ) . strip ( )
if not t :
return False
try :
with self . _sessions_file . open ( " r " , encoding = " utf-8 " ) as fh :
for line in fh :
if not line . strip ( ) :
continue
try :
obj = json . loads ( line )
if isinstance ( obj , dict ) and ( obj . get ( " title " ) or " " ) . strip ( ) == t :
return True
except Exception :
continue
except Exception :
pass
return False
def _build_session_payload ( self ) - > Dict :
# Ensure corpus coverage before saving
self . _ensure_corpus_for_files ( self . files , blocking = True )
files_meta = [ ]
for p in self . files :
h = self . file_hashes . get ( str ( p ) ) or self . _compute_file_hash ( p )
self . file_hashes [ str ( p ) ] = h
files_meta . append ( {
" path " : str ( p ) ,
" file_hash " : h ,
" include " : bool ( self . include_map . get ( str ( p ) , True ) ) ,
} )
return {
" title " : ( self . title_var . get ( ) or " " ) . strip ( ) ,
" description " : ( self . desc_var . get ( ) or " " ) . strip ( ) ,
" notes " : self . notes . get ( " 1.0 " , tk . END ) . strip ( ) ,
" concept " : self . concept . get ( " 1.0 " , tk . END ) . strip ( ) ,
" files " : files_meta ,
" saved_at " : int ( time . time ( ) ) ,
2025-11-30 07:30:09 +01:00
" rephrase_variants " : [ {
" key " : str ( v . get ( " key " ) or " " ) ,
" label " : str ( v . get ( " label " ) or " " ) ,
" text " : str ( v . get ( " text " ) or " " ) ,
} for v in ( self . rephrase_variants or [ ] ) if isinstance ( v , dict ) ] ,
" rephrase_selected_key " : self . rephrase_selected_key ,
2025-09-12 21:45:11 +02:00
}
def _save_session ( self , * , confirm_overwrite : bool , autosave : bool ) - > bool :
title = ( self . title_var . get ( ) or " " ) . strip ( )
if not title :
self . _ui ( lambda : messagebox . showinfo ( " Title required " , " Please enter a Title before saving the session. " ) )
return False
exists = self . _session_title_exists ( title )
if exists and not confirm_overwrite and autosave :
# On autosave, do not overwrite; inform user once
self . _ui ( lambda : messagebox . showinfo ( " Autosave skipped " , f " Autosave not performed: a session titled ' { title } ' already exists. " ) )
return False
if exists and confirm_overwrite :
ok = messagebox . askyesno ( " Overwrite " , f " A session titled ' { title } ' exists. Overwrite it? " )
if not ok :
return False
# Prepare payload and rewrite file
payload = self . _build_session_payload ( )
try :
all_entries = self . _load_all_sessions ( )
if exists :
all_entries = [ e for e in all_entries if ( e . get ( " title " ) or " " ) != title ]
all_entries . append ( payload )
self . _write_all_sessions ( all_entries )
self . _set_status ( " Session saved " )
# Update last-saved snapshot for clean state detection
try :
self . _last_saved = self . _snapshot_session_state ( )
except Exception :
self . _last_saved = {
" title " : ( self . title_var . get ( ) or " " ) . strip ( ) ,
" description " : ( self . desc_var . get ( ) or " " ) . strip ( ) ,
" notes " : self . notes . get ( " 1.0 " , tk . END ) . strip ( ) ,
" concept " : self . concept . get ( " 1.0 " , tk . END ) . strip ( ) ,
" files " : [ { " path " : str ( p ) , " include " : bool ( self . include_map . get ( str ( p ) , True ) ) } for p in self . files ] ,
2025-11-30 07:30:09 +01:00
" rephrase_variants " : [ {
" key " : str ( v . get ( " key " ) or " " ) ,
" label " : str ( v . get ( " label " ) or " " ) ,
" text " : str ( v . get ( " text " ) or " " ) ,
} for v in ( self . rephrase_variants or [ ] ) if isinstance ( v , dict ) ] ,
" rephrase_selected_key " : self . rephrase_selected_key ,
2025-09-12 21:45:11 +02:00
}
return True
except Exception as e :
self . _ui ( lambda m = f " Failed to save session: \n { e } " : messagebox . showerror ( " Error " , m ) )
return False
2025-09-13 03:51:24 +02:00
# --- Prior-art search (SearXNG + embeddings)
def on_prior_art ( self ) :
model = ( self . ollama_model . get ( ) or " " ) . strip ( )
if not model or model == " Select model... " :
self . _ui ( lambda : messagebox . showinfo ( " Select model " , " Please select a model first. " ) )
return
if not self . files and not self . notes . get ( " 1.0 " , tk . END ) . strip ( ) :
self . _ui ( lambda : messagebox . showinfo ( " Nothing to search " , " Add files or write some notes first. " ) )
return
threading . Thread ( target = self . _prior_art_thread , daemon = True ) . start ( )
def _prior_art_thread ( self ) :
try :
self . _set_status ( " Preparing knowledge base… " )
records = self . ensure_and_load_kb_for_current ( )
notes = self . notes . get ( " 1.0 " , tk . END ) . strip ( )
kb_str = build_kb_string ( records )
assets = [ p for p in self . files if self . include_map . get ( str ( p ) , True ) ]
searx_url = ( self . searx_url . get ( ) or " " ) . strip ( ) or None
def _status_cb ( s : str ) :
self . _set_status ( s )
res = websearch . prior_art_search (
ollama_host = self . ollama_host . get ( ) ,
model = self . ollama_model . get ( ) ,
notes = notes ,
kb = kb_str ,
assets = [ str ( p ) for p in assets ] ,
searx_url = searx_url ,
status_cb = _status_cb ,
)
self . _set_status ( " Prior-art search complete " )
self . _ui ( lambda r = res : self . _show_prior_art_results ( r ) )
except Exception as e :
self . _set_status ( " Prior-art search failed " )
msg = f " Prior-art search failed: \n { e } "
self . _ui ( lambda m = msg : messagebox . showerror ( " Error " , m ) )
def _show_prior_art_results ( self , result : Dict [ str , Any ] ) :
try :
win = tk . Toplevel ( self )
win . title ( " Prior Art Results " )
try :
win . geometry ( " 980x640 " )
except Exception :
pass
top = ttk . Frame ( win )
top . pack ( side = tk . TOP , fill = tk . X , padx = 8 , pady = 8 )
ttk . Label ( top , text = " Queries: " ) . pack ( side = tk . LEFT )
q_str = " ; " . join ( result . get ( " queries " , [ ] ) [ : 3 ] )
q_entry = ttk . Entry ( top )
q_entry . insert ( 0 , q_str )
q_entry . configure ( state = " readonly " )
q_entry . pack ( side = tk . LEFT , fill = tk . X , expand = True , padx = ( 6 , 0 ) )
body = ttk . Panedwindow ( win , orient = tk . VERTICAL )
body . pack ( fill = tk . BOTH , expand = True , padx = 8 , pady = ( 0 , 8 ) )
upper = ttk . Frame ( body )
lower = ttk . Frame ( body )
body . add ( upper , weight = 3 )
body . add ( lower , weight = 2 )
cols = ( " score " , " url " )
tv = ttk . Treeview ( upper , columns = cols , show = " headings " )
tv . heading ( " score " , text = " Score " )
tv . heading ( " url " , text = " URL " )
tv . column ( " score " , width = 60 , anchor = tk . W )
tv . column ( " url " , width = 860 , anchor = tk . W )
vs = ttk . Scrollbar ( upper , orient = tk . VERTICAL , command = tv . yview )
tv . configure ( yscrollcommand = vs . set )
tv . pack ( side = tk . LEFT , fill = tk . BOTH , expand = True )
vs . pack ( side = tk . RIGHT , fill = tk . Y )
# Snippet area
snip = tk . Text ( lower , wrap = tk . WORD )
snip . pack ( side = tk . LEFT , fill = tk . BOTH , expand = True )
snip_sb = ttk . Scrollbar ( lower , orient = tk . VERTICAL , command = snip . yview )
snip . configure ( yscrollcommand = snip_sb . set )
snip_sb . pack ( side = tk . RIGHT , fill = tk . Y )
rows = result . get ( " results " , [ ] ) or [ ]
for i , item in enumerate ( rows ) :
score = item . get ( " score " , 0 )
url = item . get ( " url " , " " )
tv . insert ( ' ' , tk . END , iid = str ( i ) , values = ( f " { score : .1f } " , url ) )
def _on_sel ( _e = None ) :
sel = tv . selection ( )
if not sel :
return
try :
i = int ( sel [ 0 ] )
except Exception :
return
try :
data = rows [ i ]
except Exception :
data = { }
snippet = ( data . get ( " snippet " , " " ) or " " ) . strip ( )
snip . configure ( state = " normal " )
snip . delete ( " 1.0 " , tk . END )
if snippet :
snip . insert ( " 1.0 " , snippet )
snip . configure ( state = " disabled " )
tv . bind ( ' <<TreeviewSelect>> ' , _on_sel )
def _open_selected ( _e = None ) :
sel = tv . selection ( )
if not sel :
return
try :
i = int ( sel [ 0 ] )
url = rows [ i ] . get ( " url " )
if url :
webbrowser . open ( url )
except Exception :
pass
tv . bind ( ' <Double-1> ' , _open_selected )
# Initial selection
try :
first = tv . get_children ( ' ' ) [ : 1 ]
if first :
tv . selection_set ( first )
_on_sel ( )
except Exception :
pass
except Exception :
# fallback: messagebox
try :
urls = [ x . get ( " url " , " " ) for x in ( result . get ( " results " , [ ] ) or [ ] ) ] [ : 10 ]
messagebox . showinfo ( " Results " , " \n " . join ( urls ) or " No results " )
except Exception :
pass
2025-09-12 21:45:11 +02:00
def _maybe_save_if_dirty ( self ) - > bool :
try :
# Only prompt if content truly differs from last saved snapshot
if not self . _is_effectively_dirty ( ) :
return True
ans = messagebox . askyesnocancel ( " Unsaved changes " , " Save changes to the current session? " )
if ans is None :
return False
if ans is False :
return True
# ans is True -> attempt to save; confirm overwrite if needed
saved = self . _save_session ( confirm_overwrite = True , autosave = False )
if saved :
self . _set_dirty ( False )
return True
# Save not performed (likely declined overwrite) -> block
return False
except Exception :
return True
def _autosave_after_generation ( self ) :
try :
if not ( self . title_var . get ( ) or " " ) . strip ( ) :
return
saved = self . _save_session ( confirm_overwrite = False , autosave = True )
if saved :
self . _set_dirty ( False )
except Exception :
pass
def on_close ( self ) :
if self . _maybe_save_if_dirty ( ) :
try :
self . destroy ( )
except Exception :
os . _exit ( 0 )
def _save_settings ( self ) :
try :
obj = {
' ollama_host ' : self . ollama_host . get ( ) ,
' ollama_model ' : self . ollama_model . get ( ) ,
' git_remote_url ' : self . git_remote_url . get ( ) ,
2025-09-13 03:51:14 +02:00
' searx_url ' : self . searx_url . get ( ) ,
2025-09-12 21:45:11 +02:00
' pandoc_path ' : getattr ( self , ' pandoc_path ' , tk . StringVar ( value = ' ' ) ) . get ( ) if hasattr ( self , ' pandoc_path ' ) else ' ' ,
' wkhtmltopdf_path ' : getattr ( self , ' wkhtmltopdf_path ' , tk . StringVar ( value = ' ' ) ) . get ( ) if hasattr ( self , ' wkhtmltopdf_path ' ) else ' ' ,
}
self . _config_path ( ) . write_text ( json . dumps ( obj , ensure_ascii = False , indent = 2 ) , encoding = ' utf-8 ' )
except Exception :
pass
def main ( ) :
try :
app = App ( )
app . mainloop ( )
except Exception :
traceback . print_exc ( )
if __name__ == " __main__ " :
main ( )