Add functions for enrichment and shadow corpus building
This commit is contained in:
@@ -588,6 +588,159 @@ def _scaled_progress(on_progress, start: float, end: float, prefix: str):
|
||||
return wrapped
|
||||
|
||||
|
||||
def _selected_enrichment_paths(files: List[Dict[str, Any]]) -> set[str]:
    """Collect the paths of file entries that are flagged for enrichment.

    An entry contributes its ``path`` value (as stored, unstripped) when
    ``_file_enrich_enabled`` approves it and the path is non-blank.
    """
    selected: set[str] = set()
    for entry in files:
        path = str(entry.get("path") or "")
        if _file_enrich_enabled(entry) and path.strip():
            selected.add(path)
    return selected
|
||||
|
||||
|
||||
def _shadow_size_metrics(text: str) -> Dict[str, int]:
|
||||
return {
|
||||
"char_count": len(text),
|
||||
"word_count": len(text.split()),
|
||||
"line_count": len(text.splitlines()),
|
||||
}
|
||||
|
||||
|
||||
def _build_raw_shadow_record(rec: Dict[str, Any]) -> Dict[str, Any]:
    """Build a fallback shadow record for *rec* from its raw text.

    Synthesizes the shadow text via the pipeline's ``synth_shadow_from_raw``
    and copies identifying metadata through unchanged. Key insertion order
    matches the enriched records so serialized output stays uniform.
    """
    synth = _load_pipeline_fn("index_builder", "synth_shadow_from_raw")
    shadow_text = synth(rec)
    # Pass-through metadata, in the canonical field order.
    record: Dict[str, Any] = {
        key: rec.get(key)
        for key in (
            "id",
            "parent_id",
            "source_path",
            "url",
            "title",
            "record_type",
            "mime",
            "lang",
        )
    }
    span = rec.get("span")
    record["span"] = span if isinstance(span, dict) else None
    record["shadow_text"] = shadow_text
    record["shadow_meta"] = {
        "prompt_version": "raw-fallback",
        "size": _shadow_size_metrics(shadow_text),
    }
    return record
|
||||
|
||||
|
||||
def _write_selected_corpus(corpus_path: Path, out_path: Path, selected_paths: set[str]) -> int:
|
||||
if not selected_paths or not corpus_path.exists():
|
||||
if out_path.exists():
|
||||
out_path.unlink()
|
||||
return 0
|
||||
|
||||
out_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
written = 0
|
||||
with corpus_path.open("r", encoding="utf-8") as src, out_path.open("w", encoding="utf-8") as dst:
|
||||
for line in src:
|
||||
if not line.strip():
|
||||
continue
|
||||
try:
|
||||
rec = json.loads(line)
|
||||
except Exception:
|
||||
continue
|
||||
source_path = str(rec.get("source_path") or "")
|
||||
if source_path not in selected_paths:
|
||||
continue
|
||||
dst.write(json.dumps(rec, ensure_ascii=False) + "\n")
|
||||
written += 1
|
||||
|
||||
if written == 0 and out_path.exists():
|
||||
out_path.unlink()
|
||||
return written
|
||||
|
||||
|
||||
def _build_merged_shadow_corpus(corpus_path: Path, partial_shadow_path: Path, out_path: Path) -> int:
    """Merge partial shadow output with raw fallbacks into the full shadow corpus.

    For every record in *corpus_path*, emits the enriched shadow record from
    *partial_shadow_path* when one exists for the same id, otherwise a raw
    fallback built by ``_build_raw_shadow_record``. Skips blank, unparsable,
    and non-object JSONL lines. Removes a stale/empty *out_path* when there
    is nothing to write.

    Returns the number of shadow records written.
    """
    if not corpus_path.exists():
        if out_path.exists():
            out_path.unlink()
        return 0

    # Index the partial (enriched) shadow records by id for O(1) lookup.
    selected_shadow: Dict[str, Dict[str, Any]] = {}
    if partial_shadow_path.exists():
        with partial_shadow_path.open("r", encoding="utf-8") as src:
            for line in src:
                if not line.strip():
                    continue
                try:
                    rec = json.loads(line)
                except ValueError:
                    # Malformed JSONL line: best-effort skip.
                    continue
                if not isinstance(rec, dict):
                    # Non-object JSON lines previously crashed on rec.get().
                    continue
                rec_id = str(rec.get("id") or rec.get("record_id") or "")
                if rec_id:
                    selected_shadow[rec_id] = rec

    out_path.parent.mkdir(parents=True, exist_ok=True)
    written = 0
    with corpus_path.open("r", encoding="utf-8") as src, out_path.open("w", encoding="utf-8") as dst:
        for line in src:
            if not line.strip():
                continue
            try:
                rec = json.loads(line)
            except ValueError:
                continue
            if not isinstance(rec, dict):
                continue
            rec_id = str(rec.get("id") or rec.get("record_id") or "")
            shadow_record = selected_shadow.get(rec_id) if rec_id else None
            if shadow_record is None:
                # No enriched shadow for this record: fall back to raw.
                shadow_record = _build_raw_shadow_record(rec)
            dst.write(json.dumps(shadow_record, ensure_ascii=False) + "\n")
            written += 1

    if written == 0 and out_path.exists():
        # Opening for writing created an empty file; remove it.
        out_path.unlink()
    return written
