#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Prior-art web search utilities for the Idea → Concept GUI.

Design goals
- Pure stdlib networking (urllib) to avoid new dependencies.
- Works with a local SearXNG instance.
- Uses Ollama for both query generation and embeddings.
- Small, robust, and callable from the GUI with status callbacks.
"""
from __future__ import annotations

from typing import List, Dict, Any, Optional, Tuple, Callable
import json
import re
import time
from pathlib import Path
from urllib.parse import urlencode, urlparse
import urllib.request
import urllib.error

try:
    from bs4 import BeautifulSoup  # type: ignore
except Exception:  # pragma: no cover
    BeautifulSoup = None  # type: ignore

# Default SearXNG endpoint (no trailing slash)
SEARX_DEFAULT_URL = "http://localhost:8888"

# Prompt to generate search queries from sources
SEARCH_QUERIES_PROMPT = (
    """
You are a cross-domain prior-art scout. Your only task is to EMIT SEARCH QUERIES (no explanations) that can be sent to SearXNG to discover whether the user’s idea already exists.

SOURCES
- NOTES (freeform by user):
{NOTES}

- KNOWLEDGE BASE (excerpts):
{KB}

- ASSET NAMES (filenames the user selected):
{ASSETS}

RULES
- Infer the single most likely idea from the sources above (silently).
- Return STRICT JSON ONLY with exactly this schema:
  {"queries": ["", "", ""], "lang": ""}
- Requirements:
  • Exactly 3 distinct queries; each ≤ 120 characters.
  • Use quotes for distinctive phrases when helpful.
  • Include synonyms or likely alternate names if useful.
  • Add up to 2 disambiguating negatives with a leading “-” only when needed.
  • Prefer engine-portable operators only: quotes "…", OR, -, site:, filetype:. Avoid engine-specific syntax.
  • Vary the angles across the 3 queries for broad recall:
    1) direct existence/implementation check,
    2) synonym/alternate-naming variant,
    3) precision angle (compare/alternative wording OR a domain-targeted site:/filetype: if obvious).
  • Optional: include a simple year hint like 2019..2025 only if recency clearly matters.
