Add pipeline metadata and prepare functionality to local_rag.py

This commit is contained in:
2026-03-19 21:22:52 +01:00
parent 4c2ff3c875
commit c5d1510423

View File

@@ -99,6 +99,7 @@ def default_library_data(name: str, slug: str) -> Dict[str, Any]:
"slug": slug,
"created_at": now_iso(),
"files": [],
"pipeline": {},
}
@@ -132,6 +133,37 @@ def _file_uri(path_value: str) -> str:
return f"file://{quote(path_value)}"
def _pipeline_meta(data: Dict[str, Any]) -> Dict[str, Any]:
pipeline = data.get("pipeline")
if isinstance(pipeline, dict):
return pipeline
return {}
def _source_signature(files: List[Dict[str, Any]]) -> Optional[str]:
if not files:
return None
digest = hashlib.sha256()
ordered = sorted(
files,
key=lambda entry: (
str(entry.get("sha256") or ""),
str(entry.get("path") or ""),
str(entry.get("rel") or ""),
),
)
for entry in ordered:
payload = {
"sha256": entry.get("sha256") or "",
"path": entry.get("path") or "",
"rel": entry.get("rel") or "",
"size": int(entry.get("size") or 0),
}
digest.update(json.dumps(payload, sort_keys=True).encode("utf-8"))
digest.update(b"\n")
return digest.hexdigest()
def _collect_library_paths(slug: str) -> Dict[str, Path]:
base = lib_dir(slug)
return {
@@ -151,20 +183,41 @@ def _collect_library_paths(slug: str) -> Dict[str, Path]:
def library_payload(data: Dict[str, Any]) -> Dict[str, Any]:
    """Build the API payload for a library: stored metadata plus derived state.

    A stage only counts as complete when its recorded signature matches the
    current source signature AND its artifact files still exist on disk, so
    stale artifacts from an older file set are never reported as ready.

    NOTE: the rendered diff had fused the removed pre-commit lines with their
    replacements, leaving duplicate "has_corpus"/"is_enriched"/"is_indexed"
    and "*_records" dict keys; this is the intended post-commit version.
    """
    paths = _collect_library_paths(data["slug"])
    files = list(data.get("files", []))
    pipeline = _pipeline_meta(data)
    source_signature = _source_signature(files)
    has_corpus = (
        bool(source_signature)
        and pipeline.get("corpus_signature") == source_signature
        and paths["corpus"].exists()
    )
    is_enriched = (
        bool(source_signature)
        and pipeline.get("enriched_signature") == source_signature
        and paths["enhanced"].exists()
        and paths["shadow"].exists()
    )
    is_indexed = (
        bool(source_signature)
        and pipeline.get("indexed_signature") == source_signature
        and paths["shadow_index"].exists()
        and paths["shadow_store"].exists()
        and paths["content_index"].exists()
        and paths["content_store"].exists()
    )
    stages = {
        "has_files": len(files) > 0,
        "has_corpus": has_corpus,
        "is_enriched": is_enriched,
        "is_indexed": is_indexed,
        "is_ready_for_chat": is_indexed,
        "needs_prepare": bool(files) and not is_indexed,
    }
    artifacts = {
        # Only report record counts for stages that are valid for the
        # current sources; stale files would otherwise inflate the counts.
        "corpus_records": _line_count(paths["corpus"]) if has_corpus else 0,
        "enhanced_records": _line_count(paths["enhanced"]) if is_enriched else 0,
        "shadow_records": _line_count(paths["shadow"]) if is_enriched else 0,
    }
    return {
        **data,
        "files": files,
        "pipeline": pipeline,
        "source_signature": source_signature,
        "states": stages,
        "artifacts": artifacts,
    }
@@ -229,9 +282,122 @@ def _load_pipeline_fn(module_name: str, attr: str):
return getattr(module, attr)
def _mark_pipeline_stage(slug: str, stage: str, source_signature: Optional[str]) -> None:
    """Persist completion of one pipeline stage for a library.

    Records the source signature and a timestamp for the given stage and
    drops the stamps of any downstream stages, since their artifacts are
    invalidated by re-running an earlier stage. No-op when the library's
    metadata file does not exist; raises ValueError on an unknown stage.
    """
    meta_path = lib_json(slug)
    if not meta_path.exists():
        return
    data = _read_json(meta_path)
    pipeline = data.get("pipeline")
    if not isinstance(pipeline, dict):
        pipeline = {}
        data["pipeline"] = pipeline
    stamp = now_iso()
    if stage == "build":
        pipeline["corpus_signature"] = source_signature
        pipeline["corpus_updated_at"] = stamp
        stale_keys = (
            "enriched_signature",
            "enriched_updated_at",
            "indexed_signature",
            "indexed_updated_at",
        )
    elif stage == "enrich":
        pipeline["enriched_signature"] = source_signature
        pipeline["enriched_updated_at"] = stamp
        stale_keys = ("indexed_signature", "indexed_updated_at")
    elif stage == "embed":
        pipeline["indexed_signature"] = source_signature
        pipeline["indexed_updated_at"] = stamp
        stale_keys = ()
    else:
        # Raised before write_library, so nothing is persisted on bad input.
        raise ValueError(f"Unknown pipeline stage: {stage}")
    for key in stale_keys:
        pipeline.pop(key, None)
    write_library(slug, data)
def _scaled_progress(on_progress, start: float, end: float, prefix: str):
def wrapped(phase: str, pct: float, detail: str):
clamped = min(1.0, max(0.0, float(pct)))
span = max(0.0, end - start)
message = f"{prefix}: {detail}" if detail else prefix
on_progress(phase, start + (span * clamped), message)
return wrapped
def _run_prepare_pipeline(slug: str, on_progress=None, **opts):
    """Run the full build -> enrich -> embed pipeline for one library.

    Stages whose artifacts are already valid for the current source
    signature (per library_payload's derived states) are skipped.
    Raises RuntimeError when the library has no files or no signature.
    Returns a dict with status, per-stage results, and the signature.
    """
    data = read_library(slug)
    payload = library_payload(data)
    files = list(data.get("files", []))
    source_signature = payload.get("source_signature")
    if not files or not source_signature:
        raise RuntimeError("Add files before preparing this database.")
    paths = _collect_library_paths(slug)
    # Local copy so stage flags can be flipped as each step completes.
    states = dict(payload.get("states") or {})
    results: Dict[str, Any] = {}
    build_runner = _load_pipeline_fn("corpus_builder", "run_build")
    enrich_runner = _load_pipeline_fn("corpus_enricher", "run_enrich")
    index_runner = _load_pipeline_fn("index_builder", "run_index")
    if on_progress:
        on_progress("prepare", 0.01, "Preparing database for chat...")
    if not states.get("has_corpus"):
        # Stage 1 (progress 2%-34%): read staged files into the raw corpus.
        build_progress = _scaled_progress(on_progress, 0.02, 0.34, "Reading files") if on_progress else None
        results["build"] = build_runner(
            root=stage_dir(slug),
            out=paths["corpus"],
            on_progress=build_progress,
        )
        _mark_pipeline_stage(slug, "build", source_signature)
        # Rebuilding the corpus invalidates both downstream stages.
        states["has_corpus"] = True
        states["is_enriched"] = False
        states["is_indexed"] = False
    if not states.get("is_enriched"):
        # Stage 2 (progress 34%-69%): enrich the corpus, producing the
        # enhanced and shadow record files.
        enrich_progress = _scaled_progress(on_progress, 0.34, 0.69, "Enriching content") if on_progress else None
        results["enrich"] = enrich_runner(
            inp=paths["corpus"],
            out=paths["enhanced"],
            shadow_out=paths["shadow"],
            on_progress=enrich_progress,
        )
        _mark_pipeline_stage(slug, "enrich", source_signature)
        states["is_enriched"] = True
        states["is_indexed"] = False
    if not states.get("is_indexed"):
        # Stage 3 (progress 69%-100%): build the search/vector indexes.
        index_progress = _scaled_progress(on_progress, 0.69, 1.0, "Building search indexes") if on_progress else None
        results["embed"] = index_runner(
            raw=paths["corpus"],
            # Only hand over enrichment outputs when they exist and are valid.
            enhanced=paths["enhanced"] if states.get("is_enriched") and paths["enhanced"].exists() else None,
            shadow=paths["shadow"] if states.get("is_enriched") and paths["shadow"].exists() else None,
            out_dir=paths["indexes"],
            on_progress=index_progress,
            # NOTE(review): these defaults presumably mirror the embed
            # endpoint's request defaults — confirm against EmbedLibraryRequest.
            embed_model=opts.get("embed_model", "dengcao/Qwen3-Embedding-0.6B:F16"),
            ollama=opts.get("ollama", "http://localhost:11434"),
            target_chars=opts.get("target_chars", 2000),
            overlap_chars=opts.get("overlap_chars", 200),
            concurrency=opts.get("concurrency", 6),
        )
        _mark_pipeline_stage(slug, "embed", source_signature)
    if on_progress:
        on_progress("done", 1.0, "Database is ready for chat.")
    return {
        "status": "ok",
        "results": results,
        "source_signature": source_signature,
    }
async def _run_job(job_id: str, fn_name: str, **kwargs):
loop = asyncio.get_running_loop()
job = JOBS[job_id]
source_signature = kwargs.pop("source_signature", None)
def on_progress(phase: str, pct: float, detail: str):
job["phase"] = phase
@@ -246,11 +412,15 @@ async def _run_job(job_id: str, fn_name: str, **kwargs):
runner = _load_pipeline_fn("corpus_enricher", "run_enrich")
elif fn_name == "embed":
runner = _load_pipeline_fn("index_builder", "run_index")
elif fn_name == "prepare":
runner = functools.partial(_run_prepare_pipeline, job["slug"])
else:
raise RuntimeError(f"Unknown job type: {fn_name}")
call = functools.partial(runner, on_progress=on_progress, **kwargs)
result = await loop.run_in_executor(JOB_EXECUTOR, call)
if fn_name in {"build", "enrich", "embed"} and source_signature:
_mark_pipeline_stage(job["slug"], fn_name, source_signature)
job["status"] = "succeeded"
job["progress"] = 100.0
job["phase"] = "done"
@@ -421,7 +591,8 @@ def remove_file(slug: str, req: RemoveFileRequest):
@router.post("/libraries/{slug}/jobs/build")
async def build_library(slug: str):
data = read_library(slug)
if not data.get("files"):
payload = library_payload(data)
if not payload["states"].get("has_files"):
raise HTTPException(status_code=400, detail="Add files before building a library.")
lock = LIB_LOCKS.setdefault(slug, asyncio.Lock())
async with lock:
@@ -432,14 +603,17 @@ async def build_library(slug: str):
"build",
root=stage_dir(slug),
out=_collect_library_paths(slug)["corpus"],
source_signature=payload.get("source_signature"),
)
return {"job_id": job_id}
@router.post("/libraries/{slug}/jobs/enrich")
async def enrich_library(slug: str):
data = read_library(slug)
payload = library_payload(data)
paths = _collect_library_paths(slug)
if not paths["corpus"].exists():
if not payload["states"].get("has_corpus"):
raise HTTPException(status_code=400, detail="Build the corpus before enrichment.")
lock = LIB_LOCKS.setdefault(slug, asyncio.Lock())
async with lock:
@@ -451,14 +625,17 @@ async def enrich_library(slug: str):
inp=paths["corpus"],
out=paths["enhanced"],
shadow_out=paths["shadow"],
source_signature=payload.get("source_signature"),
)
return {"job_id": job_id}
@router.post("/libraries/{slug}/jobs/embed")
async def embed_library(slug: str, req: EmbedLibraryRequest):
data = read_library(slug)
payload = library_payload(data)
paths = _collect_library_paths(slug)
if not paths["corpus"].exists():
if not payload["states"].get("has_corpus"):
raise HTTPException(status_code=400, detail="Build the corpus before indexing.")
lock = LIB_LOCKS.setdefault(slug, asyncio.Lock())
async with lock:
@@ -468,18 +645,33 @@ async def embed_library(slug: str, req: EmbedLibraryRequest):
slug,
"embed",
raw=paths["corpus"],
enhanced=paths["enhanced"] if paths["enhanced"].exists() else None,
shadow=paths["shadow"] if paths["shadow"].exists() else None,
enhanced=paths["enhanced"] if payload["states"].get("is_enriched") and paths["enhanced"].exists() else None,
shadow=paths["shadow"] if payload["states"].get("is_enriched") and paths["shadow"].exists() else None,
out_dir=paths["indexes"],
embed_model=req.embed_model,
ollama=req.ollama,
target_chars=req.target_chars,
overlap_chars=req.overlap_chars,
concurrency=req.concurrency,
source_signature=payload.get("source_signature"),
)
return {"job_id": job_id}
@router.post("/libraries/{slug}/jobs/prepare")
async def prepare_library(slug: str):
    """Start a background "prepare" job for the library.

    Responds 400 when the library has no files and 409 when another job
    is already running for it; otherwise returns the new job id.
    """
    snapshot = library_payload(read_library(slug))
    if not snapshot["states"].get("has_files"):
        raise HTTPException(status_code=400, detail="Add files before preparing this database.")
    lib_lock = LIB_LOCKS.setdefault(slug, asyncio.Lock())
    async with lib_lock:
        if _has_active_job(slug):
            raise HTTPException(status_code=409, detail="This library already has an active job.")
        job_id = _start_job(slug, "prepare")
    return {"job_id": job_id}
@router.get("/jobs")
def list_jobs(slug: Optional[str] = None):
jobs = [_job_public(job) for job in JOBS.values() if slug is None or job["slug"] == slug]
@@ -497,9 +689,10 @@ def get_job(job_id: str):
@router.post("/libraries/{slug}/context")
def library_context(slug: str, req: LibraryContextRequest):
payload = library_payload(read_library(slug))
paths = _collect_library_paths(slug)
if not paths["shadow_index"].exists() or not paths["content_index"].exists():
raise HTTPException(status_code=400, detail="Index the library before using it in chat.")
if not payload["states"].get("is_indexed"):
raise HTTPException(status_code=400, detail="Prepare the library before using it in chat.")
try:
run_query = _load_pipeline_fn("unified_rag", "run_query")
result = run_query(