Files
concept-maker/websearch.py

560 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Prior-art web search utilities for the Idea → Concept GUI.
Design goals
- Pure stdlib networking (urllib) to avoid new dependencies.
- Works with a local SearXNG instance.
- Uses Ollama for both query generation and embeddings.
- Small, robust, and callable from the GUI with status callbacks.
"""
from __future__ import annotations
from typing import List, Dict, Any, Optional, Tuple, Callable
import json
import re
import time
from pathlib import Path
from urllib.parse import urlencode, urlparse
import urllib.request
import urllib.error
try:
from bs4 import BeautifulSoup # type: ignore
except Exception: # pragma: no cover
BeautifulSoup = None # type: ignore
# Default SearXNG endpoint (no trailing slash)
SEARX_DEFAULT_URL = "http://localhost:8888"
# Prompt to generate search queries from sources
SEARCH_QUERIES_PROMPT = (
"""
You are a cross-domain prior-art scout. Your only task is to EMIT SEARCH QUERIES (no explanations) that can be sent to SearXNG to discover whether the users idea already exists.
SOURCES
- NOTES (freeform by user):
{NOTES}
- KNOWLEDGE BASE (excerpts):
{KB}
- ASSET NAMES (filenames the user selected):
{ASSETS}
RULES
- Infer the single most likely idea from the sources above (silently).
- Return STRICT JSON ONLY with exactly this schema:
{"queries": ["<q1>", "<q2>", "<q3>"], "lang": "<iso-639-1>"}
- Requirements:
• Exactly 3 distinct queries; each ≤ 120 characters.
• Use quotes for distinctive phrases when helpful.
• Include synonyms or likely alternate names if useful.
• Add up to 2 disambiguating negatives with a leading “-” only when needed.
• Prefer engine-portable operators only: quotes "", OR, -, site:, filetype:. Avoid engine-specific syntax.
• Vary the angles across the 3 queries for broad recall:
1) direct existence/implementation check,
2) synonym/alternate-naming variant,
3) precision angle (compare/alternative wording OR a domain-targeted site:/filetype: if obvious).
• Optional: include a simple year hint like 2019..2025 only if recency clearly matters.
- Choose "lang" from the dominant language in NOTES/KB/ASSETS; default to "en" if unclear.
- No prose, no markdown/code fences, no extra keys beyond {"queries","lang"}.
"""
).strip()
# -----------------------------
# Utils
# -----------------------------
USER_AGENT = (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome Safari"
)
def sanitize_llm_text_simple(s: str) -> str:
try:
s = re.sub(r"<think>.*?</think>", "", s, flags=re.S | re.I)
s = re.sub(r"^\s*```(?:\w+)?\s*", "", s)
s = re.sub(r"\s*```\s*$", "", s)
return s.strip()
except Exception:
return (s or "").strip()
def _http_post_json(url: str, payload: Dict[str, Any], *, timeout: int = 600) -> Dict[str, Any]:
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
url,
data=data,
headers={"Content-Type": "application/json", "User-Agent": USER_AGENT},
method="POST",
)
with urllib.request.urlopen(req, timeout=timeout) as resp:
body = resp.read()
return json.loads(body.decode("utf-8", "ignore"))
def _http_get(url: str, *, timeout: int = 20, max_bytes: int = 1_500_000) -> Tuple[str, Dict[str, str]]:
req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
with urllib.request.urlopen(req, timeout=timeout) as resp:
ctype = (resp.headers.get("content-type") or "").lower()
buf = bytearray()
while True:
chunk = resp.read(65536)
if not chunk:
break
buf.extend(chunk)
if len(buf) >= max_bytes:
break
try:
text = buf.decode(resp.headers.get_content_charset() or "utf-8", errors="ignore")
except Exception:
text = buf.decode("utf-8", errors="ignore")
return text, {"content-type": ctype}
def _http_get_json(url: str, params: Dict[str, Any], *, timeout: int = 30) -> Dict[str, Any]:
q = urlencode(params)
text, _ = _http_get(f"{url}?{q}", timeout=timeout)
try:
return json.loads(text)
except Exception:
return {}
def _is_probably_html_url(url: str) -> bool:
try:
path = urlparse(url).path.lower()
if not path:
return True
if "." not in path:
return True
ext = path.rsplit(".", 1)[-1]
return ext not in {
"pdf","jpg","jpeg","png","gif","webp","svg","mp4","mp3","mov","avi",
"zip","gz","7z","tar","rar","woff","woff2","ttf","otf"
}
except Exception:
return True
# Keywords that typically belong to navigation, banners, or cookie dialogs
_NOISE_WORDS = {
"home","contact","about","copyright","privacy","policy","cookies","cookie","consent",
"login","log","sign","signup","signin","register","account","subscribe","newsletter",
"advert","advertisement","ads","promo","banner","menu","navigation","nav","footer",
"header","share","social","terms","conditions","accessibility","language","shop",
"search","skip","main","content"
}
_NOISE_ATTR_RE = re.compile(r"(cookie|consent|gdpr|banner|popup|modal|dialog|newsletter|subscribe|advert|promo|signin|signup|login|toolbar|share|social|nav|menu|footer|header)", re.I)
_CONTENT_HINT_RE = re.compile(r"(article|content|main|post|story|entry|body|text|read|news|blog|page)", re.I)
_NEGATIVE_HINT_RE = re.compile(r"(breadcrumb|nav|menu|sidebar|widget|footer|header|related|share|social|tagcloud|comment|reply|advert|sponsor|promo|contact|newsletter)", re.I)
_COOKIE_HINT_RE = re.compile(r"(cookie|consent|gdpr|cmp|privacy[- ]?settings|tracking|opt[- ]?out|accept all)", re.I)
def _clean_lines(text: str) -> str:
lines = [ln.strip() for ln in text.splitlines()]
out = []
seen: set[str] = set()
for ln in lines:
if not ln:
if out and out[-1]:
out.append("")
continue
lower = ln.lower()
# Drop obvious boilerplate / cookie notices / menu crumbs
if len(ln) <= 140:
if any(k in lower for k in ("cookie", "consent", "newsletter", "advert", "privacy policy", "terms", "skip to main", "enable javascript", "accept all", "manage preferences")):
continue
tokens = re.findall(r"[a-zA-Z]+", lower)
if tokens and all(t in _NOISE_WORDS for t in tokens) and len(tokens) <= 8:
continue
if lower in seen:
continue
seen.add(lower)
out.append(ln)
# Collapse multiple blank lines
compact: list[str] = []
for ln in out:
if ln == "" and (not compact or compact[-1] == ""):
continue
compact.append(ln)
return "\n".join(compact)
def _extract_text(html: str, *, max_len: int = 120_000) -> str:
if not html:
return ""
if BeautifulSoup is None:
# crude fallback if bs4 missing
txt = re.sub(r"<\s*(script|style)[^>]*>.*?<\s*/\s*\1\s*>", " ", html, flags=re.S|re.I)
txt = re.sub(r"<[^>]+>", " ", txt)
txt = re.sub(r"\s+", " ", txt)
return _clean_lines(txt.strip())[:max_len]
try:
try:
soup = BeautifulSoup(html, "lxml")
except Exception:
soup = BeautifulSoup(html, "html.parser")
# Strip obvious boilerplate containers first
for tag in soup.select("script,style,noscript,template"):
tag.decompose()
# Remove cookie/consent overlays by attributes or short text snippets (conservative)
for el in list(soup.find_all(True)):
if el.name in {"html", "body"}:
continue
attr_str = " ".join([
el.get("id") or "",
" ".join(el.get("class") or []),
el.get("role") or "",
el.get("aria-label") or "",
]).lower()
text_preview = (el.get_text(" ", strip=True)[:220] or "").lower()
if (_COOKIE_HINT_RE.search(attr_str) or _COOKIE_HINT_RE.search(text_preview)):
# only remove if looks like a small overlay/dialog, not the main body
full_txt = " ".join(el.get_text(" ", strip=True).split())
if len(text_preview) <= 260 or len(full_txt) <= 800:
try:
el.decompose()
except Exception:
pass
def _norm(node):
txt = node.get_text("\n", strip=True)
txt = re.sub(r"[ \t]+", " ", txt)
txt = re.sub(r"\n{3,}", "\n\n", txt)
return txt
def _score_node(node, raw_text: str, attr_str: str) -> float:
length = len(raw_text)
if length < 40:
return 0.0
link_count = len(node.find_all("a"))
link_density = link_count / max(1.0, length / 80.0)
bonus = 0.0
if node.name == "article":
bonus += 0.6 * length
elif node.name == "main":
bonus += 0.4 * length
elif node.name == "section":
bonus += 0.2 * length
if _CONTENT_HINT_RE.search(attr_str):
bonus += 0.35 * length
if _NEGATIVE_HINT_RE.search(attr_str):
bonus -= 0.2 * length
penalty = min(0.9, link_density * 0.6)
return (length + bonus) * (1.0 - penalty)
# Score candidate blocks to pick main content without over-deleting nodes
best_text = ""
best_score = 0.0
for node in soup.find_all(["article", "main", "section", "div", "body"]):
raw = _norm(node)
if not raw:
continue
attr_str = " ".join([
node.get("id") or "",
" ".join(node.get("class") or []),
node.get("role") or "",
node.get("aria-label") or "",
]).lower()
score = _score_node(node, raw, attr_str)
if score > best_score:
best_score = score
best_text = raw
if not best_text:
# fallback to whole body/text
target = soup.body or soup
best_text = _norm(target)
cleaned = _clean_lines(best_text)
# If too short, fall back to broader body text instead of dropping the page
if len(cleaned) < 200:
broader = _clean_lines(_norm(soup.body or soup))
if len(broader) > len(cleaned):
cleaned = broader
# Absolute fallback to regex-based stripping if soup path failed
if len(cleaned.strip()) < 120:
txt = re.sub(r"<\s*(script|style)[^>]*>.*?<\s*/\s*\1\s*>", " ", html, flags=re.S|re.I)
txt = re.sub(r"<[^>]+>", " ", txt)
txt = re.sub(r"\s+", " ", txt)
cleaned = txt.strip()
if not cleaned.strip():
return ""
if len(cleaned) > max_len:
return cleaned[:max_len]
return cleaned
except Exception:
return ""
# -----------------------------
# Ollama helpers
# -----------------------------
def ollama_generate(host: str, model: str, prompt: str, *, timeout: int = 600) -> str:
url = f"{host.rstrip('/')}/api/generate"
payload = {"model": model, "prompt": prompt, "stream": False}
try:
obj = _http_post_json(url, payload, timeout=timeout)
return (obj.get("response") or "").strip()
except urllib.error.HTTPError as e:
raise RuntimeError(f"Ollama HTTP error {e.code}: {e.read().decode('utf-8','ignore')}")
except Exception as e:
raise RuntimeError(f"Ollama request failed: {e}")
def ollama_embed(host: str, model: str, text: str, *, timeout: int = 120) -> List[float]:
url = f"{host.rstrip('/')}/api/embeddings"
payload = {"model": model, "prompt": text}
try:
obj = _http_post_json(url, payload, timeout=timeout)
except urllib.error.HTTPError as e:
raise RuntimeError(f"Ollama embeddings HTTP error {e.code}: {e.read().decode('utf-8','ignore')}")
except Exception as e:
raise RuntimeError(f"Ollama embeddings request failed: {e}")
# Normalize common shapes
if isinstance(obj, dict):
if "error" in obj:
raise RuntimeError(f"Ollama embeddings error: {obj.get('error')}")
if "embedding" in obj and isinstance(obj["embedding"], list):
return [float(x) for x in obj["embedding"] if isinstance(x, (int, float))]
if "data" in obj and isinstance(obj["data"], list) and obj["data"]:
em = obj["data"][0].get("embedding", [])
if isinstance(em, list):
return [float(x) for x in em if isinstance(x, (int, float))]
if "embeddings" in obj and isinstance(obj["embeddings"], list) and obj["embeddings"]:
em0 = obj["embeddings"][0]
if isinstance(em0, dict):
em0 = em0.get("embedding", [])
if isinstance(em0, list):
return [float(x) for x in em0 if isinstance(x, (int, float))]
return []
def cosine(a: List[float], b: List[float]) -> float:
n = min(len(a), len(b))
if n == 0:
return 0.0
dot = na = nb = 0.0
for i in range(n):
x = a[i]; y = b[i]
dot += x * y
na += x * x
nb += y * y
if na <= 0.0 or nb <= 0.0:
return 0.0
return dot / ((na ** 0.5) * (nb ** 0.5))
# -----------------------------
# SearXNG search + fetch
# -----------------------------
def searx_search(base_url: str, query: str, *, max_results: int = 6) -> List[Dict[str, Any]]:
base = (base_url or SEARX_DEFAULT_URL).rstrip("/")
params = {"q": query, "format": "json", "safesearch": 1}
data = _http_get_json(f"{base}/search", params, timeout=30)
out: List[Dict[str, Any]] = []
seen = set()
for item in data.get("results", []):
url = item.get("url")
title = item.get("title") or ""
if not url or url in seen:
continue
if not _is_probably_html_url(url):
continue
seen.add(url)
out.append({"url": url, "title": title})
if len(out) >= max_results:
break
return out
def fetch_pages(urls: List[str], *, max_pages: int = 8, min_text_len: int = 200) -> Dict[str, str]:
urls = urls[:max_pages]
out: Dict[str, str] = {}
for u in urls:
try:
html, hdrs = _http_get(u, timeout=20)
ctype = (hdrs.get("content-type") or "").lower()
if ctype and not ("text/html" in ctype or "application/xhtml+xml" in ctype or ctype.startswith("text/")):
continue
txt = _extract_text(html)
if len(txt) >= min_text_len:
out[u] = txt
except Exception:
continue
return out
# -----------------------------
# Public API
# -----------------------------
def generate_prior_art_queries(
*,
ollama_host: str,
model: str,
notes: str,
kb: str,
assets: List[str],
) -> Dict[str, Any]:
assets_str = "\n".join(f"- {Path(a).name}" for a in assets) if assets else "(none)"
prompt = (
SEARCH_QUERIES_PROMPT
.replace("{NOTES}", (notes or "").strip() or "(none)")
.replace("{KB}", (kb or "").strip() or "(empty)")
.replace("{ASSETS}", assets_str)
)
raw = ollama_generate(ollama_host, model, prompt)
raw = sanitize_llm_text_simple(raw)
obj: Dict[str, Any]
try:
obj = json.loads(raw)
except Exception:
# try to salvage JSON object substring
m = re.search(r"\{[\s\S]*\}", raw)
if not m:
raise RuntimeError("Model did not return JSON for search queries.")
try:
obj = json.loads(m.group(0))
except Exception:
raise RuntimeError("Failed to parse JSON for search queries.")
q = obj.get("queries") if isinstance(obj, dict) else None
lang = obj.get("lang") if isinstance(obj, dict) else None
if not isinstance(q, list) or len(q) != 3:
raise RuntimeError("Search query generator must return exactly 3 queries.")
queries = [str(x).strip() for x in q if str(x).strip()]
if len(queries) != 3:
raise RuntimeError("Invalid queries after normalization.")
return {"queries": queries, "lang": (str(lang).strip() or "en")}
def prior_art_search(
*,
ollama_host: str,
model: str,
notes: str,
kb: str,
assets: List[str],
searx_url: Optional[str] = None,
embed_model: str = "bge-m3:latest",
status_cb: Optional[Callable[[str], None]] = None,
per_query_max: int = 6,
overall_max_urls: int = 16,
fetch_max_pages: int = 8,
) -> Dict[str, Any]:
"""
End-to-end prior-art search pipeline.
Returns a dict with keys: {"queries", "lang", "results": [{url,title,score,snippet}...]}
"""
def _status(msg: str):
if status_cb:
try:
status_cb(msg)
except Exception:
pass
_status("Generating search queries…")
meta = generate_prior_art_queries(
ollama_host=ollama_host, model=model, notes=notes, kb=kb, assets=assets
)
queries: List[str] = meta["queries"]
lang: str = meta.get("lang", "en")
_status("Searching SearXNG…")
all_urls: List[str] = []
seen = set()
base = (searx_url or SEARX_DEFAULT_URL).rstrip("/")
for q in queries:
try:
res = searx_search(base, q, max_results=per_query_max)
except Exception:
res = []
for item in res:
u = item.get("url")
if not u or u in seen:
continue
seen.add(u)
all_urls.append(u)
if len(all_urls) >= overall_max_urls:
break
if len(all_urls) >= overall_max_urls:
break
if not all_urls:
return {"queries": queries, "lang": lang, "results": []}
_status("Fetching pages…")
pages = fetch_pages(all_urls, max_pages=fetch_max_pages, min_text_len=500)
if not pages:
return {"queries": queries, "lang": lang, "results": []}
# Build doc list and short snippets
docs: List[Tuple[str, str]] = []
for u in all_urls:
if u in pages:
docs.append((u, pages[u]))
if not docs:
return {"queries": queries, "lang": lang, "results": []}
_status("Embedding and reranking…")
# Embed all 3 queries
q_embs: List[List[float]] = []
for q in queries:
try:
q_embs.append(ollama_embed(ollama_host, embed_model, f"query: {q}"))
except Exception:
q_embs.append([])
# Embed documents (short passages)
DOC_SNIPPET_CHARS = 800
d_embs: List[List[float]] = []
for (_u, text) in docs:
snippet = text.replace("\n", " ")
if len(snippet) > DOC_SNIPPET_CHARS:
snippet = snippet[:DOC_SNIPPET_CHARS]
try:
d_embs.append(ollama_embed(ollama_host, embed_model, f"passage: {snippet}"))
except Exception:
d_embs.append([])
# Score each doc by max cosine across query embeddings
results: List[Tuple[str, float]] = [] # (url, score)
for i, (u, _t) in enumerate(docs):
best = 0.0
for qv in q_embs:
if not qv or not d_embs[i]:
continue
c = cosine(qv, d_embs[i])
if c > best:
best = c
# scale to 0..100 for user-facing
score = max(0.0, min(100.0, (best + 1.0) * 50.0))
results.append((u, score))
# Sort and format output
results.sort(key=lambda x: x[1], reverse=True)
out: List[Dict[str, Any]] = []
for (u, sc) in results:
txt = pages.get(u, "").strip()
snippet = txt[:800].replace("\n", " ") if txt else ""
out.append({"url": u, "title": "", "score": round(sc, 1), "snippet": snippet})
return {"queries": queries, "lang": lang, "results": out}