diff --git a/backend/local_rag.py b/backend/local_rag.py
index 1b208cd..0fa65fd 100644
--- a/backend/local_rag.py
+++ b/backend/local_rag.py
@@ -99,6 +99,7 @@ def default_library_data(name: str, slug: str) -> Dict[str, Any]:
         "slug": slug,
         "created_at": now_iso(),
         "files": [],
+        "pipeline": {},
     }
 
 
@@ -132,6 +133,37 @@ def _file_uri(path_value: str) -> str:
     return f"file://{quote(path_value)}"
 
 
+def _pipeline_meta(data: Dict[str, Any]) -> Dict[str, Any]:
+    pipeline = data.get("pipeline")
+    if isinstance(pipeline, dict):
+        return pipeline
+    return {}
+
+
+def _source_signature(files: List[Dict[str, Any]]) -> Optional[str]:
+    if not files:
+        return None
+    digest = hashlib.sha256()
+    ordered = sorted(
+        files,
+        key=lambda entry: (
+            str(entry.get("sha256") or ""),
+            str(entry.get("path") or ""),
+            str(entry.get("rel") or ""),
+        ),
+    )
+    for entry in ordered:
+        payload = {
+            "sha256": entry.get("sha256") or "",
+            "path": entry.get("path") or "",
+            "rel": entry.get("rel") or "",
+            "size": int(entry.get("size") or 0),
+        }
+        digest.update(json.dumps(payload, sort_keys=True).encode("utf-8"))
+        digest.update(b"\n")
+    return digest.hexdigest()
+
+
 def _collect_library_paths(slug: str) -> Dict[str, Path]:
     base = lib_dir(slug)
     return {
@@ -151,20 +183,41 @@ def _collect_library_paths(slug: str) -> Dict[str, Path]:
 def library_payload(data: Dict[str, Any]) -> Dict[str, Any]:
     paths = _collect_library_paths(data["slug"])
     files = list(data.get("files", []))
+    pipeline = _pipeline_meta(data)
+    source_signature = _source_signature(files)
+    has_corpus = bool(source_signature) and pipeline.get("corpus_signature") == source_signature and paths["corpus"].exists()
+    is_enriched = (
+        bool(source_signature)
+        and pipeline.get("enriched_signature") == source_signature
+        and paths["enhanced"].exists()
+        and paths["shadow"].exists()
+    )
+    is_indexed = (
+        bool(source_signature)
+        and pipeline.get("indexed_signature") == source_signature
+        and paths["shadow_index"].exists()
+        and paths["shadow_store"].exists()
+        and paths["content_index"].exists()
+        and paths["content_store"].exists()
+    )
     stages = {
         "has_files": len(files) > 0,
-        "has_corpus": paths["corpus"].exists(),
-        "is_enriched": paths["enhanced"].exists() and paths["shadow"].exists(),
-        "is_indexed": paths["shadow_index"].exists() and paths["content_index"].exists(),
+        "has_corpus": has_corpus,
+        "is_enriched": is_enriched,
+        "is_indexed": is_indexed,
+        "is_ready_for_chat": is_indexed,
+        "needs_prepare": bool(files) and not is_indexed,
     }
     artifacts = {
-        "corpus_records": _line_count(paths["corpus"]),
-        "enhanced_records": _line_count(paths["enhanced"]),
-        "shadow_records": _line_count(paths["shadow"]),
+        "corpus_records": _line_count(paths["corpus"]) if has_corpus else 0,
+        "enhanced_records": _line_count(paths["enhanced"]) if is_enriched else 0,
+        "shadow_records": _line_count(paths["shadow"]) if is_enriched else 0,
     }
     return {
         **data,
         "files": files,
+        "pipeline": pipeline,
+        "source_signature": source_signature,
         "states": stages,
         "artifacts": artifacts,
     }
@@ -229,9 +282,122 @@ def _load_pipeline_fn(module_name: str, attr: str):
     return getattr(module, attr)
 
 
+def _mark_pipeline_stage(slug: str, stage: str, source_signature: Optional[str]) -> None:
+    path = lib_json(slug)
+    if not path.exists():
+        return
+
+    data = _read_json(path)
+    pipeline = data.get("pipeline")
+    if not isinstance(pipeline, dict):
+        pipeline = {}
+    data["pipeline"] = pipeline
+
+    stamp = now_iso()
+    if stage == "build":
+        pipeline["corpus_signature"] = source_signature
+        pipeline["corpus_updated_at"] = stamp
+        pipeline.pop("enriched_signature", None)
+        pipeline.pop("enriched_updated_at", None)
+        pipeline.pop("indexed_signature", None)
+        pipeline.pop("indexed_updated_at", None)
+    elif stage == "enrich":
+        pipeline["enriched_signature"] = source_signature
+        pipeline["enriched_updated_at"] = stamp
+        pipeline.pop("indexed_signature", None)
+        pipeline.pop("indexed_updated_at", None)
+    elif stage == "embed":
+        pipeline["indexed_signature"] = source_signature
+        pipeline["indexed_updated_at"] = stamp
+    else:
+        raise ValueError(f"Unknown pipeline stage: {stage}")
+
+    write_library(slug, data)
+
+
+def _scaled_progress(on_progress, start: float, end: float, prefix: str):
+    def wrapped(phase: str, pct: float, detail: str):
+        clamped = min(1.0, max(0.0, float(pct)))
+        span = max(0.0, end - start)
+        message = f"{prefix}: {detail}" if detail else prefix
+        on_progress(phase, start + (span * clamped), message)
+
+    return wrapped
+
+
+def _run_prepare_pipeline(slug: str, on_progress=None, **opts):
+    data = read_library(slug)
+    payload = library_payload(data)
+    files = list(data.get("files", []))
+    source_signature = payload.get("source_signature")
+    if not files or not source_signature:
+        raise RuntimeError("Add files before preparing this database.")
+
+    paths = _collect_library_paths(slug)
+    states = dict(payload.get("states") or {})
+    results: Dict[str, Any] = {}
+
+    build_runner = _load_pipeline_fn("corpus_builder", "run_build")
+    enrich_runner = _load_pipeline_fn("corpus_enricher", "run_enrich")
+    index_runner = _load_pipeline_fn("index_builder", "run_index")
+
+    if on_progress:
+        on_progress("prepare", 0.01, "Preparing database for chat...")
+
+    if not states.get("has_corpus"):
+        build_progress = _scaled_progress(on_progress, 0.02, 0.34, "Reading files") if on_progress else None
+        results["build"] = build_runner(
+            root=stage_dir(slug),
+            out=paths["corpus"],
+            on_progress=build_progress,
+        )
+        _mark_pipeline_stage(slug, "build", source_signature)
+        states["has_corpus"] = True
+        states["is_enriched"] = False
+        states["is_indexed"] = False
+
+    if not states.get("is_enriched"):
+        enrich_progress = _scaled_progress(on_progress, 0.34, 0.69, "Enriching content") if on_progress else None
+        results["enrich"] = enrich_runner(
+            inp=paths["corpus"],
+            out=paths["enhanced"],
+            shadow_out=paths["shadow"],
+            on_progress=enrich_progress,
+        )
+        _mark_pipeline_stage(slug, "enrich", source_signature)
+        states["is_enriched"] = True
+        states["is_indexed"] = False
+
+    if not states.get("is_indexed"):
+        index_progress = _scaled_progress(on_progress, 0.69, 1.0, "Building search indexes") if on_progress else None
+        results["embed"] = index_runner(
+            raw=paths["corpus"],
+            enhanced=paths["enhanced"] if states.get("is_enriched") and paths["enhanced"].exists() else None,
+            shadow=paths["shadow"] if states.get("is_enriched") and paths["shadow"].exists() else None,
+            out_dir=paths["indexes"],
+            on_progress=index_progress,
+            embed_model=opts.get("embed_model", "dengcao/Qwen3-Embedding-0.6B:F16"),
+            ollama=opts.get("ollama", "http://localhost:11434"),
+            target_chars=opts.get("target_chars", 2000),
+            overlap_chars=opts.get("overlap_chars", 200),
+            concurrency=opts.get("concurrency", 6),
+        )
+        _mark_pipeline_stage(slug, "embed", source_signature)
+
+    if on_progress:
+        on_progress("done", 1.0, "Database is ready for chat.")
+
+    return {
+        "status": "ok",
+        "results": results,
+        "source_signature": source_signature,
+    }
+
+
 async def _run_job(job_id: str, fn_name: str, **kwargs):
     loop = asyncio.get_running_loop()
     job = JOBS[job_id]
+    source_signature = kwargs.pop("source_signature", None)
 
     def on_progress(phase: str, pct: float, detail: str):
         job["phase"] = phase
@@ -246,11 +412,15 @@ async def _run_job(job_id: str, fn_name: str, **kwargs):
         runner = _load_pipeline_fn("corpus_enricher", "run_enrich")
     elif fn_name == "embed":
         runner = _load_pipeline_fn("index_builder", "run_index")
+    elif fn_name == "prepare":
+        runner = functools.partial(_run_prepare_pipeline, job["slug"])
     else:
         raise RuntimeError(f"Unknown job type: {fn_name}")
 
     call = functools.partial(runner, on_progress=on_progress, **kwargs)
     result = await loop.run_in_executor(JOB_EXECUTOR, call)
+    if fn_name in {"build", "enrich", "embed"} and source_signature:
+        _mark_pipeline_stage(job["slug"], fn_name, source_signature)
     job["status"] = "succeeded"
     job["progress"] = 100.0
     job["phase"] = "done"
@@ -421,7 +591,8 @@ def remove_file(slug: str, req: RemoveFileRequest):
 @router.post("/libraries/{slug}/jobs/build")
 async def build_library(slug: str):
     data = read_library(slug)
-    if not data.get("files"):
+    payload = library_payload(data)
+    if not payload["states"].get("has_files"):
         raise HTTPException(status_code=400, detail="Add files before building a library.")
     lock = LIB_LOCKS.setdefault(slug, asyncio.Lock())
     async with lock:
@@ -432,14 +603,17 @@ async def build_library(slug: str):
             "build",
             root=stage_dir(slug),
             out=_collect_library_paths(slug)["corpus"],
+            source_signature=payload.get("source_signature"),
         )
     return {"job_id": job_id}
 
 
 @router.post("/libraries/{slug}/jobs/enrich")
 async def enrich_library(slug: str):