- Choose "lang" from the dominant language in NOTES/KB/ASSETS; default to "en" if unclear.
- No prose, no markdown/code fences, no extra keys beyond {"queries","lang"}.
"""
).strip()

# -----------------------------
# Utils
# -----------------------------

USER_AGENT = (
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome Safari"
)

# Patterns used by sanitize_llm_text_simple (compiled once).
_THINK_BLOCK_RE = re.compile(r"<think>.*?</think>", re.S | re.I)
_LEADING_FENCE_RE = re.compile(r"^\s*```(?:\w+)?\s*")
_TRAILING_FENCE_RE = re.compile(r"\s*```\s*$")

# Placeholders substituted into SEARCH_QUERIES_PROMPT.
_PROMPT_SLOT_RE = re.compile(r"\{(NOTES|KB|ASSETS)\}")

# Size of each network read in _http_get.
_READ_CHUNK = 65536


def sanitize_llm_text_simple(s: str) -> str:
    """Strip wrapper noise from raw model output.

    Removes ``<think>...</think>`` blocks (presumably emitted by reasoning
    models before the answer; the tag name is inferred), one leading and one
    trailing Markdown code fence, and surrounding whitespace.

    Returns "" for ``None``/empty input; non-string input is stringified.
    """
    if not s:
        return ""
    text = s if isinstance(s, str) else str(s)
    text = _THINK_BLOCK_RE.sub("", text)
    text = _LEADING_FENCE_RE.sub("", text)
    text = _TRAILING_FENCE_RE.sub("", text)
    return text.strip()


def _http_post_json(url: str, payload: Dict[str, Any], *, timeout: int = 600) -> Dict[str, Any]:
    """POST *payload* as JSON to *url* and return the decoded JSON response.

    Raises whatever ``urllib`` raises on transport/HTTP errors and
    ``json.JSONDecodeError`` when the body is not valid JSON.
    """
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(
        url,
        data=data,
        headers={"Content-Type": "application/json", "User-Agent": USER_AGENT},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        body = resp.read()
    return json.loads(body.decode("utf-8", "ignore"))


def _http_get(url: str, *, timeout: int = 20, max_bytes: int = 1_500_000) -> Tuple[str, Dict[str, str]]:
    """GET *url* and return ``(text, {"content-type": ...})``.

    At most *max_bytes* bytes of the body are read. The body is decoded with
    the charset announced in the response headers, falling back to UTF-8;
    undecodable bytes are dropped.
    """
    req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        ctype = (resp.headers.get("content-type") or "").lower()
        charset = resp.headers.get_content_charset() or "utf-8"
        buf = bytearray()
        # Never request more than the remaining budget so the cap is exact.
        while len(buf) < max_bytes:
            chunk = resp.read(min(_READ_CHUNK, max_bytes - len(buf)))
            if not chunk:
                break
            buf.extend(chunk)
    try:
        text = buf.decode(charset, errors="ignore")
    except (LookupError, ValueError):
        # Unknown or unusable charset name announced by the server.
        text = buf.decode("utf-8", errors="ignore")
    return text, {"content-type": ctype}


def _http_get_json(url: str, params: Dict[str, Any], *, timeout: int = 30) -> Dict[str, Any]:
    """GET *url* with *params* as query string and return the JSON object.

    Returns ``{}`` when the body is not valid JSON or is not a JSON object,
    so callers can always use ``.get`` on the result.
    """
    q = urlencode(params)
    text, _ = _http_get(f"{url}?{q}", timeout=timeout)
    try:
        obj = json.loads(text)
    except ValueError:
        return {}
    return obj if isinstance(obj, dict) else {}


# File extensions that indicate a non-HTML resource.
_NON_HTML_EXTENSIONS = frozenset({
    "pdf", "jpg", "jpeg", "png", "gif", "webp", "svg", "mp4", "mp3", "mov", "avi",
    "zip", "gz", "7z", "tar", "rar", "woff", "woff2", "ttf", "otf",
})


def _is_probably_html_url(url: str) -> bool:
    """Return False only when the URL path ends in a known non-HTML extension.

    Unparseable URLs are treated as "probably HTML" (True).
    """
    try:
        path = urlparse(url).path.lower()
        if not path or "." not in path:
            return True
        ext = path.rsplit(".", 1)[-1]
        return ext not in _NON_HTML_EXTENSIONS
    except (ValueError, TypeError, AttributeError):
        return True


# Keywords that typically belong to navigation, banners, or cookie dialogs
_NOISE_WORDS = {
    "home", "contact", "about", "copyright", "privacy", "policy", "cookies", "cookie", "consent",
    "login", "log", "sign", "signup", "signin", "register", "account", "subscribe", "newsletter",
    "advert", "advertisement", "ads", "promo", "banner", "menu", "navigation", "nav", "footer",
    "header", "share", "social", "terms", "conditions", "accessibility", "language", "shop",
    "search", "skip", "main", "content",
}
_NOISE_ATTR_RE = re.compile(r"(cookie|consent|gdpr|banner|popup|modal|dialog|newsletter|subscribe|advert|promo|signin|signup|login|toolbar|share|social|nav|menu|footer|header)", re.I)

# Substrings that mark a short line as boilerplate.
_BOILERPLATE_PHRASES = (
    "cookie", "consent", "newsletter", "advert", "privacy policy", "terms",
    "skip to main", "enable javascript", "accept all", "manage preferences",
)
_WORD_RE = re.compile(r"[a-zA-Z]+")

# Elements removed wholesale before main-content scoring.
_STRIP_TAGS = [
    "script", "style", "noscript", "template",
    "header", "footer", "nav", "aside", "form", "iframe", "svg", "canvas",
    "input", "select", "option", "button",
]
# Elements considered as main-content candidates.
_CANDIDATE_TAGS = ["article", "main", "section", "div", "body"]
# Multiplicative score bonus per candidate tag name (default 1.0).
_TAG_BONUS = {"article": 1.35, "main": 1.25, "section": 1.1}


def _clean_lines(text: str) -> str:
    """Drop boilerplate, duplicate, and redundant blank lines from *text*.

    Short lines (<= 140 chars) are removed when they contain a boilerplate
    phrase or consist of at most 8 words that are all navigation keywords.
    Repeated lines (case-insensitive) are kept only once, and runs of blank
    lines are collapsed to one.
    """
    kept: List[str] = []
    seen: set[str] = set()
    for raw_line in text.splitlines():
        ln = raw_line.strip()
        if not ln:
            # Keep at most one blank separator, never a leading one.
            if kept and kept[-1]:
                kept.append("")
            continue
        lower = ln.lower()
        # Drop obvious boilerplate / cookie notices / menu crumbs
        if len(ln) <= 140:
            if any(phrase in lower for phrase in _BOILERPLATE_PHRASES):
                continue
            tokens = _WORD_RE.findall(lower)
            if tokens and len(tokens) <= 8 and all(t in _NOISE_WORDS for t in tokens):
                continue
        if lower in seen:
            continue
        seen.add(lower)
        kept.append(ln)

    # Collapse multiple blank lines
    compact: List[str] = []
    for ln in kept:
        if ln == "" and (not compact or compact[-1] == ""):
            continue
        compact.append(ln)
    return "\n".join(compact)


def _is_decomposed(el: Any) -> bool:
    """Return True when *el* has already been destroyed by ``decompose()``.

    bs4 documents the behaviour of a decomposed element as undefined, so such
    elements must not be read or decomposed again.
    """
    try:
        if getattr(el, "decomposed", False):
            return True
        # A live Tag always carries a dict of attributes.
        return not isinstance(el.attrs, dict)
    except Exception:
        return True


def _noise_marker_text(el: Any) -> str:
    """Join the id/class/role attribute values of *el* into one string."""
    parts: List[str] = []
    for key in ("id", "class", "role"):
        value = el.attrs.get(key)
        if isinstance(value, (list, tuple)):
            parts.extend(str(v) for v in value)
        elif value:
            parts.append(str(value))
    return " ".join(parts)


def _strip_noise(soup: Any) -> None:
    """Remove scripts, page chrome, and noise-marked elements from *soup*."""
    # Strip obvious boilerplate containers first
    for el in soup.find_all(_STRIP_TAGS):
        # The snapshot may contain children of an element removed earlier.
        if not _is_decomposed(el):
            el.decompose()

    # Remove elements whose id/class/role clearly mark them as noise
    for el in soup.find_all(True):
        if _is_decomposed(el):
            continue
        # Layout classes on the root containers (e.g. "navbar-fixed") must
        # not wipe out the whole document.
        if el.name in ("html", "body"):
            continue
        if _NOISE_ATTR_RE.search(_noise_marker_text(el)):
            el.decompose()


def _node_text(node: Any) -> str:
    """Return the text of *node*, one string per line, whitespace-normalized."""
    txt = node.get_text("\n", strip=True)
    txt = re.sub(r"[ \t]+", " ", txt)
    txt = re.sub(r"\n{3,}", "\n\n", txt)
    return txt


def _pick_main_text(soup: Any) -> str:
    """Return the text of the best-scoring content container, or ""."""
    best_text = ""
    best_score = 0.0
    for node in soup.find_all(_CANDIDATE_TAGS):
        raw = _node_text(node)
        if not raw or len(raw) < 80:
            continue
        # Links per ~80 characters of text; link-heavy blocks are likely menus.
        link_density = len(node.find_all("a")) / max(1.0, len(raw) / 80.0)
        penalty = min(0.9, link_density)
        score = len(raw) * _TAG_BONUS.get(node.name, 1.0) * (1.0 - penalty)
        if score > best_score:
            best_score = score
            best_text = raw
    return best_text


def _extract_text(html: str, *, max_len: int = 120_000) -> str:
    """Extract the main readable text from an HTML document.

    Uses BeautifulSoup when available (lxml parser, then html.parser) and a
    regex-based tag stripper otherwise. The result is passed through
    ``_clean_lines`` and truncated to *max_len* characters. Returns "" for
    empty input or when extraction fails.
    """
    if not html:
        return ""
    if BeautifulSoup is None:
        # crude fallback if bs4 missing
        txt = re.sub(r"<\s*(script|style)[^>]*>.*?<\s*/\s*\1\s*>", " ", html, flags=re.S | re.I)
        txt = re.sub(r"<[^>]+>", " ", txt)
        txt = re.sub(r"\s+", " ", txt)
        return _clean_lines(txt.strip())[:max_len]
    try:
        try:
            soup = BeautifulSoup(html, "lxml")
        except Exception:
            # lxml is optional; the stdlib-backed parser is always available.
            soup = BeautifulSoup(html, "html.parser")
        _strip_noise(soup)
        best_text = _pick_main_text(soup)
        if not best_text:
            # fallback to whole body/text
            best_text = _node_text(soup.body or soup)
        cleaned = _clean_lines(best_text)
        if not cleaned.strip():
            return ""
        return cleaned[:max_len]
    except Exception:
        # Best-effort boundary: a malformed page must not abort the pipeline.
        return ""


# -----------------------------
# Ollama helpers
# -----------------------------

def ollama_generate(host: str, model: str, prompt: str, *, timeout: int = 600) -> str:
    """Run a non-streaming generation on Ollama and return the response text.

    Raises:
        RuntimeError: on any HTTP, transport, or decoding failure.
    """
    url = f"{host.rstrip('/')}/api/generate"
    payload = {"model": model, "prompt": prompt, "stream": False}
    try:
        obj = _http_post_json(url, payload, timeout=timeout)
        return (obj.get("response") or "").strip()
    except urllib.error.HTTPError as e:
        raise RuntimeError(f"Ollama HTTP error {e.code}: {e.read().decode('utf-8','ignore')}") from e
    except Exception as e:
        raise RuntimeError(f"Ollama request failed: {e}") from e


def ollama_embed(host: str, model: str, text: str, *, timeout: int = 120) -> List[float]:
    """Return the embedding vector for *text*, or ``[]`` if none was found.

    Accepts three response shapes: ``{"embedding": [...]}``,
    ``{"data": [{"embedding": [...]}]}`` and ``{"embeddings": [[...]]}`` (the
    latter also with dict items carrying an ``"embedding"`` key).
    Non-numeric entries are dropped from the vector.

    Raises:
        RuntimeError: on request failure or when the response has an
            ``"error"`` key.
    """
    url = f"{host.rstrip('/')}/api/embeddings"
    payload = {"model": model, "prompt": text}
    try:
        obj = _http_post_json(url, payload, timeout=timeout)
    except urllib.error.HTTPError as e:
        raise RuntimeError(f"Ollama embeddings HTTP error {e.code}: {e.read().decode('utf-8','ignore')}") from e
    except Exception as e:
        raise RuntimeError(f"Ollama embeddings request failed: {e}") from e

    # Normalize common shapes
    if not isinstance(obj, dict):
        return []
    if "error" in obj:
        raise RuntimeError(f"Ollama embeddings error: {obj.get('error')}")

    candidates: List[Any] = [obj.get("embedding")]
    data = obj.get("data")
    if isinstance(data, list) and data:
        first = data[0]
        candidates.append(first.get("embedding", []) if isinstance(first, dict) else None)
    batch = obj.get("embeddings")
    if isinstance(batch, list) and batch:
        first = batch[0]
        candidates.append(first.get("embedding", []) if isinstance(first, dict) else first)

    for vec in candidates:
        if isinstance(vec, list):
            return [float(x) for x in vec if isinstance(x, (int, float))]
    return []


def cosine(a: List[float], b: List[float]) -> float:
    """Return the cosine similarity of *a* and *b*.

    Vectors of different length are compared over their common prefix.
    Returns 0.0 when either vector is empty or has zero norm.
    """
    dot = na = nb = 0.0
    for x, y in zip(a, b):  # zip stops at the shorter vector
        dot += x * y
        na += x * x
        nb += y * y
    if na <= 0.0 or nb <= 0.0:
        return 0.0
    return dot / ((na ** 0.5) * (nb ** 0.5))


# -----------------------------
# SearXNG search + fetch
# -----------------------------

def searx_search(base_url: str, query: str, *, max_results: int = 6) -> List[Dict[str, Any]]:
    """Query SearXNG and return up to *max_results* ``{"url", "title"}`` dicts.

    Duplicate URLs and URLs that point at known non-HTML file types are
    skipped. Falls back to ``SEARX_DEFAULT_URL`` when *base_url* is empty.
    """
    base = (base_url or SEARX_DEFAULT_URL).rstrip("/")
    params = {"q": query, "format": "json", "safesearch": 1}
    data = _http_get_json(f"{base}/search", params, timeout=30)
    out: List[Dict[str, Any]] = []
    seen = set()
    for item in data.get("results") or []:
        if not isinstance(item, dict):
            continue
        url = item.get("url")
        if not url or url in seen:
            continue
        if not _is_probably_html_url(url):
            continue
        seen.add(url)
        out.append({"url": url, "title": item.get("title") or ""})
        if len(out) >= max_results:
            break
    return out


def fetch_pages(urls: List[str], *, max_pages: int = 8, min_text_len: int = 500) -> Dict[str, str]:
    """Download the first *max_pages* URLs and return ``{url: extracted_text}``.

    Only http(s) URLs with a textual content type are considered, and pages
    whose extracted text is shorter than *min_text_len* are omitted. Failures
    for individual URLs are skipped silently.
    """
    out: Dict[str, str] = {}
    for u in urls[:max_pages]:
        try:
            # URLs come from search results; never let urlopen follow
            # file:, ftp: or other non-web schemes.
            if urlparse(u).scheme.lower() not in ("http", "https"):
                continue
            html, hdrs = _http_get(u, timeout=20)
            ctype = (hdrs.get("content-type") or "").lower()
            if ctype and not ("text/html" in ctype or "application/xhtml+xml" in ctype or ctype.startswith("text/")):
                continue
            txt = _extract_text(html)
            if len(txt) >= min_text_len:
                out[u] = txt
        except Exception:
            # Best effort: one unreachable or malformed page must not abort the batch.
            continue
    return out


# -----------------------------
# Public API
# -----------------------------

def generate_prior_art_queries(
    *,
    ollama_host: str,
    model: str,
    notes: str,
    kb: str,
    assets: List[str],
) -> Dict[str, Any]:
    """Ask the model for exactly three search queries describing the idea.

    Returns:
        ``{"queries": [q1, q2, q3], "lang": code}``; ``lang`` defaults to
        ``"en"`` when the model omits it or leaves it blank.

    Raises:
        RuntimeError: when the request fails, the reply contains no parsable
            JSON object, or it does not hold exactly three non-empty queries.
    """
    assets_str = "\n".join(f"- {Path(a).name}" for a in assets) if assets else "(none)"
    slots = {
        "NOTES": (notes or "").strip() or "(none)",
        "KB": (kb or "").strip() or "(empty)",
        "ASSETS": assets_str,
    }
    # Single pass, so placeholder-like text inside the user's notes or KB is
    # never substituted a second time.
    prompt = _PROMPT_SLOT_RE.sub(lambda m: slots[m.group(1)], SEARCH_QUERIES_PROMPT)

    raw = sanitize_llm_text_simple(ollama_generate(ollama_host, model, prompt))

    obj: Any
    try:
        obj = json.loads(raw)
    except ValueError:
        # try to salvage JSON object substring
        m = re.search(r"\{[\s\S]*\}", raw)
        if not m:
            raise RuntimeError("Model did not return JSON for search queries.") from None
        try:
            obj = json.loads(m.group(0))
        except ValueError:
            raise RuntimeError("Failed to parse JSON for search queries.") from None

    q = obj.get("queries") if isinstance(obj, dict) else None
    lang = obj.get("lang") if isinstance(obj, dict) else None
    if not isinstance(q, list) or len(q) != 3:
        raise RuntimeError("Search query generator must return exactly 3 queries.")
    # A JSON null must count as missing, not become the query "None".
    queries = [str(x).strip() for x in q if x is not None and str(x).strip()]
    if len(queries) != 3:
        raise RuntimeError("Invalid queries after normalization.")
    # Same for a missing/null "lang": fall back to "en" instead of "None".
    lang_code = str(lang).strip() if lang is not None else ""
    return {"queries": queries, "lang": lang_code or "en"}


def prior_art_search(
    *,
    ollama_host: str,
    model: str,
    notes: str,
    kb: str,
    assets: List[str],
    searx_url: Optional[str] = None,
    embed_model: str = "bge-m3:latest",
    status_cb: Optional[Callable[[str], None]] = None,
    per_query_max: int = 6,
    overall_max_urls: int = 16,
    fetch_max_pages: int = 8,
) -> Dict[str, Any]:
    """
    End-to-end prior-art search pipeline.

    Steps: generate three queries, search SearXNG, fetch and extract pages,
    embed queries and page snippets, and rank pages by their best cosine
    similarity to any query.

    Returns a dict with keys:
      {"queries", "lang", "results": [{url,title,score,snippet}...]}
    ``score`` is on a 0..100 scale; because the best similarity starts at 0,
    a page scores 50.0 when no embedding is available or no query is
    positively similar to it.

    Raises:
        RuntimeError: when query generation fails. Search, fetch, and
            embedding failures are tolerated and shrink the result set.
    """
    def _status(msg: str) -> None:
        # Status reporting must never break the pipeline.
        if status_cb:
            try:
                status_cb(msg)
            except Exception:
                pass

    _status("Generating search queries…")
    meta = generate_prior_art_queries(
        ollama_host=ollama_host, model=model, notes=notes, kb=kb, assets=assets
    )
    queries: List[str] = meta["queries"]
    lang: str = meta.get("lang", "en")

    _status("Searching SearXNG…")
    all_urls: List[str] = []
    titles: Dict[str, str] = {}  # url -> title; also the de-duplication index
    base = (searx_url or SEARX_DEFAULT_URL).rstrip("/")
    for q in queries:
        try:
            hits = searx_search(base, q, max_results=per_query_max)
        except Exception:
            # One failing query should not discard the others' results.
            hits = []
        for item in hits:
            u = item.get("url")
            if not u or u in titles:
                continue
            titles[u] = item.get("title") or ""
            all_urls.append(u)
            if len(all_urls) >= overall_max_urls:
                break
        if len(all_urls) >= overall_max_urls:
            break

    if not all_urls:
        return {"queries": queries, "lang": lang, "results": []}

    _status("Fetching pages…")
    pages = fetch_pages(all_urls, max_pages=fetch_max_pages, min_text_len=500)
    if not pages:
        return {"queries": queries, "lang": lang, "results": []}

    # Build doc list (in search order) and short snippets
    docs: List[Tuple[str, str]] = [(u, pages[u]) for u in all_urls if u in pages]
    if not docs:
        return {"queries": queries, "lang": lang, "results": []}

    _status("Embedding and reranking…")

    def _embed_or_empty(text: str) -> List[float]:
        # A failed embedding yields an empty vector, which is skipped when scoring.
        try:
            return ollama_embed(ollama_host, embed_model, text)
        except Exception:
            return []

    # Embed all 3 queries
    q_embs: List[List[float]] = [_embed_or_empty(f"query: {q}") for q in queries]

    # Embed documents (short passages)
    DOC_SNIPPET_CHARS = 800
    d_embs: List[List[float]] = [
        _embed_or_empty(f"passage: {text.replace(chr(10), ' ')[:DOC_SNIPPET_CHARS]}")
        for _u, text in docs
    ]

    # Score each doc by max cosine across query embeddings
    results: List[Tuple[str, float]] = []  # (url, score)
    for (u, _t), d_vec in zip(docs, d_embs):
        best = 0.0
        if d_vec:
            for q_vec in q_embs:
                if q_vec:
                    best = max(best, cosine(q_vec, d_vec))
        # scale to 0..100 for user-facing
        score = max(0.0, min(100.0, (best + 1.0) * 50.0))
        results.append((u, score))

    # Sort and format output
    results.sort(key=lambda x: x[1], reverse=True)
    out: List[Dict[str, Any]] = []
    for u, sc in results:
        txt = pages.get(u, "").strip()
        snippet = txt[:DOC_SNIPPET_CHARS].replace("\n", " ") if txt else ""
        out.append({"url": u, "title": titles.get(u, ""), "score": round(sc, 1), "snippet": snippet})

    return {"queries": queries, "lang": lang, "results": out}