#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Prior-art web search utilities for the Idea → Concept GUI. Design goals - Pure stdlib networking (urllib) to avoid new dependencies. - Works with a local SearXNG instance. - Uses Ollama for both query generation and embeddings. - Small, robust, and callable from the GUI with status callbacks. """ from __future__ import annotations from typing import List, Dict, Any, Optional, Tuple, Callable import json import re import time from pathlib import Path from urllib.parse import urlencode, urlparse import urllib.request import urllib.error try: from bs4 import BeautifulSoup # type: ignore except Exception: # pragma: no cover BeautifulSoup = None # type: ignore # Default SearXNG endpoint (no trailing slash) SEARX_DEFAULT_URL = "http://localhost:8888" # Prompt to generate search queries from sources SEARCH_QUERIES_PROMPT = ( """ You are a cross-domain prior-art scout. Your only task is to EMIT SEARCH QUERIES (no explanations) that can be sent to SearXNG to discover whether the user’s idea already exists. SOURCES - NOTES (freeform by user): {NOTES} - KNOWLEDGE BASE (excerpts): {KB} - ASSET NAMES (filenames the user selected): {ASSETS} RULES - Infer the single most likely idea from the sources above (silently). - Return STRICT JSON ONLY with exactly this schema: {"queries": ["", "", ""], "lang": ""} - Requirements: • Exactly 3 distinct queries; each ≤ 120 characters. • Use quotes for distinctive phrases when helpful. • Include synonyms or likely alternate names if useful. • Add up to 2 disambiguating negatives with a leading “-” only when needed. • Prefer engine-portable operators only: quotes "…", OR, -, site:, filetype:. Avoid engine-specific syntax. • Vary the angles across the 3 queries for broad recall: 1) direct existence/implementation check, 2) synonym/alternate-naming variant, 3) precision angle (compare/alternative wording OR a domain-targeted site:/filetype: if obvious). • Optional: include a simple year hint like 2019..2025 only if recency clearly matters. - Choose "lang" from the dominant language in NOTES/KB/ASSETS; default to "en" if unclear. - No prose, no markdown/code fences, no extra keys beyond {"queries","lang"}. """ ).strip() # ----------------------------- # Utils # ----------------------------- USER_AGENT = ( "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome Safari" ) def sanitize_llm_text_simple(s: str) -> str: try: s = re.sub(r".*?", "", s, flags=re.S | re.I) s = re.sub(r"^\s*```(?:\w+)?\s*", "", s) s = re.sub(r"\s*```\s*$", "", s) return s.strip() except Exception: return (s or "").strip() def _http_post_json(url: str, payload: Dict[str, Any], *, timeout: int = 600) -> Dict[str, Any]: data = json.dumps(payload).encode("utf-8") req = urllib.request.Request( url, data=data, headers={"Content-Type": "application/json", "User-Agent": USER_AGENT}, method="POST", ) with urllib.request.urlopen(req, timeout=timeout) as resp: body = resp.read() return json.loads(body.decode("utf-8", "ignore")) def _http_get(url: str, *, timeout: int = 20, max_bytes: int = 1_500_000) -> Tuple[str, Dict[str, str]]: req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT}) with urllib.request.urlopen(req, timeout=timeout) as resp: ctype = (resp.headers.get("content-type") or "").lower() buf = bytearray() while True: chunk = resp.read(65536) if not chunk: break buf.extend(chunk) if len(buf) >= max_bytes: break try: text = buf.decode(resp.headers.get_content_charset() or "utf-8", errors="ignore") except Exception: text = buf.decode("utf-8", errors="ignore") return text, {"content-type": ctype} def _http_get_json(url: str, params: Dict[str, Any], *, timeout: int = 30) -> Dict[str, Any]: q = urlencode(params) text, _ = _http_get(f"{url}?{q}", timeout=timeout) try: return json.loads(text) except Exception: return {} def _is_probably_html_url(url: str) -> bool: try: path = urlparse(url).path.lower() if not path: return True if "." not in path: return True ext = path.rsplit(".", 1)[-1] return ext not in { "pdf","jpg","jpeg","png","gif","webp","svg","mp4","mp3","mov","avi", "zip","gz","7z","tar","rar","woff","woff2","ttf","otf" } except Exception: return True def _extract_text(html: str, *, max_len: int = 120_000) -> str: if not html: return "" if BeautifulSoup is None: # crude fallback if bs4 missing txt = re.sub(r"<\s*(script|style)[^>]*>.*?<\s*/\s*\1\s*>", " ", html, flags=re.S|re.I) txt = re.sub(r"<[^>]+>", " ", txt) txt = re.sub(r"\s+", " ", txt) return txt.strip()[:max_len] try: try: soup = BeautifulSoup(html, "lxml") except Exception: soup = BeautifulSoup(html, "html.parser") for tag in soup.select("script,style,noscript"): tag.decompose() text = soup.get_text("\n", strip=True) text = re.sub(r"[ \t]+", " ", text) text = re.sub(r"\n{3,}", "\n\n", text) return text[:max_len] if len(text) > max_len else text except Exception: return "" # ----------------------------- # Ollama helpers # ----------------------------- def ollama_generate(host: str, model: str, prompt: str, *, timeout: int = 600) -> str: url = f"{host.rstrip('/')}/api/generate" payload = {"model": model, "prompt": prompt, "stream": False} try: obj = _http_post_json(url, payload, timeout=timeout) return (obj.get("response") or "").strip() except urllib.error.HTTPError as e: raise RuntimeError(f"Ollama HTTP error {e.code}: {e.read().decode('utf-8','ignore')}") except Exception as e: raise RuntimeError(f"Ollama request failed: {e}") def ollama_embed(host: str, model: str, text: str, *, timeout: int = 120) -> List[float]: url = f"{host.rstrip('/')}/api/embeddings" payload = {"model": model, "prompt": text} try: obj = _http_post_json(url, payload, timeout=timeout) except urllib.error.HTTPError as e: raise RuntimeError(f"Ollama embeddings HTTP error {e.code}: {e.read().decode('utf-8','ignore')}") except Exception as e: raise RuntimeError(f"Ollama embeddings request failed: {e}") # Normalize common shapes if isinstance(obj, dict): if "error" in obj: raise RuntimeError(f"Ollama embeddings error: {obj.get('error')}") if "embedding" in obj and isinstance(obj["embedding"], list): return [float(x) for x in obj["embedding"] if isinstance(x, (int, float))] if "data" in obj and isinstance(obj["data"], list) and obj["data"]: em = obj["data"][0].get("embedding", []) if isinstance(em, list): return [float(x) for x in em if isinstance(x, (int, float))] if "embeddings" in obj and isinstance(obj["embeddings"], list) and obj["embeddings"]: em0 = obj["embeddings"][0] if isinstance(em0, dict): em0 = em0.get("embedding", []) if isinstance(em0, list): return [float(x) for x in em0 if isinstance(x, (int, float))] return [] def cosine(a: List[float], b: List[float]) -> float: n = min(len(a), len(b)) if n == 0: return 0.0 dot = na = nb = 0.0 for i in range(n): x = a[i]; y = b[i] dot += x * y na += x * x nb += y * y if na <= 0.0 or nb <= 0.0: return 0.0 return dot / ((na ** 0.5) * (nb ** 0.5)) # ----------------------------- # SearXNG search + fetch # ----------------------------- def searx_search(base_url: str, query: str, *, max_results: int = 6) -> List[Dict[str, Any]]: base = (base_url or SEARX_DEFAULT_URL).rstrip("/") params = {"q": query, "format": "json", "safesearch": 1} data = _http_get_json(f"{base}/search", params, timeout=30) out: List[Dict[str, Any]] = [] seen = set() for item in data.get("results", []): url = item.get("url") title = item.get("title") or "" if not url or url in seen: continue if not _is_probably_html_url(url): continue seen.add(url) out.append({"url": url, "title": title}) if len(out) >= max_results: break return out def fetch_pages(urls: List[str], *, max_pages: int = 8, min_text_len: int = 500) -> Dict[str, str]: urls = urls[:max_pages] out: Dict[str, str] = {} for u in urls: try: html, hdrs = _http_get(u, timeout=20) ctype = (hdrs.get("content-type") or "").lower() if ctype and not ("text/html" in ctype or "application/xhtml+xml" in ctype or ctype.startswith("text/")): continue txt = _extract_text(html) if len(txt) >= min_text_len: out[u] = txt except Exception: continue return out # ----------------------------- # Public API # ----------------------------- def generate_prior_art_queries( *, ollama_host: str, model: str, notes: str, kb: str, assets: List[str], ) -> Dict[str, Any]: assets_str = "\n".join(f"- {Path(a).name}" for a in assets) if assets else "(none)" prompt = ( SEARCH_QUERIES_PROMPT .replace("{NOTES}", (notes or "").strip() or "(none)") .replace("{KB}", (kb or "").strip() or "(empty)") .replace("{ASSETS}", assets_str) ) raw = ollama_generate(ollama_host, model, prompt) raw = sanitize_llm_text_simple(raw) obj: Dict[str, Any] try: obj = json.loads(raw) except Exception: # try to salvage JSON object substring m = re.search(r"\{[\s\S]*\}", raw) if not m: raise RuntimeError("Model did not return JSON for search queries.") try: obj = json.loads(m.group(0)) except Exception: raise RuntimeError("Failed to parse JSON for search queries.") q = obj.get("queries") if isinstance(obj, dict) else None lang = obj.get("lang") if isinstance(obj, dict) else None if not isinstance(q, list) or len(q) != 3: raise RuntimeError("Search query generator must return exactly 3 queries.") queries = [str(x).strip() for x in q if str(x).strip()] if len(queries) != 3: raise RuntimeError("Invalid queries after normalization.") return {"queries": queries, "lang": (str(lang).strip() or "en")} def prior_art_search( *, ollama_host: str, model: str, notes: str, kb: str, assets: List[str], searx_url: Optional[str] = None, embed_model: str = "bge-m3:latest", status_cb: Optional[Callable[[str], None]] = None, per_query_max: int = 6, overall_max_urls: int = 16, fetch_max_pages: int = 8, ) -> Dict[str, Any]: """ End-to-end prior-art search pipeline. Returns a dict with keys: {"queries", "lang", "results": [{url,title,score,snippet}...]} """ def _status(msg: str): if status_cb: try: status_cb(msg) except Exception: pass _status("Generating search queries…") meta = generate_prior_art_queries( ollama_host=ollama_host, model=model, notes=notes, kb=kb, assets=assets ) queries: List[str] = meta["queries"] lang: str = meta.get("lang", "en") _status("Searching SearXNG…") all_urls: List[str] = [] seen = set() base = (searx_url or SEARX_DEFAULT_URL).rstrip("/") for q in queries: try: res = searx_search(base, q, max_results=per_query_max) except Exception: res = [] for item in res: u = item.get("url") if not u or u in seen: continue seen.add(u) all_urls.append(u) if len(all_urls) >= overall_max_urls: break if len(all_urls) >= overall_max_urls: break if not all_urls: return {"queries": queries, "lang": lang, "results": []} _status("Fetching pages…") pages = fetch_pages(all_urls, max_pages=fetch_max_pages, min_text_len=500) if not pages: return {"queries": queries, "lang": lang, "results": []} # Build doc list and short snippets docs: List[Tuple[str, str]] = [] for u in all_urls: if u in pages: docs.append((u, pages[u])) if not docs: return {"queries": queries, "lang": lang, "results": []} _status("Embedding and reranking…") # Embed all 3 queries q_embs: List[List[float]] = [] for q in queries: try: q_embs.append(ollama_embed(ollama_host, embed_model, f"query: {q}")) except Exception: q_embs.append([]) # Embed documents (short passages) DOC_SNIPPET_CHARS = 800 d_embs: List[List[float]] = [] for (_u, text) in docs: snippet = text.replace("\n", " ") if len(snippet) > DOC_SNIPPET_CHARS: snippet = snippet[:DOC_SNIPPET_CHARS] try: d_embs.append(ollama_embed(ollama_host, embed_model, f"passage: {snippet}")) except Exception: d_embs.append([]) # Score each doc by max cosine across query embeddings results: List[Tuple[str, float]] = [] # (url, score) for i, (u, _t) in enumerate(docs): best = 0.0 for qv in q_embs: if not qv or not d_embs[i]: continue c = cosine(qv, d_embs[i]) if c > best: best = c # scale to 0..100 for user-facing score = max(0.0, min(100.0, (best + 1.0) * 50.0)) results.append((u, score)) # Sort and format output results.sort(key=lambda x: x[1], reverse=True) out: List[Dict[str, Any]] = [] for (u, sc) in results: txt = pages.get(u, "").strip() snippet = txt[:800].replace("\n", " ") if txt else "" out.append({"url": u, "title": "", "score": round(sc, 1), "snippet": snippet}) return {"queries": queries, "lang": lang, "results": out}