+    data = read_library(slug)
+    payload = library_payload(data)
     paths = _collect_library_paths(slug)
-    if not paths["corpus"].exists():
+    if not payload["states"].get("has_corpus"):
         raise HTTPException(status_code=400, detail="Build the corpus before enrichment.")
     lock = LIB_LOCKS.setdefault(slug, asyncio.Lock())
     async with lock:
@@ -451,14 +625,17 @@ async def enrich_library(slug: str):
             inp=paths["corpus"],
             out=paths["enhanced"],
             shadow_out=paths["shadow"],
+            source_signature=payload.get("source_signature"),
         )
     return {"job_id": job_id}
 
 
 @router.post("/libraries/{slug}/jobs/embed")
 async def embed_library(slug: str, req: EmbedLibraryRequest):
+    data = read_library(slug)
+    payload = library_payload(data)
     paths = _collect_library_paths(slug)
-    if not paths["corpus"].exists():
+    if not payload["states"].get("has_corpus"):
         raise HTTPException(status_code=400, detail="Build the corpus before indexing.")
     lock = LIB_LOCKS.setdefault(slug, asyncio.Lock())
     async with lock:
@@ -468,18 +645,33 @@ async def embed_library(slug: str, req: EmbedLibraryRequest):
             slug,
             "embed",
             raw=paths["corpus"],
