Files
Heimgeist/backend/local_rag.py

1294 lines
43 KiB
Python

from __future__ import annotations

import asyncio
import functools
import hashlib
import importlib
import json
import os
import re
import shutil
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import quote

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from .app_settings import (
    DEFAULT_EMBED_MODEL as DEFAULT_EMBED_MODEL_SETTING,
    get_embed_model_preference,
    get_ollama_api_url,
)
router = APIRouter(tags=["local-rag"])
LIB_ROOT = Path(__file__).parent / "libraries"
LIB_ROOT.mkdir(parents=True, exist_ok=True)
RAW_CORPUS_PROFILE = "per-file-default-v1"
PREPARE_PROFILE = "selective-enrich-v2"
DEFAULT_EMBED_MODEL = DEFAULT_EMBED_MODEL_SETTING
DEFAULT_ENRICH_MODEL = "qwen3:4b"
DEFAULT_ENRICH_MIN_CHARS = 240
DEFAULT_ENRICH_MAX_TEXT = 6000
DEFAULT_ENRICH_CONCURRENCY = max(1, min(4, (os.cpu_count() or 4) // 2))
JOB_EXECUTOR = ThreadPoolExecutor(max_workers=2)
JOBS: Dict[str, Dict[str, Any]] = {}
LIB_LOCKS: Dict[str, asyncio.Lock] = {}
class CreateLibraryRequest(BaseModel):
name: str
class RenameLibraryRequest(BaseModel):
name: str
class RegisterPathsRequest(BaseModel):
paths: List[str]
class RemoveFileRequest(BaseModel):
rel: str
class UpdateFileEnrichmentRequest(BaseModel):
rel: str
enabled: bool
class EmbedLibraryRequest(BaseModel):
embed_model: Optional[str] = None
ollama: Optional[str] = None
target_chars: int = 2000
overlap_chars: int = 200
concurrency: int = 6
class LibraryContextRequest(BaseModel):
prompt: str
top_k: int = 5
ollama: Optional[str] = None
embed_model: Optional[str] = None
gen_model: str = "qwen3:4b"
def _default_ollama_url() -> str:
return get_ollama_api_url()
def _default_embed_model() -> str:
return get_embed_model_preference()
def _resolve_ollama_url(value: Optional[str] = None) -> str:
if isinstance(value, str) and value.strip():
return value.strip().rstrip("/")
return _default_ollama_url()
def now_iso() -> str:
    """Return the current UTC time as "YYYY-MM-DDTHH:MM:SSZ".

    Uses timezone-aware ``datetime.now(timezone.utc)`` instead of the
    deprecated ``datetime.utcnow()``; the tzinfo is stripped before
    formatting so the output shape (trailing "Z", no "+00:00") is
    unchanged for stored timestamps.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat(timespec="seconds") + "Z"
def slugify(name: str) -> str:
    """Lower-case *name*, keep only [a-zA-Z0-9- ], collapse runs of
    whitespace to single hyphens.

    Falls back to a random ``lib-XXXXXXXX`` slug when nothing survives.
    """
    kept = re.sub(r"[^a-zA-Z0-9\- ]+", "", name)
    hyphenated = re.sub(r"\s+", "-", kept.strip().lower())
    return hyphenated or f"lib-{uuid.uuid4().hex[:8]}"
def lib_dir(slug: str) -> Path:
return LIB_ROOT / slug
def lib_json(slug: str) -> Path:
return lib_dir(slug) / "library.json"
def stage_dir(slug: str) -> Path:
path = lib_dir(slug) / "stage"
path.mkdir(parents=True, exist_ok=True)
return path
def indexes_dir(slug: str) -> Path:
path = lib_dir(slug) / "indexes"
path.mkdir(parents=True, exist_ok=True)
return path
def default_library_data(name: str, slug: str) -> Dict[str, Any]:
    """Fresh library metadata skeleton: no files, empty pipeline state."""
    record: Dict[str, Any] = {
        "id": uuid.uuid4().hex,
        "name": name,
        "slug": slug,
        "created_at": now_iso(),
    }
    record["files"] = []
    record["pipeline"] = {}
    return record
def _read_json(path: Path) -> Dict[str, Any]:
return json.loads(path.read_text(encoding="utf-8"))
def read_library(slug: str) -> Dict[str, Any]:
path = lib_json(slug)
if not path.exists():
raise HTTPException(status_code=404, detail="Library not found")
return _read_json(path)
def write_library(slug: str, data: Dict[str, Any]) -> None:
    """Persist library metadata atomically (temp file + rename)."""
    path = lib_json(slug)
    path.parent.mkdir(parents=True, exist_ok=True)
    # Write next to the target and rename so readers never see a torn file.
    scratch = path.with_suffix(".tmp")
    scratch.write_text(json.dumps(data, indent=2), encoding="utf-8")
    scratch.replace(path)
def _line_count(path: Path) -> int:
if not path.exists():
return 0
with path.open("r", encoding="utf-8", errors="ignore") as handle:
return sum(1 for line in handle if line.strip())
def _file_uri(path_value: str) -> str:
return f"file://{quote(path_value)}"
def _pipeline_meta(data: Dict[str, Any]) -> Dict[str, Any]:
pipeline = data.get("pipeline")
if isinstance(pipeline, dict):
return pipeline
return {}
def _source_signature(files: List[Dict[str, Any]]) -> Optional[str]:
if not files:
return None
digest = hashlib.sha256()
ordered = sorted(
files,
key=lambda entry: (
str(entry.get("sha256") or ""),
str(entry.get("path") or ""),
str(entry.get("rel") or ""),
),
)
for entry in ordered:
payload = {
"sha256": entry.get("sha256") or "",
"path": entry.get("path") or "",
"rel": entry.get("rel") or "",
"size": int(entry.get("size") or 0),
}
digest.update(json.dumps(payload, sort_keys=True).encode("utf-8"))
digest.update(b"\n")
return digest.hexdigest()
def _file_enrich_enabled(entry: Dict[str, Any]) -> bool:
value = entry.get("enrich_enabled")
if isinstance(value, bool):
return value
if value is None:
return False
if isinstance(value, str):
return value.strip().lower() in {"1", "true", "yes", "on"}
return bool(value)
def _versioned_signature(*parts: Optional[str]) -> Optional[str]:
clean_parts = [str(part) for part in parts if part]
if not clean_parts:
return None
digest = hashlib.sha256()
for part in clean_parts:
digest.update(part.encode("utf-8"))
digest.update(b"\n")
return digest.hexdigest()
def _corpus_signature(files: List[Dict[str, Any]]) -> Optional[str]:
return _versioned_signature("corpus", RAW_CORPUS_PROFILE, _source_signature(files))
def _prepare_signature(files: List[Dict[str, Any]]) -> Optional[str]:
    """Signature covering the sources *plus* per-file enrichment choices.

    Toggling a file's enrichment flag (or bumping a profile version)
    produces a new signature, which is what triggers a re-prepare.
    Returns None when there are no files.
    """
    source_signature = _source_signature(files)
    if not source_signature:
        return None
    digest = hashlib.sha256()
    header = f"prepare\n{RAW_CORPUS_PROFILE}\n{PREPARE_PROFILE}\n{source_signature}\n"
    digest.update(header.encode("utf-8"))

    def rel_key(item: Dict[str, Any]) -> str:
        return str(item.get("rel") or item.get("path") or item.get("sha256") or "")

    for entry in sorted(files, key=rel_key):
        flag = "1" if _file_enrich_enabled(entry) else "0"
        rel = str(entry.get("rel") or "")
        digest.update(f"{rel}\t{flag}\n".encode("utf-8"))
    return digest.hexdigest()
def _collect_library_paths(slug: str) -> Dict[str, Path]:
base = lib_dir(slug)
return {
"base": base,
"stage": stage_dir(slug),
"corpus": base / "corpus.jsonl",
"enrich_input": base / "corpus.enrich-input.jsonl",
"enhanced": base / "corpus.enhanced.jsonl",
"shadow": base / "corpus.shadow.jsonl",
"shadow_partial": base / "corpus.shadow.partial.jsonl",
"indexes": indexes_dir(slug),
"shadow_index": indexes_dir(slug) / "shadow.index.faiss",
"shadow_store": indexes_dir(slug) / "shadow.meta.jsonl",
"content_index": indexes_dir(slug) / "content.index.faiss",
"content_store": indexes_dir(slug) / "content.meta.jsonl",
}
def _cleanup_enrichment_artifacts(slug: str) -> None:
    """Delete enrichment by-products (enrich input, enhanced and shadow corpora).

    Uses ``unlink(missing_ok=True)`` instead of an ``exists()`` pre-check:
    the original check-then-delete pair could raise FileNotFoundError if a
    concurrent job removed the same artifact between the two calls.
    """
    paths = _collect_library_paths(slug)
    for key in ("enrich_input", "enhanced", "shadow", "shadow_partial"):
        paths[key].unlink(missing_ok=True)
def _cleanup_generated_artifacts(slug: str) -> None:
    """Delete every generated artifact (corpora and all index files).

    Uses ``unlink(missing_ok=True)`` instead of an ``exists()`` pre-check
    to avoid the check-then-delete race when jobs clean up concurrently.
    """
    paths = _collect_library_paths(slug)
    for key in (
        "corpus",
        "enrich_input",
        "enhanced",
        "shadow",
        "shadow_partial",
        "shadow_index",
        "shadow_store",
        "content_index",
        "content_store",
    ):
        paths[key].unlink(missing_ok=True)
def _latest_library_job(slug: str, *, statuses: Optional[set[str]] = None) -> Optional[Dict[str, Any]]:
    """Most recent job for *slug*, optionally filtered to a status set.

    "Most recent" is decided by (created_at, id); returns None when no
    job matches.
    """
    candidates = [
        job
        for job in JOBS.values()
        if job["slug"] == slug and (statuses is None or job["status"] in statuses)
    ]
    if not candidates:
        return None
    return max(candidates, key=lambda job: (str(job.get("created_at") or ""), job["id"]))
def _build_file_sync_payload(
slug: str,
files: List[Dict[str, Any]],
pipeline: Dict[str, Any],
) -> List[Dict[str, Any]]:
active_job = _latest_library_job(slug, statuses={"queued", "running"})
failed_job = _latest_library_job(slug, statuses={"failed"})
pending_signature = pipeline.get("pending_prepare_signature")
out: List[Dict[str, Any]] = []
for entry in files:
file_entry = dict(entry)
file_entry["enrich_enabled"] = _file_enrich_enabled(file_entry)
stored_status = str(file_entry.get("sync_status") or "pending")
sync_status = stored_status
sync_progress = 100.0 if stored_status == "ready" else 0.0
sync_detail = ""
sync_error = file_entry.get("sync_error")
if stored_status != "ready":
if active_job:
sync_status = "syncing"
sync_progress = float(active_job.get("progress") or 0.0)
sync_detail = active_job.get("detail") or ""
sync_error = None
elif failed_job and pending_signature:
sync_status = "failed"
sync_progress = 0.0
sync_detail = failed_job.get("detail") or ""
sync_error = failed_job.get("error")
elif pending_signature:
sync_status = "pending"
sync_progress = 0.0
file_entry["sync"] = {
"status": sync_status,
"progress": round(sync_progress, 1),
"detail": sync_detail,
"error": sync_error,
"ready": sync_status == "ready",
}
out.append(file_entry)
return out
def library_payload(data: Dict[str, Any]) -> Dict[str, Any]:
paths = _collect_library_paths(data["slug"])
pipeline = _pipeline_meta(data)
raw_files = list(data.get("files", []))
files = _build_file_sync_payload(data["slug"], raw_files, pipeline)
source_signature = _source_signature(files)
corpus_signature = _corpus_signature(files)
prepare_signature = _prepare_signature(files)
enrichment_enabled_files = sum(1 for entry in files if _file_enrich_enabled(entry))
has_corpus = bool(corpus_signature) and pipeline.get("corpus_signature") == corpus_signature and paths["corpus"].exists()
is_enriched = (
enrichment_enabled_files > 0
and bool(prepare_signature)
and pipeline.get("enriched_signature") == prepare_signature
and paths["enhanced"].exists()
and paths["shadow_partial"].exists()
)
is_indexed = (
bool(prepare_signature)
and pipeline.get("indexed_signature") == prepare_signature
and paths["shadow_index"].exists()
and paths["shadow_store"].exists()
and paths["content_index"].exists()
and paths["content_store"].exists()
)
stages = {
"has_files": len(files) > 0,
"has_corpus": has_corpus,
"is_enriched": is_enriched,
"is_indexed": is_indexed,
"is_ready_for_chat": is_indexed,
"needs_prepare": bool(files) and not is_indexed,
"enrichment_enabled_files": enrichment_enabled_files,
}
artifacts = {
"corpus_records": _line_count(paths["corpus"]) if has_corpus else 0,
"enhanced_records": _line_count(paths["enhanced"]) if is_enriched else 0,
"shadow_records": _line_count(paths["shadow_partial"]) if is_enriched else 0,
}
return {
**data,
"files": files,
"pipeline": pipeline,
"source_signature": source_signature,
"corpus_signature": corpus_signature,
"prepare_signature": prepare_signature,
"states": stages,
"artifacts": artifacts,
}
def _walk_input_paths(paths: List[str]) -> List[Path]:
out: List[Path] = []
for raw in paths:
current = Path(raw).expanduser().resolve()
if not current.exists():
continue
if current.is_file():
out.append(current)
continue
for child in current.rglob("*"):
if child.is_file():
out.append(child.resolve())
return out
def _sha256_file(path: Path) -> str:
digest = hashlib.sha256()
with path.open("rb") as handle:
for chunk in iter(lambda: handle.read(1024 * 1024), b""):
digest.update(chunk)
return digest.hexdigest()
def _stage_name(sha: str, path: Path) -> str:
safe_name = re.sub(r"[^A-Za-z0-9._-]+", "_", path.name).strip("._") or "file"
return f"{sha}--{safe_name}"
def _job_public(job: Dict[str, Any]) -> Dict[str, Any]:
return {
"id": job["id"],
"slug": job["slug"],
"type": job["type"],
"status": job["status"],
"phase": job.get("phase"),
"progress": job.get("progress", 0.0),
"detail": job.get("detail", ""),
"error": job.get("error"),
"result": job.get("result"),
"created_at": job["created_at"],
"finished_at": job.get("finished_at"),
}
def _has_active_job(slug: str) -> bool:
    """True when *slug* has a job that is queued or currently running."""
    active = {"queued", "running"}
    for job in JOBS.values():
        if job["slug"] == slug and job["status"] in active:
            return True
    return False
def _load_pipeline_fn(module_name: str, attr: str):
try:
module = importlib.import_module(f"backend.rag.{module_name}")
except ModuleNotFoundError:
module = importlib.import_module(f".rag.{module_name}", package=__package__)
return getattr(module, attr)
def _mark_pipeline_stage(slug: str, stage: str, source_signature: Optional[str]) -> None:
path = lib_json(slug)
if not path.exists():
return
data = _read_json(path)
pipeline = data.get("pipeline")
if not isinstance(pipeline, dict):
pipeline = {}
data["pipeline"] = pipeline
stamp = now_iso()
if stage == "build":
pipeline["corpus_signature"] = source_signature
pipeline["corpus_updated_at"] = stamp
pipeline.pop("enriched_signature", None)
pipeline.pop("enriched_updated_at", None)
pipeline.pop("indexed_signature", None)
pipeline.pop("indexed_updated_at", None)
elif stage == "enrich":
pipeline["enriched_signature"] = source_signature
pipeline["enriched_updated_at"] = stamp
pipeline.pop("indexed_signature", None)
pipeline.pop("indexed_updated_at", None)
elif stage == "embed":
pipeline["indexed_signature"] = source_signature
pipeline["indexed_updated_at"] = stamp
else:
raise ValueError(f"Unknown pipeline stage: {stage}")
write_library(slug, data)
def _set_pipeline_embed_model(slug: str, embed_model: Optional[str]) -> None:
if not embed_model:
return
path = lib_json(slug)
if not path.exists():
return
data = _read_json(path)
pipeline = data.get("pipeline")
if not isinstance(pipeline, dict):
pipeline = {}
data["pipeline"] = pipeline
pipeline["embed_model"] = embed_model
pipeline["embed_model_updated_at"] = now_iso()
write_library(slug, data)
def _set_pending_prepare_signature(data: Dict[str, Any], source_signature: Optional[str]) -> None:
pipeline = data.get("pipeline")
if not isinstance(pipeline, dict):
pipeline = {}
data["pipeline"] = pipeline
if source_signature:
pipeline["pending_prepare_signature"] = source_signature
pipeline["pending_prepare_updated_at"] = now_iso()
else:
pipeline.pop("pending_prepare_signature", None)
pipeline.pop("pending_prepare_updated_at", None)
def _clear_pending_prepare(slug: str) -> None:
path = lib_json(slug)
if not path.exists():
return
data = _read_json(path)
_set_pending_prepare_signature(data, None)
write_library(slug, data)
def _mark_all_files_ready(slug: str) -> None:
path = lib_json(slug)
if not path.exists():
return
data = _read_json(path)
stamp = now_iso()
changed = False
for entry in data.get("files", []):
if entry.get("sync_status") != "ready" or entry.get("synced_at") != stamp:
changed = True
entry["sync_status"] = "ready"
entry["synced_at"] = stamp
entry.pop("sync_error", None)
if changed:
write_library(slug, data)
def _mark_pending_files_failed(slug: str, error: Optional[str]) -> None:
path = lib_json(slug)
if not path.exists():
return
data = _read_json(path)
changed = False
for entry in data.get("files", []):
if entry.get("sync_status") == "ready":
continue
entry["sync_status"] = "failed"
if error:
entry["sync_error"] = error
else:
entry.pop("sync_error", None)
changed = True
if changed:
write_library(slug, data)
async def _ensure_prepare_job(slug: str) -> Optional[str]:
path = lib_json(slug)
if not path.exists():
return None
lock = LIB_LOCKS.setdefault(slug, asyncio.Lock())
async with lock:
if _has_active_job(slug):
return None
data = read_library(slug)
payload = library_payload(data)
pipeline = _pipeline_meta(data)
pending_signature = pipeline.get("pending_prepare_signature")
if not payload["states"].get("has_files") or not pending_signature:
return None
if payload["states"].get("is_indexed") and payload.get("prepare_signature") == pending_signature:
_clear_pending_prepare(slug)
return None
return _start_job(slug, "prepare")
async def _handle_post_job_state(slug: str, job_type: str, status: str) -> None:
path = lib_json(slug)
if not path.exists():
return
data = read_library(slug)
payload = library_payload(data)
pipeline = _pipeline_meta(data)
pending_signature = pipeline.get("pending_prepare_signature")
if not payload["states"].get("has_files"):
_clear_pending_prepare(slug)
_cleanup_generated_artifacts(slug)
return
if payload["states"].get("is_indexed") and (
pending_signature is None or payload.get("prepare_signature") == pending_signature
):
_mark_all_files_ready(slug)
_clear_pending_prepare(slug)
return
if status == "failed":
failed_job = _latest_library_job(slug, statuses={"failed"})
_mark_pending_files_failed(slug, failed_job.get("error") if failed_job else None)
return
if status == "succeeded" and pending_signature and not payload["states"].get("is_indexed"):
await _ensure_prepare_job(slug)
def _scaled_progress(on_progress, start: float, end: float, prefix: str):
def wrapped(phase: str, pct: float, detail: str):
clamped = min(1.0, max(0.0, float(pct)))
span = max(0.0, end - start)
message = f"{prefix}: {detail}" if detail else prefix
on_progress(phase, start + (span * clamped), message)
return wrapped
def _selected_enrichment_paths(files: List[Dict[str, Any]]) -> set[str]:
    """Source paths of files whose enrichment toggle is on (non-blank only)."""
    selected: set[str] = set()
    for entry in files:
        path_value = str(entry.get("path") or "")
        if path_value.strip() and _file_enrich_enabled(entry):
            selected.add(path_value)
    return selected
def _shadow_size_metrics(text: str) -> Dict[str, int]:
return {
"char_count": len(text),
"word_count": len(text.split()),
"line_count": len(text.splitlines()),
}
def _build_raw_shadow_record(rec: Dict[str, Any]) -> Dict[str, Any]:
synth_shadow_from_raw = _load_pipeline_fn("index_builder", "synth_shadow_from_raw")
shadow_text = synth_shadow_from_raw(rec)
return {
"id": rec.get("id"),
"parent_id": rec.get("parent_id"),
"source_path": rec.get("source_path"),
"url": rec.get("url"),
"title": rec.get("title"),
"record_type": rec.get("record_type"),
"mime": rec.get("mime"),
"lang": rec.get("lang"),
"span": rec.get("span") if isinstance(rec.get("span"), dict) else None,
"shadow_text": shadow_text,
"shadow_meta": {
"prompt_version": "raw-fallback",
"size": _shadow_size_metrics(shadow_text),
},
}
def _write_selected_corpus(corpus_path: Path, out_path: Path, selected_paths: set[str]) -> int:
if not selected_paths or not corpus_path.exists():
if out_path.exists():
out_path.unlink()
return 0
out_path.parent.mkdir(parents=True, exist_ok=True)
written = 0
with corpus_path.open("r", encoding="utf-8") as src, out_path.open("w", encoding="utf-8") as dst:
for line in src:
if not line.strip():
continue
try:
rec = json.loads(line)
except Exception:
continue
source_path = str(rec.get("source_path") or "")
if source_path not in selected_paths:
continue
dst.write(json.dumps(rec, ensure_ascii=False) + "\n")
written += 1
if written == 0 and out_path.exists():
out_path.unlink()
return written
def _build_merged_shadow_corpus(corpus_path: Path, partial_shadow_path: Path, out_path: Path) -> int:
if not corpus_path.exists():
if out_path.exists():
out_path.unlink()
return 0
selected_shadow: Dict[str, Dict[str, Any]] = {}
if partial_shadow_path.exists():
with partial_shadow_path.open("r", encoding="utf-8") as src:
for line in src:
if not line.strip():
continue
try:
rec = json.loads(line)
except Exception:
continue
rec_id = str(rec.get("id") or rec.get("record_id") or "")
if rec_id:
selected_shadow[rec_id] = rec
out_path.parent.mkdir(parents=True, exist_ok=True)
written = 0
with corpus_path.open("r", encoding="utf-8") as src, out_path.open("w", encoding="utf-8") as dst:
for line in src:
if not line.strip():
continue
try:
rec = json.loads(line)
except Exception:
continue
rec_id = str(rec.get("id") or rec.get("record_id") or "")
shadow_record = selected_shadow.get(rec_id) if rec_id else None
if shadow_record is None:
shadow_record = _build_raw_shadow_record(rec)
dst.write(json.dumps(shadow_record, ensure_ascii=False) + "\n")
written += 1
if written == 0 and out_path.exists():
out_path.unlink()
return written
def _run_selected_enrichment(slug: str, on_progress=None, **opts) -> Dict[str, Any]:
data = read_library(slug)
files = list(data.get("files", []))
selected_paths = _selected_enrichment_paths(files)
paths = _collect_library_paths(slug)
if not selected_paths:
_cleanup_enrichment_artifacts(slug)
if on_progress:
on_progress("enrich", 1.0, "Skipping enrichment. All files are raw-indexed.")
return {
"status": "skipped",
"selected_files": 0,
"selected_records": 0,
}
selected_records = _write_selected_corpus(paths["corpus"], paths["enrich_input"], selected_paths)
if selected_records == 0:
_cleanup_enrichment_artifacts(slug)
if on_progress:
on_progress("enrich", 1.0, "No selected records were eligible for enrichment.")
return {
"status": "skipped",
"selected_files": len(selected_paths),
"selected_records": 0,
}
enrich_runner = _load_pipeline_fn("corpus_enricher", "run_enrich")
result = enrich_runner(
inp=paths["enrich_input"],
out=paths["enhanced"],
shadow_out=paths["shadow_partial"],
on_progress=on_progress,
ollama=_resolve_ollama_url(opts.get("ollama")),
model=opts.get("enrich_model", DEFAULT_ENRICH_MODEL),
summary_lang=opts.get("summary_lang", "auto"),
concurrency=opts.get("enrich_concurrency", DEFAULT_ENRICH_CONCURRENCY),
min_chars=opts.get("min_chars", DEFAULT_ENRICH_MIN_CHARS),
max_text=opts.get("max_text", DEFAULT_ENRICH_MAX_TEXT),
cache_dir=opts.get("cache_dir", str(paths["base"] / ".rag_cache")),
)
result["selected_files"] = len(selected_paths)
result["selected_records"] = selected_records
result["shadow_records"] = _build_merged_shadow_corpus(paths["corpus"], paths["shadow_partial"], paths["shadow"])
return result
def _run_prepare_pipeline(slug: str, on_progress=None, **opts):
data = read_library(slug)
payload = library_payload(data)
pipeline = _pipeline_meta(data)
files = list(data.get("files", []))
source_signature = payload.get("source_signature")
corpus_signature = payload.get("corpus_signature")
prepare_signature = payload.get("prepare_signature")
if not files or not source_signature or not corpus_signature or not prepare_signature:
raise RuntimeError("Add files before preparing this database.")
paths = _collect_library_paths(slug)
states = dict(payload.get("states") or {})
results: Dict[str, Any] = {}
enrichment_enabled_files = int(states.get("enrichment_enabled_files") or 0)
build_runner = _load_pipeline_fn("corpus_builder", "run_build")
index_runner = _load_pipeline_fn("index_builder", "run_index")
embed_model = opts.get("embed_model") or _default_embed_model() or pipeline.get("embed_model") or DEFAULT_EMBED_MODEL
if on_progress:
on_progress("prepare", 0.01, "Preparing database for chat...")
if not states.get("has_corpus"):
build_progress = _scaled_progress(on_progress, 0.02, 0.34, "Reading files") if on_progress else None
results["build"] = build_runner(
root=stage_dir(slug),
out=paths["corpus"],
on_progress=build_progress,
emit="per-file",
lang_detect=False,
)
build_errors = list((results.get("build") or {}).get("errors") or [])
if not paths["corpus"].exists() or _line_count(paths["corpus"]) == 0:
if build_errors:
raise RuntimeError(build_errors[0])
raise RuntimeError("Corpus build produced no usable records.")
_mark_pipeline_stage(slug, "build", corpus_signature)
states["has_corpus"] = True
states["is_enriched"] = False
states["is_indexed"] = False
if enrichment_enabled_files > 0:
if not states.get("is_enriched"):
enrich_progress = _scaled_progress(on_progress, 0.34, 0.69, "Enriching selected files") if on_progress else None
results["enrich"] = _run_selected_enrichment(
slug,
on_progress=enrich_progress,
ollama=_resolve_ollama_url(opts.get("ollama")),
enrich_model=opts.get("enrich_model", DEFAULT_ENRICH_MODEL),
summary_lang=opts.get("summary_lang", "auto"),
enrich_concurrency=opts.get("enrich_concurrency", DEFAULT_ENRICH_CONCURRENCY),
min_chars=opts.get("min_chars", DEFAULT_ENRICH_MIN_CHARS),
max_text=opts.get("max_text", DEFAULT_ENRICH_MAX_TEXT),
)
_mark_pipeline_stage(slug, "enrich", prepare_signature)
states["is_enriched"] = True
states["is_indexed"] = False
else:
_cleanup_enrichment_artifacts(slug)
results["enrich"] = {
"status": "skipped",
"selected_files": 0,
"selected_records": 0,
}
if on_progress:
on_progress("enrich", 0.69, "Skipping enrichment. Files will stay raw-indexed.")
states["is_enriched"] = False
if not states.get("is_indexed"):
index_progress = _scaled_progress(on_progress, 0.69, 1.0, "Building search indexes") if on_progress else None
results["embed"] = index_runner(
raw=paths["corpus"],
enhanced=None,
shadow=paths["shadow"] if paths["shadow"].exists() else None,
out_dir=paths["indexes"],
on_progress=index_progress,
embed_model=embed_model,
ollama=_resolve_ollama_url(opts.get("ollama")),
target_chars=opts.get("target_chars", 2000),
overlap_chars=opts.get("overlap_chars", 200),
concurrency=opts.get("concurrency", 6),
)
_set_pipeline_embed_model(slug, results["embed"].get("embed_model") if isinstance(results.get("embed"), dict) else None)
_mark_pipeline_stage(slug, "embed", prepare_signature)
if on_progress:
on_progress("done", 1.0, "Database is ready for chat.")
return {
"status": "ok",
"results": results,
"source_signature": source_signature,
"embed_model": (results.get("embed") or {}).get("embed_model") if isinstance(results.get("embed"), dict) else None,
}
async def _run_job(job_id: str, fn_name: str, **kwargs):
loop = asyncio.get_running_loop()
job = JOBS[job_id]
stage_signature = kwargs.pop("stage_signature", None)
def on_progress(phase: str, pct: float, detail: str):
job["phase"] = phase
job["progress"] = round(float(pct) * 100.0, 1)
job["detail"] = detail
job["status"] = "running"
try:
if fn_name == "build":
runner = _load_pipeline_fn("corpus_builder", "run_build")
elif fn_name == "enrich":
runner = functools.partial(_run_selected_enrichment, job["slug"])
elif fn_name == "embed":
runner = _load_pipeline_fn("index_builder", "run_index")
elif fn_name == "prepare":
runner = functools.partial(_run_prepare_pipeline, job["slug"])
else:
raise RuntimeError(f"Unknown job type: {fn_name}")
call = functools.partial(runner, on_progress=on_progress, **kwargs)
result = await loop.run_in_executor(JOB_EXECUTOR, call)
if fn_name in {"embed", "prepare"} and isinstance(result, dict):
embed_model = result.get("embed_model")
if embed_model:
_set_pipeline_embed_model(job["slug"], embed_model)
if fn_name in {"build", "enrich", "embed"} and stage_signature:
_mark_pipeline_stage(job["slug"], fn_name, stage_signature)
job["status"] = "succeeded"
job["progress"] = 100.0
job["phase"] = "done"
job["detail"] = "Completed."
job["result"] = result
except Exception as exc:
job["status"] = "failed"
job["error"] = f"{type(exc).__name__}: {exc}"
finally:
job["finished_at"] = now_iso()
try:
await _handle_post_job_state(job["slug"], fn_name, job["status"])
except Exception:
pass
def _start_job(slug: str, job_type: str, **kwargs) -> str:
job_id = uuid.uuid4().hex
JOBS[job_id] = {
"id": job_id,
"slug": slug,
"type": job_type,
"status": "queued",
"phase": "queued",
"progress": 0.0,
"detail": "",
"created_at": now_iso(),
"finished_at": None,
"result": None,
"error": None,
}
asyncio.create_task(_run_job(job_id, job_type, **kwargs))
return job_id
def _build_local_context(prompt: str, results: Dict[str, Any], top_k: int = 5) -> Dict[str, Any]:
sources = results.get("sources") or []
selected = sources[: max(1, top_k)]
if not selected:
context_block = (
"<local_rag_context>\n"
"No useful results were found in the selected local knowledge base.\n"
"</local_rag_context>"
)
return {"context_block": context_block, "sources": []}
blocks: List[str] = ["<local_rag_context>"]
file_sources: List[str] = []
for idx, source in enumerate(selected, start=1):
title = (source.get("title") or Path(source.get("url") or source.get("doc_id") or f"Source {idx}").name).strip()
snippet = re.sub(r"\s+", " ", (source.get("snippet") or "")).strip()
if len(snippet) > 1400:
snippet = snippet[:1400].rstrip() + "..."
raw_path = source.get("url") or source.get("doc_id") or ""
if raw_path and os.path.isabs(raw_path):
file_sources.append(_file_uri(raw_path))
blocks.append(f"[L{idx}] {title}\n{snippet}")
blocks.append("</local_rag_context>")
blocks.append(
"Use the local knowledge base context when it is relevant. "
"If it does not answer the question, say so clearly instead of inventing details."
)
return {"context_block": "\n".join(blocks), "sources": file_sources}
@router.get("/libraries")
def list_libraries():
libraries: List[Dict[str, Any]] = []
for path in LIB_ROOT.iterdir():
if not path.is_dir():
continue
meta = path / "library.json"
if not meta.exists():
continue
try:
libraries.append(library_payload(_read_json(meta)))
except Exception:
continue
libraries.sort(key=lambda item: item.get("created_at", ""), reverse=True)
return {"libraries": libraries}
@router.post("/libraries")
def create_library(req: CreateLibraryRequest):
slug = slugify(req.name)
base_slug = slug
idx = 2
while lib_dir(slug).exists():
slug = f"{base_slug}-{idx}"
idx += 1
data = default_library_data(req.name, slug)
stage_dir(slug)
indexes_dir(slug)
write_library(slug, data)
return library_payload(data)
@router.post("/libraries/purge")
def purge_libraries():
active_jobs = [
job for job in JOBS.values()
if job["status"] in {"queued", "running"}
]
if active_jobs:
active_slugs = sorted({str(job.get("slug") or "") for job in active_jobs if job.get("slug")})
detail = "Cannot purge databases while library sync jobs are still running."
if active_slugs:
detail = f"{detail} Active databases: {', '.join(active_slugs)}."
raise HTTPException(status_code=409, detail=detail)
removed: List[str] = []
failures: List[str] = []
for path in list(LIB_ROOT.iterdir()):
if not path.is_dir():
continue
try:
shutil.rmtree(path)
removed.append(path.name)
except Exception as exc:
failures.append(f"{path.name}: {type(exc).__name__}: {exc}")
LIB_ROOT.mkdir(parents=True, exist_ok=True)
JOBS.clear()
LIB_LOCKS.clear()
if failures:
preview = "; ".join(failures[:3])
if len(failures) > 3:
preview = f"{preview}; ..."
raise HTTPException(status_code=500, detail=f"Failed to purge some databases. {preview}")
return {
"ok": True,
"count": len(removed),
"removed": removed,
}
@router.get("/libraries/{slug}")
def get_library(slug: str):
return library_payload(read_library(slug))
@router.patch("/libraries/{slug}")
def rename_library(slug: str, req: RenameLibraryRequest):
data = read_library(slug)
data["name"] = req.name.strip() or data["name"]
write_library(slug, data)
return library_payload(data)
@router.delete("/libraries/{slug}")
def delete_library(slug: str):
path = lib_dir(slug)
if not path.exists():
raise HTTPException(status_code=404, detail="Library not found")
shutil.rmtree(path)
return {"ok": True}
@router.post("/libraries/{slug}/files/register")
async def register_paths(slug: str, req: RegisterPathsRequest):
    """Register local files/directories into the library's stage area.

    Each new file is deduplicated by content hash, symlinked into the
    stage directory, and recorded as pending; a prepare job is scheduled
    when anything new arrived.

    Fix: ``Path.exists()`` follows symlinks, so a previously staged link
    whose target had been deleted was never unlinked here and
    ``symlink_to`` raised FileExistsError. ``unlink(missing_ok=True)``
    removes the link itself regardless of the target's state.
    """
    data = read_library(slug)
    stage = stage_dir(slug)
    existing = {entry.get("sha256"): entry for entry in data.get("files", [])}
    added: List[Dict[str, Any]] = []
    skipped: List[str] = []
    for file_path in _walk_input_paths(req.paths):
        sha = _sha256_file(file_path)
        if sha in existing:
            skipped.append(str(file_path))
            continue
        stage_name = _stage_name(sha, file_path)
        symlink_path = stage / stage_name
        # Remove any leftover link (even a broken one) before re-linking.
        symlink_path.unlink(missing_ok=True)
        symlink_path.symlink_to(file_path)
        entry = {
            "sha256": sha,
            "path": str(file_path),
            "rel": stage_name,
            "name": file_path.name,
            "size": file_path.stat().st_size,
            "added_at": now_iso(),
            "sync_status": "pending",
            "enrich_enabled": False,
        }
        data.setdefault("files", []).append(entry)
        added.append(entry)
        existing[sha] = entry
    job_id = None
    if added:
        _set_pending_prepare_signature(data, _prepare_signature(data.get("files", [])))
    write_library(slug, data)
    if added:
        job_id = await _ensure_prepare_job(slug)
    return {
        "added": added,
        "skipped": skipped,
        "job_id": job_id,
        "library": library_payload(data),
    }
@router.delete("/libraries/{slug}/files")
async def remove_file(slug: str, req: RemoveFileRequest):
    """Drop one staged file from the library, then resync or clean up.

    404 when the stage name is unknown. With files remaining, a prepare
    job is scheduled; with none, every generated artifact is removed.

    Fix: ``Path.exists()`` follows symlinks, so a stage link whose target
    had already been deleted was left behind; ``unlink(missing_ok=True)``
    removes the link itself whether or not the target still exists.
    """
    data = read_library(slug)
    files = list(data.get("files", []))
    removed = next((entry for entry in files if entry.get("rel") == req.rel), None)
    if not removed:
        raise HTTPException(status_code=404, detail="File not found")
    data["files"] = [entry for entry in files if entry.get("rel") != req.rel]
    # Remove the stage symlink even when it is broken.
    (stage_dir(slug) / req.rel).unlink(missing_ok=True)
    prepare_signature = _prepare_signature(data.get("files", []))
    _set_pending_prepare_signature(data, prepare_signature)
    write_library(slug, data)
    job_id = None
    if prepare_signature:
        job_id = await _ensure_prepare_job(slug)
    else:
        # No files left: drop every generated artifact for this library.
        _cleanup_generated_artifacts(slug)
    return {"ok": True, "job_id": job_id, "library": library_payload(data)}
@router.patch("/libraries/{slug}/files/enrichment")
async def update_file_enrichment(slug: str, req: UpdateFileEnrichmentRequest):
    """Toggle per-file enrichment and reschedule preparation when it changed."""
    data = read_library(slug)
    entries = list(data.get("files", []))
    match = None
    for entry in entries:
        if entry.get("rel") == req.rel:
            match = entry
            break
    if match is None:
        raise HTTPException(status_code=404, detail="File not found")
    want = bool(req.enabled)
    if _file_enrich_enabled(match) == want:
        # Already in the requested state; nothing to persist, no job to run.
        return {"job_id": None, "library": library_payload(data)}
    # Flip the flag and mark the file dirty so prepare re-processes it.
    match["enrich_enabled"] = want
    match["sync_status"] = "pending"
    match.pop("sync_error", None)
    match.pop("synced_at", None)
    _set_pending_prepare_signature(data, _prepare_signature(entries))
    write_library(slug, data)
    scheduled = await _ensure_prepare_job(slug)
    return {"job_id": scheduled, "library": library_payload(data)}
@router.post("/libraries/{slug}/jobs/build")
async def build_library(slug: str):
    """Start a corpus-build job over the library's staged files."""
    payload = library_payload(read_library(slug))
    if not payload["states"].get("has_files"):
        raise HTTPException(status_code=400, detail="Add files before building a library.")
    # One active job per library; the per-slug lock serializes the check+start.
    async with LIB_LOCKS.setdefault(slug, asyncio.Lock()):
        if _has_active_job(slug):
            raise HTTPException(status_code=409, detail="This library already has an active job.")
        started = _start_job(
            slug,
            "build",
            root=stage_dir(slug),
            out=_collect_library_paths(slug)["corpus"],
            emit="per-file",
            lang_detect=False,
            stage_signature=payload.get("corpus_signature"),
        )
    return {"job_id": started}
@router.post("/libraries/{slug}/jobs/enrich")
async def enrich_library(slug: str):
    """Start an enrichment job; requires a previously built corpus."""
    payload = library_payload(read_library(slug))
    if not payload["states"].get("has_corpus"):
        raise HTTPException(status_code=400, detail="Build the corpus before enrichment.")
    # Serialize the has-active-job check against concurrent job starts.
    async with LIB_LOCKS.setdefault(slug, asyncio.Lock()):
        if _has_active_job(slug):
            raise HTTPException(status_code=409, detail="This library already has an active job.")
        started = _start_job(slug, "enrich", stage_signature=payload.get("prepare_signature"))
    return {"job_id": started}
@router.post("/libraries/{slug}/jobs/embed")
async def embed_library(slug: str, req: EmbedLibraryRequest):
    """Start an embedding/indexing job over the built corpus."""
    data = read_library(slug)
    payload = library_payload(data)
    pipeline = _pipeline_meta(data)
    paths = _collect_library_paths(slug)
    if not payload["states"].get("has_corpus"):
        raise HTTPException(status_code=400, detail="Build the corpus before indexing.")
    # Resolution order: explicit request > app preference > pipeline history > built-in default.
    chosen_model = (
        req.embed_model
        or _default_embed_model()
        or pipeline.get("embed_model")
        or DEFAULT_EMBED_MODEL
    )
    async with LIB_LOCKS.setdefault(slug, asyncio.Lock()):
        if _has_active_job(slug):
            raise HTTPException(status_code=409, detail="This library already has an active job.")
        shadow_path = paths["shadow"]
        started = _start_job(
            slug,
            "embed",
            raw=paths["corpus"],
            enhanced=None,
            shadow=shadow_path if shadow_path.exists() else None,
            out_dir=paths["indexes"],
            embed_model=chosen_model,
            ollama=_resolve_ollama_url(req.ollama),
            target_chars=req.target_chars,
            overlap_chars=req.overlap_chars,
            concurrency=req.concurrency,
            stage_signature=payload.get("prepare_signature"),
        )
    return {"job_id": started}
@router.post("/libraries/{slug}/jobs/prepare")
async def prepare_library(slug: str):
    """Start a prepare job; requires at least one registered file."""
    payload = library_payload(read_library(slug))
    if not payload["states"].get("has_files"):
        raise HTTPException(status_code=400, detail="Add files before preparing this database.")
    # Serialize the has-active-job check against concurrent job starts.
    async with LIB_LOCKS.setdefault(slug, asyncio.Lock()):
        if _has_active_job(slug):
            raise HTTPException(status_code=409, detail="This library already has an active job.")
        started = _start_job(slug, "prepare")
    return {"job_id": started}
@router.get("/jobs")
def list_jobs(slug: Optional[str] = None):
    """Return all jobs (optionally filtered by library slug), newest first."""
    if slug is None:
        selected = JOBS.values()
    else:
        selected = (job for job in JOBS.values() if job["slug"] == slug)
    ordered = sorted(
        (_job_public(job) for job in selected),
        key=lambda item: item.get("created_at", ""),
        reverse=True,
    )
    return {"jobs": ordered}
@router.get("/jobs/{job_id}")
def get_job(job_id: str):
    """Return the public view of one job, or 404 when unknown."""
    if job := JOBS.get(job_id):
        return _job_public(job)
    raise HTTPException(status_code=404, detail="Job not found")
@router.post("/libraries/{slug}/context")
def library_context(slug: str, req: LibraryContextRequest):
    """Run local retrieval against an indexed library and build a chat context block."""
    payload = library_payload(read_library(slug))
    pipeline = payload.get("pipeline") or {}
    paths = _collect_library_paths(slug)
    if not payload["states"].get("is_indexed"):
        raise HTTPException(status_code=400, detail="Prepare the library before using it in chat.")
    # Resolution order: explicit request > pipeline history > app preference > built-in default.
    embed_model = req.embed_model or pipeline.get("embed_model") or _default_embed_model() or DEFAULT_EMBED_MODEL
    retrieval_kwargs = {
        "shadow_index": paths["shadow_index"],
        "shadow_store": paths["shadow_store"],
        "content_index": paths["content_index"],
        "content_store": paths["content_store"],
        "query": req.prompt,
        "answer": False,
        "ollama": _resolve_ollama_url(req.ollama),
        "embed_model": embed_model,
        "gen_model": req.gen_model,
        "no_rerank": True,
        "k": max(1, req.top_k),
    }
    # Any failure (including loading the pipeline module) surfaces as a 500
    # with the exception type and message embedded in the detail.
    try:
        run_query = _load_pipeline_fn("unified_rag", "run_query")
        result = run_query(**retrieval_kwargs)
    except Exception as exc:
        raise HTTPException(
            status_code=500,
            detail=f"Local retrieval failed: {type(exc).__name__}: {exc}",
        ) from exc
    context = _build_local_context(req.prompt, result, top_k=req.top_k)
    return {
        "context_block": context["context_block"],
        "sources": context["sources"],
        "result": result,
    }