|
||||
|
||||
|
||||
def _run_selected_enrichment(slug: str, on_progress=None, **opts) -> Dict[str, Any]:
    """Run LLM enrichment for the library files flagged for it.

    Reads the library manifest, narrows the corpus to the selected files,
    invokes the pipeline's ``run_enrich``, and finally merges the partial
    shadow output with raw fallbacks into the full shadow corpus.

    Returns the enrichment result dict, or a ``{"status": "skipped", ...}``
    summary when no file/record is eligible.
    """
    data = read_library(slug)
    files = list(data.get("files", []))
    selected_paths = _selected_enrichment_paths(files)
    paths = _collect_library_paths(slug)

    def _skip(message: str) -> Dict[str, Any]:
        # Shared early-exit path: clear stale artifacts, report progress,
        # and return a skipped summary. (When no file is selected,
        # len(selected_paths) is 0, matching the original literal.)
        _cleanup_enrichment_artifacts(slug)
        if on_progress:
            on_progress("enrich", 1.0, message)
        return {
            "status": "skipped",
            "selected_files": len(selected_paths),
            "selected_records": 0,
        }

    if not selected_paths:
        return _skip("Skipping enrichment. All files are raw-indexed.")

    selected_records = _write_selected_corpus(paths["corpus"], paths["enrich_input"], selected_paths)
    if selected_records == 0:
        return _skip("No selected records were eligible for enrichment.")

    enrich_runner = _load_pipeline_fn("corpus_enricher", "run_enrich")
    result = enrich_runner(
        inp=paths["enrich_input"],
        out=paths["enhanced"],
        shadow_out=paths["shadow_partial"],
        on_progress=on_progress,
        ollama=opts.get("ollama", "http://localhost:11434"),
        model=opts.get("enrich_model", DEFAULT_ENRICH_MODEL),
        summary_lang=opts.get("summary_lang", "auto"),
        concurrency=opts.get("enrich_concurrency", DEFAULT_ENRICH_CONCURRENCY),
        min_chars=opts.get("min_chars", DEFAULT_ENRICH_MIN_CHARS),
        max_text=opts.get("max_text", DEFAULT_ENRICH_MAX_TEXT),
        cache_dir=opts.get("cache_dir", str(paths["base"] / ".rag_cache")),
    )
    result["selected_files"] = len(selected_paths)
    result["selected_records"] = selected_records
    result["shadow_records"] = _build_merged_shadow_corpus(
        paths["corpus"], paths["shadow_partial"], paths["shadow"]
    )
    return result
|
||||
|
||||
|
||||
def _run_prepare_pipeline(slug: str, on_progress=None, **opts):
|
||||
data = read_library(slug)
|
||||
payload = library_payload(data)
|
||||
|
||||
Reference in New Issue
Block a user