diff --git a/backend/local_rag.py b/backend/local_rag.py
index 437d32f..b8bfddf 100644
--- a/backend/local_rag.py
+++ b/backend/local_rag.py
@@ -588,6 +588,159 @@ def _scaled_progress(on_progress, start: float, end: float, prefix: str):
     return wrapped
 
 
+def _selected_enrichment_paths(files: List[Dict[str, Any]]) -> set[str]:
+    return {
+        str(entry.get("path") or "")
+        for entry in files
+        if _file_enrich_enabled(entry) and str(entry.get("path") or "").strip()
+    }
+
+
+def _shadow_size_metrics(text: str) -> Dict[str, int]:
+    return {
+        "char_count": len(text),
+        "word_count": len(text.split()),
+        "line_count": len(text.splitlines()),
+    }
+
+
+def _build_raw_shadow_record(rec: Dict[str, Any]) -> Dict[str, Any]:
+    synth_shadow_from_raw = _load_pipeline_fn("index_builder", "synth_shadow_from_raw")
+    shadow_text = synth_shadow_from_raw(rec)
+    return {
+        "id": rec.get("id"),
+        "parent_id": rec.get("parent_id"),
+        "source_path": rec.get("source_path"),
+        "url": rec.get("url"),
+        "title": rec.get("title"),
+        "record_type": rec.get("record_type"),
+        "mime": rec.get("mime"),
+        "lang": rec.get("lang"),
+        "span": rec.get("span") if isinstance(rec.get("span"), dict) else None,
+        "shadow_text": shadow_text,
+        "shadow_meta": {
+            "prompt_version": "raw-fallback",
+            "size": _shadow_size_metrics(shadow_text),
+        },
+    }
+
+
+def _write_selected_corpus(corpus_path: Path, out_path: Path, selected_paths: set[str]) -> int:
+    if not selected_paths or not corpus_path.exists():
+        if out_path.exists():
+            out_path.unlink()
+        return 0
+
+    out_path.parent.mkdir(parents=True, exist_ok=True)
+    written = 0
+    with corpus_path.open("r", encoding="utf-8") as src, out_path.open("w", encoding="utf-8") as dst:
+        for line in src:
+            if not line.strip():
+                continue
+            try:
+                rec = json.loads(line)
+            except Exception:
+                continue
+            source_path = str(rec.get("source_path") or "")
+            if source_path not in selected_paths:
+                continue
+            dst.write(json.dumps(rec, ensure_ascii=False) + "\n")
+            written += 1
+
+    if written == 0 and out_path.exists():
+        out_path.unlink()
+    return written
+
+
+def _build_merged_shadow_corpus(corpus_path: Path, partial_shadow_path: Path, out_path: Path) -> int:
+    if not corpus_path.exists():
+        if out_path.exists():
+            out_path.unlink()
+        return 0
+
+    selected_shadow: Dict[str, Dict[str, Any]] = {}
+    if partial_shadow_path.exists():
+        with partial_shadow_path.open("r", encoding="utf-8") as src:
+            for line in src:
+                if not line.strip():
+                    continue
+                try:
+                    rec = json.loads(line)
+                except Exception:
+                    continue
+                rec_id = str(rec.get("id") or rec.get("record_id") or "")
+                if rec_id:
+                    selected_shadow[rec_id] = rec
+
+    out_path.parent.mkdir(parents=True, exist_ok=True)
+    written = 0
+    with corpus_path.open("r", encoding="utf-8") as src, out_path.open("w", encoding="utf-8") as dst:
+        for line in src:
+            if not line.strip():
+                continue
+            try:
+                rec = json.loads(line)
+            except Exception:
+                continue
+            rec_id = str(rec.get("id") or rec.get("record_id") or "")
+            shadow_record = selected_shadow.get(rec_id) if rec_id else None
+            if shadow_record is None:
+                shadow_record = _build_raw_shadow_record(rec)
+            dst.write(json.dumps(shadow_record, ensure_ascii=False) + "\n")
+            written += 1
+
+    if written == 0 and out_path.exists():
+        out_path.unlink()
+    return written
+
+
+def _run_selected_enrichment(slug: str, on_progress=None, **opts) -> Dict[str, Any]:
+    data = read_library(slug)
+    files = list(data.get("files", []))
+    selected_paths = _selected_enrichment_paths(files)
+    paths = _collect_library_paths(slug)
+
+    if not selected_paths:
+        _cleanup_enrichment_artifacts(slug)
+        if on_progress:
+            on_progress("enrich", 1.0, "Skipping enrichment. All files are raw-indexed.")
+        return {
+            "status": "skipped",
+            "selected_files": 0,
+            "selected_records": 0,
+        }
+
+    selected_records = _write_selected_corpus(paths["corpus"], paths["enrich_input"], selected_paths)
+    if selected_records == 0:
+        _cleanup_enrichment_artifacts(slug)
+        if on_progress:
+            on_progress("enrich", 1.0, "No selected records were eligible for enrichment.")
+        return {
+            "status": "skipped",
+            "selected_files": len(selected_paths),
+            "selected_records": 0,
+        }
+
+    enrich_runner = _load_pipeline_fn("corpus_enricher", "run_enrich")
+    result = enrich_runner(
+        inp=paths["enrich_input"],
+        out=paths["enhanced"],
+        shadow_out=paths["shadow_partial"],
+        on_progress=on_progress,
+        ollama=opts.get("ollama", "http://localhost:11434"),
+        model=opts.get("enrich_model", DEFAULT_ENRICH_MODEL),
+        summary_lang=opts.get("summary_lang", "auto"),
+        concurrency=opts.get("enrich_concurrency", DEFAULT_ENRICH_CONCURRENCY),
+        min_chars=opts.get("min_chars", DEFAULT_ENRICH_MIN_CHARS),
+        max_text=opts.get("max_text", DEFAULT_ENRICH_MAX_TEXT),
+        cache_dir=opts.get("cache_dir", str(paths["base"] / ".rag_cache")),
+    )
+    result["selected_files"] = len(selected_paths)
+    result["selected_records"] = selected_records
+    result["shadow_records"] = _build_merged_shadow_corpus(paths["corpus"], paths["shadow_partial"], paths["shadow"])
+    return result
+
+
 def _run_prepare_pipeline(slug: str, on_progress=None, **opts):
     data = read_library(slug)
     payload = library_payload(data)
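
A minimal caller sketch for the new entry point, not part of the patch: the slug value is an example, the progress callback signature follows the `on_progress(stage, fraction, message)` calls in the diff, and the option names mirror the `opts.get(...)` lookups above.

```python
# Hypothetical usage sketch (assumed slug "my-library"); drives the new
# _run_selected_enrichment and prints coarse progress to stdout.
def print_progress(stage: str, fraction: float, message: str) -> None:
    print(f"[{stage}] {fraction:.0%} {message}")

result = _run_selected_enrichment(
    "my-library",                      # example slug, assumed to exist
    on_progress=print_progress,
    ollama="http://localhost:11434",   # default shown in the diff
    summary_lang="auto",
)
# "skipped" results carry only selected_files/selected_records;
# successful runs also report the merged shadow_records count.
print(result["status"], result.get("selected_records"), result.get("shadow_records"))
```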