-            enhanced=paths["enhanced"] if paths["enhanced"].exists() else None,
-            shadow=paths["shadow"] if paths["shadow"].exists() else None,
+            enhanced=paths["enhanced"] if payload["states"].get("is_enriched") and paths["enhanced"].exists() else None,
+            shadow=paths["shadow"] if payload["states"].get("is_enriched") and paths["shadow"].exists() else None,
             out_dir=paths["indexes"],
             embed_model=req.embed_model,
             ollama=req.ollama,
             target_chars=req.target_chars,
             overlap_chars=req.overlap_chars,
             concurrency=req.concurrency,
+            source_signature=payload.get("source_signature"),
         )
     return {"job_id": job_id}
 
 
+@router.post("/libraries/{slug}/jobs/prepare")
+async def prepare_library(slug: str):
+    data = read_library(slug)
+    payload = library_payload(data)
+    if not payload["states"].get("has_files"):
+        raise HTTPException(status_code=400, detail="Add files before preparing this database.")
+    lock = LIB_LOCKS.setdefault(slug, asyncio.Lock())
+    async with lock:
+        if _has_active_job(slug):
+            raise HTTPException(status_code=409, detail="This library already has an active job.")
+        job_id = _start_job(slug, "prepare")
+    return {"job_id": job_id}
+
+
 @router.get("/jobs")
 def list_jobs(slug: Optional[str] = None):
     jobs = [_job_public(job) for job in JOBS.values() if slug is None or job["slug"] == slug]
@@ -497,9 +689,10 @@ def get_job(job_id: str):
 
 @router.post("/libraries/{slug}/context")
 def library_context(slug: str, req: LibraryContextRequest):
+    payload = library_payload(read_library(slug))
     paths = _collect_library_paths(slug)
-    if not paths["shadow_index"].exists() or not paths["content_index"].exists():
-        raise HTTPException(status_code=400, detail="Index the library before using it in chat.")
+    if not payload["states"].get("is_indexed"):
+        raise HTTPException(status_code=400, detail="Prepare the library before using it in chat.")
     try:
         run_query = _load_pipeline_fn("unified_rag", "run_query")
         result = run_query(