#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Prior-art web search utilities for the Idea → Concept GUI.

Design goals
- Pure stdlib networking (urllib) to avoid new dependencies.
- Works with a local SearXNG instance.
- Uses Ollama for both query generation and embeddings.
- Small, robust, and callable from the GUI with status callbacks.
"""

from __future__ import annotations

from typing import List, Dict, Any, Optional, Tuple, Callable

import json
import re
import time
from pathlib import Path
from urllib.parse import urlencode, urlparse
import urllib.request
import urllib.error

try:
    from bs4 import BeautifulSoup  # type: ignore
except Exception:  # pragma: no cover
    BeautifulSoup = None  # type: ignore


# Default SearXNG endpoint (no trailing slash)
SEARX_DEFAULT_URL = "http://localhost:8888"


# Prompt to generate search queries from sources
SEARCH_QUERIES_PROMPT = (
    """
You are a cross-domain prior-art scout. Your only task is to EMIT SEARCH QUERIES (no explanations) that can be sent to SearXNG to discover whether the user’s idea already exists.

SOURCES
- NOTES (freeform by user):
{NOTES}

- KNOWLEDGE BASE (excerpts):
{KB}

- ASSET NAMES (filenames the user selected):
{ASSETS}

RULES
- Infer the single most likely idea from the sources above (silently).
- Return STRICT JSON ONLY with exactly this schema:
  {"queries": ["<q1>", "<q2>", "<q3>"], "lang": "<iso-639-1>"}
- Requirements:
  • Exactly 3 distinct queries; each ≤ 120 characters.
  • Use quotes for distinctive phrases when helpful.
  • Include synonyms or likely alternate names if useful.
  • Add up to 2 disambiguating negatives with a leading “-” only when needed.
  • Prefer engine-portable operators only: quotes "…", OR, -, site:, filetype:. Avoid engine-specific syntax.
  • Vary the angles across the 3 queries for broad recall:
    1) direct existence/implementation check,
    2) synonym/alternate-naming variant,
    3) precision angle (compare/alternative wording OR a domain-targeted site:/filetype: if obvious).
  • Optional: include a simple year hint like 2019..2025 only if recency clearly matters.
- Choose "lang" from the dominant language in NOTES/KB/ASSETS; default to "en" if unclear.
- No prose, no markdown/code fences, no extra keys beyond {"queries","lang"}.
"""
).strip()
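# Illustrative example of a well-formed model reply (hypothetical, for reference):
#   {"queries": ["\"prior art\" search assistant GUI",
#                "idea novelty check tool OR \"concept scout\"",
#                "prior-art discovery workflow site:github.com"],
#    "lang": "en"}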


# -----------------------------
# Utils
# -----------------------------

USER_AGENT = (
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome Safari"
)


def sanitize_llm_text_simple(s: str) -> str:
    """Strip <think>…</think> blocks and stray code fences from LLM output."""
    try:
        s = re.sub(r"<think>.*?</think>", "", s, flags=re.S | re.I)
        s = re.sub(r"^\s*```(?:\w+)?\s*", "", s)
        s = re.sub(r"\s*```\s*$", "", s)
        return s.strip()
    except Exception:
        return (s or "").strip()
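# For example (hypothetical model output):
#   sanitize_llm_text_simple('<think>plan…</think>```json\n{"a": 1}\n```')
#   -> '{"a": 1}'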


def _http_post_json(url: str, payload: Dict[str, Any], *, timeout: int = 600) -> Dict[str, Any]:
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(
        url,
        data=data,
        headers={"Content-Type": "application/json", "User-Agent": USER_AGENT},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        body = resp.read()
        return json.loads(body.decode("utf-8", "ignore"))


def _http_get(url: str, *, timeout: int = 20, max_bytes: int = 1_500_000) -> Tuple[str, Dict[str, str]]:
    req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        ctype = (resp.headers.get("content-type") or "").lower()
        buf = bytearray()
        while True:
            chunk = resp.read(65536)
            if not chunk:
                break
            buf.extend(chunk)
            if len(buf) >= max_bytes:
                break
        try:
            text = buf.decode(resp.headers.get_content_charset() or "utf-8", errors="ignore")
        except Exception:
            text = buf.decode("utf-8", errors="ignore")
        return text, {"content-type": ctype}


def _http_get_json(url: str, params: Dict[str, Any], *, timeout: int = 30) -> Dict[str, Any]:
    q = urlencode(params)
    text, _ = _http_get(f"{url}?{q}", timeout=timeout)
    try:
        return json.loads(text)
    except Exception:
        return {}


def _is_probably_html_url(url: str) -> bool:
    try:
        path = urlparse(url).path.lower()
        if not path:
            return True
        if "." not in path:
            return True
        ext = path.rsplit(".", 1)[-1]
        return ext not in {
            "pdf", "jpg", "jpeg", "png", "gif", "webp", "svg", "mp4", "mp3", "mov", "avi",
            "zip", "gz", "7z", "tar", "rar", "woff", "woff2", "ttf", "otf",
        }
    except Exception:
        return True
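# Quick sanity check (hypothetical REPL session):
#   _is_probably_html_url("https://example.com/post/42")    -> True
#   _is_probably_html_url("https://example.com/paper.pdf")  -> False
#   _is_probably_html_url("https://example.com/")           -> True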


# Keywords that typically belong to navigation, banners, or cookie dialogs
_NOISE_WORDS = {
    "home", "contact", "about", "copyright", "privacy", "policy", "cookies", "cookie", "consent",
    "login", "log", "sign", "signup", "signin", "register", "account", "subscribe", "newsletter",
    "advert", "advertisement", "ads", "promo", "banner", "menu", "navigation", "nav", "footer",
    "header", "share", "social", "terms", "conditions", "accessibility", "language", "shop",
    "search", "skip", "main", "content",
}

# Attribute keywords that mark navigation/overlay chrome
_NOISE_ATTR_RE = re.compile(
    r"(cookie|consent|gdpr|banner|popup|modal|dialog|newsletter|subscribe|advert|promo|"
    r"signin|signup|login|toolbar|share|social|nav|menu|footer|header)",
    re.I,
)

_CONTENT_HINT_RE = re.compile(r"(article|content|main|post|story|entry|body|text|read|news|blog|page)", re.I)

_NEGATIVE_HINT_RE = re.compile(r"(breadcrumb|nav|menu|sidebar|widget|footer|header|related|share|social|tagcloud|comment|reply|advert|sponsor|promo|contact|newsletter)", re.I)

_COOKIE_HINT_RE = re.compile(r"(cookie|consent|gdpr|cmp|privacy[- ]?settings|tracking|opt[- ]?out|accept all)", re.I)


def _clean_lines(text: str) -> str:
    lines = [ln.strip() for ln in text.splitlines()]
    out = []
    seen: set[str] = set()
    for ln in lines:
        if not ln:
            if out and out[-1]:
                out.append("")
            continue
        lower = ln.lower()
        # Drop obvious boilerplate / cookie notices / menu crumbs
        if len(ln) <= 140:
            if any(k in lower for k in ("cookie", "consent", "newsletter", "advert", "privacy policy", "terms", "skip to main", "enable javascript", "accept all", "manage preferences")):
                continue
            tokens = re.findall(r"[a-zA-Z]+", lower)
            if tokens and all(t in _NOISE_WORDS for t in tokens) and len(tokens) <= 8:
                continue
        if lower in seen:
            continue
        seen.add(lower)
        out.append(ln)

    # Collapse multiple blank lines
    compact: list[str] = []
    for ln in out:
        if ln == "" and (not compact or compact[-1] == ""):
            continue
        compact.append(ln)
    return "\n".join(compact)
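# e.g. _clean_lines("Accept all cookies\nReal paragraph…\nReal paragraph…")
# drops the consent line and the duplicate, keeping one "Real paragraph…".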


def _extract_text(html: str, *, max_len: int = 120_000) -> str:
    if not html:
        return ""
    if BeautifulSoup is None:
        # crude fallback if bs4 missing
        txt = re.sub(r"<\s*(script|style)[^>]*>.*?<\s*/\s*\1\s*>", " ", html, flags=re.S | re.I)
        txt = re.sub(r"<[^>]+>", " ", txt)
        txt = re.sub(r"\s+", " ", txt)
        return _clean_lines(txt.strip())[:max_len]

    try:
        try:
            soup = BeautifulSoup(html, "lxml")
        except Exception:
            soup = BeautifulSoup(html, "html.parser")

        # Strip obvious boilerplate containers first
        for tag in soup.select("script,style,noscript,template"):
            tag.decompose()

        # Remove cookie/consent overlays by attributes or short text snippets (conservative)
        for el in list(soup.find_all(True)):
            if getattr(el, "decomposed", False):
                # skip elements already destroyed when an ancestor was decomposed
                continue
            if el.name in {"html", "body"}:
                continue
            attr_str = " ".join([
                el.get("id") or "",
                " ".join(el.get("class") or []),
                el.get("role") or "",
                el.get("aria-label") or "",
            ]).lower()
            text_preview = (el.get_text(" ", strip=True)[:220] or "").lower()
            if _COOKIE_HINT_RE.search(attr_str) or _COOKIE_HINT_RE.search(text_preview):
                # only remove if it looks like a small overlay/dialog, not the main body
                full_txt = " ".join(el.get_text(" ", strip=True).split())
                if len(text_preview) <= 260 or len(full_txt) <= 800:
                    try:
                        el.decompose()
                    except Exception:
                        pass

        def _norm(node):
            txt = node.get_text("\n", strip=True)
            txt = re.sub(r"[ \t]+", " ", txt)
            txt = re.sub(r"\n{3,}", "\n\n", txt)
            return txt

        def _score_node(node, raw_text: str, attr_str: str) -> float:
            length = len(raw_text)
            if length < 40:
                return 0.0
            link_count = len(node.find_all("a"))
            link_density = link_count / max(1.0, length / 80.0)
            bonus = 0.0
            if node.name == "article":
                bonus += 0.6 * length
            elif node.name == "main":
                bonus += 0.4 * length
            elif node.name == "section":
                bonus += 0.2 * length
            if _CONTENT_HINT_RE.search(attr_str):
                bonus += 0.35 * length
            if _NEGATIVE_HINT_RE.search(attr_str):
                bonus -= 0.2 * length
            penalty = min(0.9, link_density * 0.6)
            return (length + bonus) * (1.0 - penalty)

        # Score candidate blocks to pick main content without over-deleting nodes
        best_text = ""
        best_score = 0.0
        for node in soup.find_all(["article", "main", "section", "div", "body"]):
            raw = _norm(node)
            if not raw:
                continue
            attr_str = " ".join([
                node.get("id") or "",
                " ".join(node.get("class") or []),
                node.get("role") or "",
                node.get("aria-label") or "",
            ]).lower()
            score = _score_node(node, raw, attr_str)
            if score > best_score:
                best_score = score
                best_text = raw

        if not best_text:
            # fallback to whole body/text
            target = soup.body or soup
            best_text = _norm(target)

        cleaned = _clean_lines(best_text)
        # If too short, fall back to broader body text instead of dropping the page
        if len(cleaned) < 200:
            broader = _clean_lines(_norm(soup.body or soup))
            if len(broader) > len(cleaned):
                cleaned = broader

        # Absolute fallback to regex-based stripping if the soup path yielded little
        if len(cleaned.strip()) < 120:
            txt = re.sub(r"<\s*(script|style)[^>]*>.*?<\s*/\s*\1\s*>", " ", html, flags=re.S | re.I)
            txt = re.sub(r"<[^>]+>", " ", txt)
            txt = re.sub(r"\s+", " ", txt)
            cleaned = txt.strip()

        if not cleaned.strip():
            return ""
        if len(cleaned) > max_len:
            return cleaned[:max_len]
        return cleaned
    except Exception:
        return ""
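# Typical flow (illustrative): html, _ = _http_get(url); text = _extract_text(html)
# yields de-boilerplated main-content text, or "" when nothing substantial survives.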


# -----------------------------
# Ollama helpers
# -----------------------------

def ollama_generate(host: str, model: str, prompt: str, *, timeout: int = 600) -> str:
    url = f"{host.rstrip('/')}/api/generate"
    payload = {"model": model, "prompt": prompt, "stream": False}
    try:
        obj = _http_post_json(url, payload, timeout=timeout)
        return (obj.get("response") or "").strip()
    except urllib.error.HTTPError as e:
        raise RuntimeError(f"Ollama HTTP error {e.code}: {e.read().decode('utf-8', 'ignore')}")
    except Exception as e:
        raise RuntimeError(f"Ollama request failed: {e}")


def ollama_embed(host: str, model: str, text: str, *, timeout: int = 120) -> List[float]:
    url = f"{host.rstrip('/')}/api/embeddings"
    payload = {"model": model, "prompt": text}
    try:
        obj = _http_post_json(url, payload, timeout=timeout)
    except urllib.error.HTTPError as e:
        raise RuntimeError(f"Ollama embeddings HTTP error {e.code}: {e.read().decode('utf-8', 'ignore')}")
    except Exception as e:
        raise RuntimeError(f"Ollama embeddings request failed: {e}")

    # Normalize common shapes
    if isinstance(obj, dict):
        if "error" in obj:
            raise RuntimeError(f"Ollama embeddings error: {obj.get('error')}")
        if "embedding" in obj and isinstance(obj["embedding"], list):
            return [float(x) for x in obj["embedding"] if isinstance(x, (int, float))]
        if "data" in obj and isinstance(obj["data"], list) and obj["data"]:
            em = obj["data"][0].get("embedding", [])
            if isinstance(em, list):
                return [float(x) for x in em if isinstance(x, (int, float))]
        if "embeddings" in obj and isinstance(obj["embeddings"], list) and obj["embeddings"]:
            em0 = obj["embeddings"][0]
            if isinstance(em0, dict):
                em0 = em0.get("embedding", [])
            if isinstance(em0, list):
                return [float(x) for x in em0 if isinstance(x, (int, float))]
    return []
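# Example (assumes an embedding model such as "bge-m3:latest" has been pulled):
#   vec = ollama_embed("http://localhost:11434", "bge-m3:latest", "passage: hello")
# Dimensionality is model-dependent (1024 for bge-m3); [] signals an
# unrecognized response shape rather than an error.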


def cosine(a: List[float], b: List[float]) -> float:
    n = min(len(a), len(b))
    if n == 0:
        return 0.0
    dot = na = nb = 0.0
    for i in range(n):
        x = a[i]
        y = b[i]
        dot += x * y
        na += x * x
        nb += y * y
    if na <= 0.0 or nb <= 0.0:
        return 0.0
    return dot / ((na ** 0.5) * (nb ** 0.5))
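# e.g. cosine([1.0, 0.0], [1.0, 0.0]) -> 1.0, cosine([1.0, 0.0], [0.0, 1.0]) -> 0.0;
# vectors of unequal length are compared over their common prefix.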


# -----------------------------
# SearXNG search + fetch
# -----------------------------

def searx_search(base_url: str, query: str, *, max_results: int = 6) -> List[Dict[str, Any]]:
    base = (base_url or SEARX_DEFAULT_URL).rstrip("/")
    params = {"q": query, "format": "json", "safesearch": 1}
    data = _http_get_json(f"{base}/search", params, timeout=30)
    out: List[Dict[str, Any]] = []
    seen = set()
    for item in data.get("results", []):
        url = item.get("url")
        title = item.get("title") or ""
        if not url or url in seen:
            continue
        if not _is_probably_html_url(url):
            continue
        seen.add(url)
        out.append({"url": url, "title": title})
        if len(out) >= max_results:
            break
    return out


def fetch_pages(urls: List[str], *, max_pages: int = 8, min_text_len: int = 200) -> Dict[str, str]:
    urls = urls[:max_pages]
    out: Dict[str, str] = {}
    for u in urls:
        try:
            html, hdrs = _http_get(u, timeout=20)
            ctype = (hdrs.get("content-type") or "").lower()
            if ctype and not ("text/html" in ctype or "application/xhtml+xml" in ctype or ctype.startswith("text/")):
                continue
            txt = _extract_text(html)
            if len(txt) >= min_text_len:
                out[u] = txt
        except Exception:
            continue
    return out
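# Example (assumes a local SearXNG whose settings.yml enables the "json" format):
#   hits = searx_search(SEARX_DEFAULT_URL, '"prior art" search automation')
#   pages = fetch_pages([h["url"] for h in hits], max_pages=4)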


# -----------------------------
# Public API
# -----------------------------

def generate_prior_art_queries(
    *,
    ollama_host: str,
    model: str,
    notes: str,
    kb: str,
    assets: List[str],
) -> Dict[str, Any]:
    assets_str = "\n".join(f"- {Path(a).name}" for a in assets) if assets else "(none)"
    prompt = (
        SEARCH_QUERIES_PROMPT
        .replace("{NOTES}", (notes or "").strip() or "(none)")
        .replace("{KB}", (kb or "").strip() or "(empty)")
        .replace("{ASSETS}", assets_str)
    )
    raw = ollama_generate(ollama_host, model, prompt)
    raw = sanitize_llm_text_simple(raw)
    obj: Dict[str, Any]
    try:
        obj = json.loads(raw)
    except Exception:
        # try to salvage a JSON object substring
        m = re.search(r"\{[\s\S]*\}", raw)
        if not m:
            raise RuntimeError("Model did not return JSON for search queries.")
        try:
            obj = json.loads(m.group(0))
        except Exception:
            raise RuntimeError("Failed to parse JSON for search queries.")

    q = obj.get("queries") if isinstance(obj, dict) else None
    lang = obj.get("lang") if isinstance(obj, dict) else None
    if not isinstance(q, list) or len(q) != 3:
        raise RuntimeError("Search query generator must return exactly 3 queries.")
    queries = [str(x).strip() for x in q if str(x).strip()]
    if len(queries) != 3:
        raise RuntimeError("Invalid queries after normalization.")
    # Guard against a missing/None "lang" (str(None) would yield the string "None")
    lang_s = str(lang).strip() if isinstance(lang, str) else ""
    return {"queries": queries, "lang": lang_s or "en"}
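# Callers receive either a validated {"queries": [...], "lang": "xx"} dict or a
# RuntimeError they can surface in the GUI; partial results are never returned.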


def prior_art_search(
    *,
    ollama_host: str,
    model: str,
    notes: str,
    kb: str,
    assets: List[str],
    searx_url: Optional[str] = None,
    embed_model: str = "bge-m3:latest",
    status_cb: Optional[Callable[[str], None]] = None,
    per_query_max: int = 6,
    overall_max_urls: int = 16,
    fetch_max_pages: int = 8,
) -> Dict[str, Any]:
    """
    End-to-end prior-art search pipeline.

    Returns a dict with keys: {"queries", "lang", "results": [{url,title,score,snippet}...]}
    """
    def _status(msg: str):
        if status_cb:
            try:
                status_cb(msg)
            except Exception:
                pass

    _status("Generating search queries…")
    meta = generate_prior_art_queries(
        ollama_host=ollama_host, model=model, notes=notes, kb=kb, assets=assets
    )
    queries: List[str] = meta["queries"]
    lang: str = meta.get("lang", "en")

    _status("Searching SearXNG…")
    all_urls: List[str] = []
    titles: Dict[str, str] = {}  # keep result titles so the output schema can include them
    seen = set()
    base = (searx_url or SEARX_DEFAULT_URL).rstrip("/")
    for q in queries:
        try:
            res = searx_search(base, q, max_results=per_query_max)
        except Exception:
            res = []
        for item in res:
            u = item.get("url")
            if not u or u in seen:
                continue
            seen.add(u)
            all_urls.append(u)
            titles[u] = item.get("title") or ""
            if len(all_urls) >= overall_max_urls:
                break
        if len(all_urls) >= overall_max_urls:
            break

    if not all_urls:
        return {"queries": queries, "lang": lang, "results": []}

    _status("Fetching pages…")
    pages = fetch_pages(all_urls, max_pages=fetch_max_pages, min_text_len=500)
    if not pages:
        return {"queries": queries, "lang": lang, "results": []}

    # Build doc list and short snippets
    docs: List[Tuple[str, str]] = []
    for u in all_urls:
        if u in pages:
            docs.append((u, pages[u]))
    if not docs:
        return {"queries": queries, "lang": lang, "results": []}

    _status("Embedding and reranking…")
    # Embed all 3 queries
    q_embs: List[List[float]] = []
    for q in queries:
        try:
            q_embs.append(ollama_embed(ollama_host, embed_model, f"query: {q}"))
        except Exception:
            q_embs.append([])

    # Embed documents (short passages)
    DOC_SNIPPET_CHARS = 800
    d_embs: List[List[float]] = []
    for (_u, text) in docs:
        snippet = text.replace("\n", " ")
        if len(snippet) > DOC_SNIPPET_CHARS:
            snippet = snippet[:DOC_SNIPPET_CHARS]
        try:
            d_embs.append(ollama_embed(ollama_host, embed_model, f"passage: {snippet}"))
        except Exception:
            d_embs.append([])

    # Score each doc by max cosine across query embeddings
    results: List[Tuple[str, float]] = []  # (url, score)
    for i, (u, _t) in enumerate(docs):
        best = 0.0
        for qv in q_embs:
            if not qv or not d_embs[i]:
                continue
            c = cosine(qv, d_embs[i])
            if c > best:
                best = c
        # scale cosine in [-1, 1] to a user-facing 0..100 score
        score = max(0.0, min(100.0, (best + 1.0) * 50.0))
        results.append((u, score))

    # Sort and format output
    results.sort(key=lambda x: x[1], reverse=True)
    out: List[Dict[str, Any]] = []
    for (u, sc) in results:
        txt = pages.get(u, "").strip()
        snippet = txt[:800].replace("\n", " ") if txt else ""
        out.append({"url": u, "title": titles.get(u, ""), "score": round(sc, 1), "snippet": snippet})

    return {"queries": queries, "lang": lang, "results": out}
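

if __name__ == "__main__":  # pragma: no cover
    # Minimal manual smoke test (illustrative; assumes Ollama on localhost:11434
    # with a chat model named "llama3.1" pulled, and SearXNG on localhost:8888 —
    # adjust both to your setup).
    import pprint

    report = prior_art_search(
        ollama_host="http://localhost:11434",
        model="llama3.1",
        notes="A desktop GUI that turns rough idea notes into a structured concept.",
        kb="",
        assets=[],
    )
    pprint.pp(report["queries"])
    for r in report["results"][:5]:
        print(f'{r["score"]:5.1f}  {r["url"]}')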