210 lines
7.4 KiB
Python
210 lines
7.4 KiB
Python
import asyncio
|
|
import httpx
|
|
import json
|
|
import re
|
|
import time
|
|
from typing import Dict, Any, List, AsyncGenerator, Tuple
|
|
|
|
from .app_settings import get_ollama_api_url
|
|
|
|
_MODEL_DETAILS_CACHE: Dict[Tuple[str, str], Tuple[float, Dict[str, Any]]] = {}
|
|
_MODEL_DETAILS_TTL_S = 15.0
|
|
|
|
|
|
def _string_tokens(value: Any) -> List[str]:
|
|
if isinstance(value, str):
|
|
trimmed = value.strip()
|
|
return [trimmed] if trimmed else []
|
|
if isinstance(value, dict):
|
|
out: List[str] = []
|
|
for key, item in value.items():
|
|
out.extend(_string_tokens(key))
|
|
out.extend(_string_tokens(item))
|
|
return out
|
|
if isinstance(value, (list, tuple, set)):
|
|
out: List[str] = []
|
|
for item in value:
|
|
out.extend(_string_tokens(item))
|
|
return out
|
|
return []
|
|
|
|
|
|
def _normalize_capabilities(model_data: Dict[str, Any]) -> List[str]:
|
|
out = []
|
|
for item in model_data.get("capabilities") or []:
|
|
text = str(item).strip().lower()
|
|
if text and text not in out:
|
|
out.append(text)
|
|
return out
|
|
|
|
|
|
def _is_embedding_model(name: str, model_data: Dict[str, Any], tag_item: Dict[str, Any]) -> bool:
|
|
capabilities = set(_normalize_capabilities(model_data))
|
|
if "embedding" in capabilities or "embeddings" in capabilities:
|
|
return True
|
|
|
|
lowered_tokens = " ".join(
|
|
token.lower()
|
|
for token in _string_tokens(name) + _string_tokens(tag_item.get("details")) + _string_tokens(model_data)
|
|
)
|
|
return any(
|
|
marker in lowered_tokens
|
|
for marker in (
|
|
" embed ",
|
|
" embedding ",
|
|
"embed-",
|
|
"-embed",
|
|
"nomic-embed",
|
|
"mxbai-embed",
|
|
"snowflake-arctic-embed",
|
|
"bge-m3",
|
|
"bge ",
|
|
)
|
|
) or lowered_tokens.startswith("bge")
|
|
|
|
|
|
def _is_rerank_model(name: str, model_data: Dict[str, Any], tag_item: Dict[str, Any]) -> bool:
|
|
lowered_tokens = " ".join(
|
|
token.lower()
|
|
for token in _string_tokens(name) + _string_tokens(tag_item.get("details")) + _string_tokens(model_data)
|
|
)
|
|
return (
|
|
_is_embedding_model(name, model_data, tag_item)
|
|
or "rerank" in lowered_tokens
|
|
or "cross-encoder" in lowered_tokens
|
|
)
|
|
|
|
|
|
def _build_model_catalog_entry(tag_item: Dict[str, Any], model_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
name = str((tag_item or {}).get("name") or "").strip()
|
|
capabilities = _normalize_capabilities(model_data)
|
|
is_embedding = _is_embedding_model(name, model_data, tag_item)
|
|
is_rerank = _is_rerank_model(name, model_data, tag_item)
|
|
has_vision = supports_vision(model_data)
|
|
return {
|
|
"name": name,
|
|
"capabilities": capabilities,
|
|
"supports_vision": has_vision,
|
|
"is_embedding": is_embedding,
|
|
"can_chat": not is_embedding,
|
|
"can_rerank": is_rerank,
|
|
}
|
|
|
|
|
|
async def list_models() -> Dict[str, Any]:
|
|
ollama_url = get_ollama_api_url()
|
|
async with httpx.AsyncClient(timeout=30.0) as client:
|
|
r = await client.get(f"{ollama_url}/api/tags")
|
|
r.raise_for_status()
|
|
data = r.json()
|
|
# Normalize to a simple list of names
|
|
models = [m.get('name') for m in data.get('models', [])]
|
|
return {"models": models}
|
|
|
|
|
|
async def list_model_catalog(*, refresh: bool = False) -> Dict[str, Any]:
|
|
ollama_url = get_ollama_api_url()
|
|
async with httpx.AsyncClient(timeout=30.0) as client:
|
|
r = await client.get(f"{ollama_url}/api/tags")
|
|
r.raise_for_status()
|
|
payload = r.json()
|
|
|
|
raw_models = payload.get("models", []) or []
|
|
names = [str((item or {}).get("name") or "").strip() for item in raw_models]
|
|
details = await asyncio.gather(
|
|
*(show_model(name, refresh=refresh) for name in names if name),
|
|
return_exceptions=True,
|
|
)
|
|
|
|
detail_by_name: Dict[str, Dict[str, Any]] = {}
|
|
for name, detail in zip([name for name in names if name], details):
|
|
if isinstance(detail, dict):
|
|
detail_by_name[name] = detail
|
|
|
|
models = [
|
|
_build_model_catalog_entry(item or {}, detail_by_name.get(str((item or {}).get("name") or "").strip(), {}))
|
|
for item in raw_models
|
|
if str((item or {}).get("name") or "").strip()
|
|
]
|
|
|
|
return {
|
|
"models": models,
|
|
"chat_models": [model["name"] for model in models if model["can_chat"]],
|
|
"embedding_models": [model["name"] for model in models if model["is_embedding"]],
|
|
"vision_models": [model["name"] for model in models if model["supports_vision"]],
|
|
"reranking_models": [model["name"] for model in models if model["can_rerank"]],
|
|
}
|
|
|
|
async def show_model(model: str, *, refresh: bool = False) -> Dict[str, Any]:
|
|
ollama_url = get_ollama_api_url()
|
|
cache_key = (ollama_url.rstrip('/'), str(model or '').strip())
|
|
cached = _MODEL_DETAILS_CACHE.get(cache_key)
|
|
now = time.monotonic()
|
|
if not refresh and cached and (now - cached[0]) < _MODEL_DETAILS_TTL_S:
|
|
return cached[1]
|
|
|
|
async with httpx.AsyncClient(timeout=30.0) as client:
|
|
r = await client.post(f"{ollama_url}/api/show", json={"model": model})
|
|
r.raise_for_status()
|
|
data = r.json()
|
|
_MODEL_DETAILS_CACHE[cache_key] = (now, data)
|
|
return data
|
|
|
|
def supports_vision(model_data: Dict[str, Any]) -> bool:
|
|
capabilities = model_data.get("capabilities") or []
|
|
if any(str(item).strip().lower() == "vision" for item in capabilities):
|
|
return True
|
|
|
|
model_info = model_data.get("model_info") or {}
|
|
if isinstance(model_info, dict):
|
|
for key in model_info.keys():
|
|
lowered = str(key).strip().lower()
|
|
if ".vision." in lowered or lowered.endswith(".vision"):
|
|
return True
|
|
if lowered.endswith("tokens_per_image") or re.search(r"\bmm\b", lowered):
|
|
return True
|
|
|
|
return False
|
|
|
|
async def chat(model: str, messages: List[Dict[str, Any]]) -> str:
|
|
ollama_url = get_ollama_api_url()
|
|
payload = {
|
|
"model": model,
|
|
"messages": messages,
|
|
"stream": False
|
|
}
|
|
async with httpx.AsyncClient(timeout=600.0) as client:
|
|
r = await client.post(f"{ollama_url}/api/chat", json=payload)
|
|
r.raise_for_status()
|
|
data = r.json()
|
|
# Ollama returns full conversation; pick last message content
|
|
try:
|
|
return data["message"]["content"]
|
|
except Exception:
|
|
# Newer Ollama formats may return messages list
|
|
msgs = data.get("messages") or []
|
|
if msgs:
|
|
return msgs[-1].get("content", "")
|
|
return data.get("content", "")
|
|
|
|
async def chat_stream(model: str, messages: List[Dict[str, Any]]) -> AsyncGenerator[str, None]:
|
|
ollama_url = get_ollama_api_url()
|
|
payload = {
|
|
"model": model,
|
|
"messages": messages,
|
|
"stream": True
|
|
}
|
|
async with httpx.AsyncClient(timeout=600.0) as client:
|
|
async with client.stream("POST", f"{ollama_url}/api/chat", json=payload) as r:
|
|
r.raise_for_status()
|
|
async for line in r.aiter_lines():
|
|
if line:
|
|
try:
|
|
chunk = json.loads(line)
|
|
if "content" in chunk: # Newer Ollama format
|
|
yield chunk["content"]
|
|
elif "message" in chunk and "content" in chunk["message"]: # Older format
|
|
yield chunk["message"]["content"]
|
|
except json.JSONDecodeError:
|
|
pass # Ignore invalid JSON lines
|