"""
Light-weight *runs* endpoint to query artefacts produced by workers.

• `GET /runs/{run_id}` → metadata & locations for a finished / running flow

The router below also serves `/{worker_name}/latest` and `/{run_id}/artifact`
(paths relative to wherever this router is mounted).
"""

from __future__ import annotations

from pathlib import Path
from typing import Any

from fastapi import APIRouter, HTTPException
from librarian_core.storage.worker_store import WorkerStore
from pydantic import BaseModel

router = APIRouter(tags=["runs"])


# --------------------------------------------------------------------------- #
# response model                                                              #
# --------------------------------------------------------------------------- #
class RunInfo(BaseModel):
    """Response payload describing a single worker run."""

    # Identifier of the run, as passed in the URL or reported by the store.
    run_id: str
    # Name of the worker that produced the run.
    worker: str
    # Run state, copied from the store metadata's "state" entry.
    state: str
    # Directory associated with the run's data.
    dir: Path
    # Optional payload: the stored model as a dict, when available.
    data: dict | None = None


# --------------------------------------------------------------------------- #
# helper                                                                      #
# --------------------------------------------------------------------------- #
def _open_store(run_id: str) -> WorkerStore:
    """
    Open the :class:`WorkerStore` for *run_id*.

    Raises:
        HTTPException: 404 ("Run-id not found") when ``WorkerStore.open``
            raises ``FileNotFoundError``.
    """
    try:
        return WorkerStore.open(run_id)
    except FileNotFoundError as exc:
        raise HTTPException(status_code=404, detail="Run-id not found") from exc


# --------------------------------------------------------------------------- #
# routes                                                                      #
# --------------------------------------------------------------------------- #
@router.get("/{run_id}", response_model=RunInfo)
def get_run(run_id: str) -> RunInfo:
    """
    Return coarse-grained information about a single flow run.

    For the web-UI we expose only minimal metadata plus the local directory
    where files were written; clients can read further details from disk.

    Raises:
        HTTPException: 404 when no store exists for *run_id*.
    """
    store = _open_store(run_id)
    meta = store.metadata  # {'worker_name': …, 'state': …, …}

    return RunInfo(
        run_id=run_id,
        worker=meta["worker_name"],
        state=meta["state"],
        dir=store.data_dir,
        data=store.load_model(as_dict=True),  # type: ignore
    )


@router.get("/{worker_name}/latest", response_model=RunInfo | None)
def get_latest_run(worker_name: str) -> RunInfo | None:
    """
    Return information about the latest run recorded for *worker_name*.

    The run id, directory and data come from ``WorkerStore.load_latest``;
    the state is read from the metadata of that run's store.

    The ``RunInfo | None`` annotation is kept for interface compatibility;
    the "no run" case is reported as a 404 rather than by returning ``None``.

    Raises:
        HTTPException: 404 ("No runs found") when ``load_latest`` returns
            ``None``; 404 ("Run-id not found") when the referenced run's
            store cannot be opened.
    """
    artifact: dict[str, Any] | None = WorkerStore.load_latest(worker_name=worker_name)
    if artifact is None:
        raise HTTPException(status_code=404, detail="No runs found")

    # The latest-artifact record carries no state, so look it up in the store.
    store = _open_store(artifact["run_id"])
    meta = store.metadata

    return RunInfo(
        run_id=artifact["run_id"],
        worker=worker_name,
        state=meta["state"],
        dir=artifact["dir"],
        data=artifact["data"],
    )


@router.get("/{run_id}/artifact")
def get_artifact(run_id: str) -> str:
    """
    Return the text content of the run's ``artifact.md`` file.

    Raises:
        HTTPException: 404 ("Run-id not found") when no store exists for
            *run_id*; 404 ("Artifact not found") when ``artifact.md`` is
            missing from the run directory.
    """
    store = _open_store(run_id)

    # NOTE(review): this reaches into the store's private ``_run_dir``;
    # confirm whether WorkerStore offers a public accessor for it.
    artifact_path = store._run_dir.joinpath("artifact.md")

    # Read directly and translate "missing" into a 404. A separate exists()
    # check would leave a window in which the file can vanish before the
    # read, surfacing as an unhandled error instead of a 404.
    # NOTE(review): read_text() uses the platform default encoding — confirm
    # how workers write artifact.md and pin the encoding if it is known.
    try:
        return artifact_path.read_text()
    except (FileNotFoundError, NotADirectoryError) as exc:
        raise HTTPException(status_code=404, detail="Artifact not found") from exc