From e831388748ae55086265e8ff9e0c340a1d0b0b9e Mon Sep 17 00:00:00 2001
From: Victor Giers
Date: Sat, 13 Sep 2025 03:50:53 +0200
Subject: [PATCH] auto-git: [add] websearch.py [change] concept-maker_gui.py

---
 concept-maker_gui.py |  12 ++
 websearch.py         | 428 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 440 insertions(+)
 create mode 100644 websearch.py

diff --git a/concept-maker_gui.py b/concept-maker_gui.py
index c023e6d..9c7d865 100644
--- a/concept-maker_gui.py
+++ b/concept-maker_gui.py
@@ -59,6 +59,8 @@ import html
 # --- HTTP (stdlib)
 import urllib.request
 import urllib.error
+import webbrowser
+import websearch
 
 
 # -----------------------------
@@ -485,6 +487,12 @@ class App(TkinterDnD.Tk):  # type: ignore
         # Default model prompt suggests explicit selection
         self.ollama_model = tk.StringVar(value=os.environ.get("IDEA_HOLE_MODEL", "Select model..."))
         self.git_remote_url = tk.StringVar(value=os.environ.get("IDEA_HOLE_REMOTE", ""))
+        # SearXNG URL (for prior-art search)
+        try:
+            _searx_default = websearch.SEARX_DEFAULT_URL
+        except Exception:
+            _searx_default = "http://localhost:8888"
+        self.searx_url = tk.StringVar(value=os.environ.get("SEARX_URL", _searx_default))
 
         # Concept metadata
         self.title_var = tk.StringVar(value="")
@@ -648,6 +656,7 @@ class App(TkinterDnD.Tk):  # type: ignore
         notes_actions = ttk.Frame(notes_frame)
         notes_actions.pack(side=tk.TOP, fill=tk.X, padx=(8,0), pady=(6,6))
         ttk.Button(notes_actions, text="Generate Concept", command=self.on_generate).pack(side=tk.LEFT)
+        ttk.Button(notes_actions, text="Find Prior Art", command=self.on_prior_art).pack(side=tk.LEFT, padx=(6,0))
 
         # Concept editor + metadata
         concept_frame = ttk.LabelFrame(right, text="Concept (editable)")
@@ -715,6 +724,8 @@ class App(TkinterDnD.Tk):  # type: ignore
         self.model_combo.pack(side=tk.LEFT, padx=(4,10))
         ttk.Label(bottom, text="Remote git repository:").pack(side=tk.LEFT)
         ttk.Entry(bottom, textvariable=self.git_remote_url, width=40).pack(side=tk.LEFT, padx=(4,10))
+        ttk.Label(bottom, text="SearXNG URL:").pack(side=tk.LEFT)
+        ttk.Entry(bottom, textvariable=self.searx_url, width=26).pack(side=tk.LEFT, padx=(4,0))
 
         # Enable/disable push button based on fields
         self.title_var.trace_add('write', lambda *a: (self.update_push_state(), self._set_dirty(True)))
@@ -728,6 +739,7 @@ class App(TkinterDnD.Tk):  # type: ignore
         self.ollama_host.trace_add('write', lambda *a: self._save_settings())
         self.ollama_model.trace_add('write', lambda *a: self._save_settings())
         self.git_remote_url.trace_add('write', lambda *a: self._save_settings())
+        self.searx_url.trace_add('write', lambda *a: self._save_settings())
         # Optional tool overrides would be persisted if we expose them later
         self.update_push_state()
         # Ensure placeholder if saved model not present
diff --git a/websearch.py b/websearch.py
new file mode 100644
index 0000000..fc2295e
--- /dev/null
+++ b/websearch.py
@@ -0,0 +1,428 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+"""
+Prior-art web search utilities for the Idea → Concept GUI.
+
+Design goals
+- Pure stdlib networking (urllib) to avoid new dependencies.
+- Works with a local SearXNG instance.
+- Uses Ollama for both query generation and embeddings.
+- Small, robust, and callable from the GUI with status callbacks.
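+
+Public entry points
+- generate_prior_art_queries(): ask the configured Ollama model for three SearXNG queries.
+- prior_art_search(): full pipeline (queries → search → fetch → embed → rerank).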
+""" + +from __future__ import annotations +from typing import List, Dict, Any, Optional, Tuple, Callable + +import json +import re +import time +from pathlib import Path +from urllib.parse import urlencode, urlparse +import urllib.request +import urllib.error + +try: + from bs4 import BeautifulSoup # type: ignore +except Exception: # pragma: no cover + BeautifulSoup = None # type: ignore + + +# Default SearXNG endpoint (no trailing slash) +SEARX_DEFAULT_URL = "http://localhost:8888" + + +# Prompt to generate search queries from sources +SEARCH_QUERIES_PROMPT = ( + """ +You are a cross-domain prior-art scout. Your only task is to EMIT SEARCH QUERIES (no explanations) that can be sent to SearXNG to discover whether the user’s idea already exists. + +SOURCES +- NOTES (freeform by user): +{NOTES} + +- KNOWLEDGE BASE (excerpts): +{KB} + +- ASSET NAMES (filenames the user selected): +{ASSETS} + +RULES +- Infer the single most likely idea from the sources above (silently). +- Return STRICT JSON ONLY with exactly this schema: + {"queries": ["", "", ""], "lang": ""} +- Requirements: + • Exactly 3 distinct queries; each ≤ 120 characters. + • Use quotes for distinctive phrases when helpful. + • Include synonyms or likely alternate names if useful. + • Add up to 2 disambiguating negatives with a leading “-” only when needed. + • Prefer engine-portable operators only: quotes "…", OR, -, site:, filetype:. Avoid engine-specific syntax. + • Vary the angles across the 3 queries for broad recall: + 1) direct existence/implementation check, + 2) synonym/alternate-naming variant, + 3) precision angle (compare/alternative wording OR a domain-targeted site:/filetype: if obvious). + • Optional: include a simple year hint like 2019..2025 only if recency clearly matters. +- Choose "lang" from the dominant language in NOTES/KB/ASSETS; default to "en" if unclear. +- No prose, no markdown/code fences, no extra keys beyond {"queries","lang"}. 
+""" +).strip() + + +# ----------------------------- +# Utils +# ----------------------------- + +USER_AGENT = ( + "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome Safari" +) + +def sanitize_llm_text_simple(s: str) -> str: + try: + s = re.sub(r".*?", "", s, flags=re.S | re.I) + s = re.sub(r"^\s*```(?:\w+)?\s*", "", s) + s = re.sub(r"\s*```\s*$", "", s) + return s.strip() + except Exception: + return (s or "").strip() + + +def _http_post_json(url: str, payload: Dict[str, Any], *, timeout: int = 600) -> Dict[str, Any]: + data = json.dumps(payload).encode("utf-8") + req = urllib.request.Request( + url, + data=data, + headers={"Content-Type": "application/json", "User-Agent": USER_AGENT}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=timeout) as resp: + body = resp.read() + return json.loads(body.decode("utf-8", "ignore")) + + +def _http_get(url: str, *, timeout: int = 20, max_bytes: int = 1_500_000) -> Tuple[str, Dict[str, str]]: + req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT}) + with urllib.request.urlopen(req, timeout=timeout) as resp: + ctype = (resp.headers.get("content-type") or "").lower() + buf = bytearray() + while True: + chunk = resp.read(65536) + if not chunk: + break + buf.extend(chunk) + if len(buf) >= max_bytes: + break + try: + text = buf.decode(resp.headers.get_content_charset() or "utf-8", errors="ignore") + except Exception: + text = buf.decode("utf-8", errors="ignore") + return text, {"content-type": ctype} + + +def _http_get_json(url: str, params: Dict[str, Any], *, timeout: int = 30) -> Dict[str, Any]: + q = urlencode(params) + text, _ = _http_get(f"{url}?{q}", timeout=timeout) + try: + return json.loads(text) + except Exception: + return {} + + +def _is_probably_html_url(url: str) -> bool: + try: + path = urlparse(url).path.lower() + if not path: + return True + if "." 
+
+
+def _is_probably_html_url(url: str) -> bool:
+    try:
+        path = urlparse(url).path.lower()
+        if not path:
+            return True
+        if "." not in path:
+            return True
+        ext = path.rsplit(".", 1)[-1]
+        return ext not in {
+            "pdf","jpg","jpeg","png","gif","webp","svg","mp4","mp3","mov","avi",
+            "zip","gz","7z","tar","rar","woff","woff2","ttf","otf"
+        }
+    except Exception:
+        return True
+
+
+def _extract_text(html: str, *, max_len: int = 120_000) -> str:
+    if not html:
+        return ""
+    if BeautifulSoup is None:
+        # crude fallback if bs4 missing
+        txt = re.sub(r"<\s*(script|style)[^>]*>.*?<\s*/\s*\1\s*>", " ", html, flags=re.S|re.I)
+        txt = re.sub(r"<[^>]+>", " ", txt)
+        txt = re.sub(r"\s+", " ", txt)
+        return txt.strip()[:max_len]
+    try:
+        try:
+            soup = BeautifulSoup(html, "lxml")
+        except Exception:
+            soup = BeautifulSoup(html, "html.parser")
+        for tag in soup.select("script,style,noscript"):
+            tag.decompose()
+        text = soup.get_text("\n", strip=True)
+        text = re.sub(r"[ \t]+", " ", text)
+        text = re.sub(r"\n{3,}", "\n\n", text)
+        return text[:max_len] if len(text) > max_len else text
+    except Exception:
+        return ""
+
+
+# -----------------------------
+# Ollama helpers
+# -----------------------------
+
+def ollama_generate(host: str, model: str, prompt: str, *, timeout: int = 600) -> str:
+    url = f"{host.rstrip('/')}/api/generate"
+    payload = {"model": model, "prompt": prompt, "stream": False}
+    try:
+        obj = _http_post_json(url, payload, timeout=timeout)
+        return (obj.get("response") or "").strip()
+    except urllib.error.HTTPError as e:
+        raise RuntimeError(f"Ollama HTTP error {e.code}: {e.read().decode('utf-8','ignore')}")
+    except Exception as e:
+        raise RuntimeError(f"Ollama request failed: {e}")
+
+
+def ollama_embed(host: str, model: str, text: str, *, timeout: int = 120) -> List[float]:
+    url = f"{host.rstrip('/')}/api/embeddings"
+    payload = {"model": model, "prompt": text}
+    try:
+        obj = _http_post_json(url, payload, timeout=timeout)
+    except urllib.error.HTTPError as e:
+        raise RuntimeError(f"Ollama embeddings HTTP error {e.code}: {e.read().decode('utf-8','ignore')}")
+    except Exception as e:
+        raise RuntimeError(f"Ollama embeddings request failed: {e}")
+
+    # Normalize common shapes
+    if isinstance(obj, dict):
+        if "error" in obj:
+            raise RuntimeError(f"Ollama embeddings error: {obj.get('error')}")
+        if "embedding" in obj and isinstance(obj["embedding"], list):
+            return [float(x) for x in obj["embedding"] if isinstance(x, (int, float))]
+        if "data" in obj and isinstance(obj["data"], list) and obj["data"]:
+            em = obj["data"][0].get("embedding", [])
+            if isinstance(em, list):
+                return [float(x) for x in em if isinstance(x, (int, float))]
+        if "embeddings" in obj and isinstance(obj["embeddings"], list) and obj["embeddings"]:
+            em0 = obj["embeddings"][0]
+            if isinstance(em0, dict):
+                em0 = em0.get("embedding", [])
+            if isinstance(em0, list):
+                return [float(x) for x in em0 if isinstance(x, (int, float))]
+    return []
+
+
+def cosine(a: List[float], b: List[float]) -> float:
+    n = min(len(a), len(b))
+    if n == 0:
+        return 0.0
+    dot = na = nb = 0.0
+    for i in range(n):
+        x = a[i]; y = b[i]
+        dot += x * y
+        na += x * x
+        nb += y * y
+    if na <= 0.0 or nb <= 0.0:
+        return 0.0
+    return dot / ((na ** 0.5) * (nb ** 0.5))
+
+
+# -----------------------------
+# SearXNG search + fetch
+# -----------------------------
+
+def searx_search(base_url: str, query: str, *, max_results: int = 6) -> List[Dict[str, Any]]:
+    base = (base_url or SEARX_DEFAULT_URL).rstrip("/")
+    params = {"q": query, "format": "json", "safesearch": 1}
+    data = _http_get_json(f"{base}/search", params, timeout=30)
+    out: List[Dict[str, Any]] = []
+    seen = set()
+    for item in data.get("results", []):
+        url = item.get("url")
+        title = item.get("title") or ""
+        if not url or url in seen:
+            continue
+        if not _is_probably_html_url(url):
+            continue
+        seen.add(url)
+        out.append({"url": url, "title": title})
+        if len(out) >= max_results:
+            break
+    return out
+
+
+def fetch_pages(urls: List[str], *, max_pages: int = 8, min_text_len: int = 500) -> Dict[str, str]:
+    urls = urls[:max_pages]
+    out: Dict[str, str] = {}
+    for u in urls:
+        try:
+            html, hdrs = _http_get(u, timeout=20)
+            ctype = (hdrs.get("content-type") or "").lower()
+            if ctype and not ("text/html" in ctype or "application/xhtml+xml" in ctype or ctype.startswith("text/")):
+                continue
+            txt = _extract_text(html)
+            if len(txt) >= min_text_len:
+                out[u] = txt
+        except Exception:
+            continue
+    return out
+
+
+# -----------------------------
+# Public API
+# -----------------------------
+
+def generate_prior_art_queries(
+    *,
+    ollama_host: str,
+    model: str,
+    notes: str,
+    kb: str,
+    assets: List[str],
+) -> Dict[str, Any]:
+    assets_str = "\n".join(f"- {Path(a).name}" for a in assets) if assets else "(none)"
+    prompt = (
+        SEARCH_QUERIES_PROMPT
+        .replace("{NOTES}", (notes or "").strip() or "(none)")
+        .replace("{KB}", (kb or "").strip() or "(empty)")
+        .replace("{ASSETS}", assets_str)
+    )
+    raw = ollama_generate(ollama_host, model, prompt)
+    raw = sanitize_llm_text_simple(raw)
+    obj: Dict[str, Any]
+    try:
+        obj = json.loads(raw)
+    except Exception:
+        # try to salvage JSON object substring
+        m = re.search(r"\{[\s\S]*\}", raw)
+        if not m:
+            raise RuntimeError("Model did not return JSON for search queries.")
+        try:
+            obj = json.loads(m.group(0))
+        except Exception:
+            raise RuntimeError("Failed to parse JSON for search queries.")
+
+    q = obj.get("queries") if isinstance(obj, dict) else None
+    lang = obj.get("lang") if isinstance(obj, dict) else None
+    if not isinstance(q, list) or len(q) != 3:
+        raise RuntimeError("Search query generator must return exactly 3 queries.")
+    queries = [str(x).strip() for x in q if str(x).strip()]
+    if len(queries) != 3:
+        raise RuntimeError("Invalid queries after normalization.")
+    return {"queries": queries, "lang": (str(lang or "").strip() or "en")}
+
+
+def prior_art_search(
+    *,
+    ollama_host: str,
+    model: str,
+    notes: str,
+    kb: str,
+    assets: List[str],
+    searx_url: Optional[str] = None,
+    embed_model: str = "bge-m3:latest",
+    status_cb: Optional[Callable[[str], None]] = None,
+    per_query_max: int = 6,
+    overall_max_urls: int = 16,
+    fetch_max_pages: int = 8,
+) -> Dict[str, Any]:
+    """
+    End-to-end prior-art search pipeline.
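+
+    notes, kb and assets describe the idea; searx_url overrides SEARX_DEFAULT_URL.
+    embed_model is the Ollama embedding model used for reranking; status_cb receives
+    short progress messages; per_query_max, overall_max_urls and fetch_max_pages
+    bound how much is searched, collected and fetched.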
+
+    Returns a dict with keys: {"queries", "lang", "results": [{url,title,score,snippet}...]}
+    """
+    def _status(msg: str):
+        if status_cb:
+            try:
+                status_cb(msg)
+            except Exception:
+                pass
+
+    _status("Generating search queries…")
+    meta = generate_prior_art_queries(
+        ollama_host=ollama_host, model=model, notes=notes, kb=kb, assets=assets
+    )
+    queries: List[str] = meta["queries"]
+    lang: str = meta.get("lang", "en")
+
+    _status("Searching SearXNG…")
+    all_urls: List[str] = []
+    seen = set()
+    base = (searx_url or SEARX_DEFAULT_URL).rstrip("/")
+    for q in queries:
+        try:
+            res = searx_search(base, q, max_results=per_query_max)
+        except Exception:
+            res = []
+        for item in res:
+            u = item.get("url")
+            if not u or u in seen:
+                continue
+            seen.add(u)
+            all_urls.append(u)
+            if len(all_urls) >= overall_max_urls:
+                break
+        if len(all_urls) >= overall_max_urls:
+            break
+
+    if not all_urls:
+        return {"queries": queries, "lang": lang, "results": []}
+
+    _status("Fetching pages…")
+    pages = fetch_pages(all_urls, max_pages=fetch_max_pages, min_text_len=500)
+    if not pages:
+        return {"queries": queries, "lang": lang, "results": []}
+
+    # Build doc list and short snippets
+    docs: List[Tuple[str, str]] = []
+    for u in all_urls:
+        if u in pages:
+            docs.append((u, pages[u]))
+    if not docs:
+        return {"queries": queries, "lang": lang, "results": []}
+
+    _status("Embedding and reranking…")
+    # Embed all 3 queries
+    q_embs: List[List[float]] = []
+    for q in queries:
+        try:
+            q_embs.append(ollama_embed(ollama_host, embed_model, f"query: {q}"))
+        except Exception:
+            q_embs.append([])
+
+    # Embed documents (short passages)
+    DOC_SNIPPET_CHARS = 800
+    d_embs: List[List[float]] = []
+    for (_u, text) in docs:
+        snippet = text.replace("\n", " ")
+        if len(snippet) > DOC_SNIPPET_CHARS:
+            snippet = snippet[:DOC_SNIPPET_CHARS]
+        try:
+            d_embs.append(ollama_embed(ollama_host, embed_model, f"passage: {snippet}"))
+        except Exception:
+            d_embs.append([])
+
+    # Score each doc by max cosine across query embeddings
+    results: List[Tuple[str, float]] = []  # (url, score)
+    for i, (u, _t) in enumerate(docs):
+        best = 0.0
+        for qv in q_embs:
+            if not qv or not d_embs[i]:
+                continue
+            c = cosine(qv, d_embs[i])
+            if c > best:
+                best = c
+        # scale to 0..100 for user-facing
+        score = max(0.0, min(100.0, (best + 1.0) * 50.0))
+        results.append((u, score))
+
+    # Sort and format output
+    results.sort(key=lambda x: x[1], reverse=True)
+    out: List[Dict[str, Any]] = []
+    for (u, sc) in results:
+        txt = pages.get(u, "").strip()
+        snippet = txt[:800].replace("\n", " ") if txt else ""
+        out.append({"url": u, "title": "", "score": round(sc, 1), "snippet": snippet})
+
+    return {"queries": queries, "lang": lang, "results": out}
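
Note: the "Find Prior Art" button above is wired to self.on_prior_art, which is not among
this patch's hunks. A minimal sketch of what such a handler could look like, assuming a
hypothetical notes widget (self.notes_text) and a background thread; it uses the searx_url
variable and imports added by the patch plus the ollama_host/ollama_model settings the GUI
already keeps:

    import threading
    import tkinter.messagebox as messagebox

    def on_prior_art(self):
        # self.notes_text is an assumption; the StringVars below exist in the GUI.
        notes = self.notes_text.get("1.0", "end").strip()

        def work():
            try:
                res = websearch.prior_art_search(
                    ollama_host=self.ollama_host.get(),
                    model=self.ollama_model.get(),
                    notes=notes, kb="", assets=[],
                    searx_url=self.searx_url.get(),
                    status_cb=print,  # placeholder; a real handler would update a status bar
                )
                top = [r["url"] for r in res["results"][:5]]
                # Hand the best matches to the browser on the Tk main thread.
                self.after(0, lambda: [webbrowser.open(u) for u in top])
            except Exception as exc:
                msg = str(exc)
                self.after(0, lambda: messagebox.showerror("Find Prior Art", msg))

        # prior_art_search does blocking LLM and network calls, so run it off the main loop.
        threading.Thread(target=work, daemon=True).start()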