herzogflorian a5657c3c1f Add dynamic model discovery and improve code extraction in app
Auto-detect available models from the vLLM API instead of hardcoding.
Extract code blocks by matching on language tag and picking the largest
block, avoiding false matches on short pip/run commands.

Made-with: Cursor
2026-03-02 20:03:45 +01:00

347 lines
13 KiB
Python

"""
Streamlit Chat & File Editor for Qwen3.5
A minimal interface to:
1. Chat with the local LLM (OpenAI-compatible API)
2. Edit, save, and generate code / LaTeX files
Usage:
pip install streamlit openai
streamlit run app.py
"""
import re
import subprocess
import streamlit as st
from openai import OpenAI
from pathlib import Path
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
st.sidebar.header("Connection")
# Endpoint of the OpenAI-compatible vLLM server; editable at runtime from the sidebar.
API_BASE = st.sidebar.text_input("API Base URL", "http://silicon.fhgr.ch:7080/v1")
# NOTE(review): "EMPTY" looks like a placeholder for servers that don't check keys — confirm.
API_KEY = st.sidebar.text_input("API Key", "EMPTY", type="password")
# All created/edited/generated files live under ./workspace.
WORKSPACE = Path("workspace")
WORKSPACE.mkdir(exist_ok=True)
client = OpenAI(base_url=API_BASE, api_key=API_KEY)
@st.cache_data(ttl=30)
def fetch_models(base_url: str, api_key: str) -> list[str]:
    """Query the vLLM server for the IDs of its available models.

    Returns an empty list when the server cannot be reached, so the
    caller can fall back to manual model entry. Cached for 30 seconds.
    """
    try:
        probe = OpenAI(base_url=base_url, api_key=api_key)
        models = probe.models.list().data
    except Exception:
        return []
    return [model.id for model in models]
available_models = fetch_models(API_BASE, API_KEY)
if available_models:
    # Server reachable: offer its model list directly.
    MODEL = st.sidebar.selectbox("Model", available_models)
else:
    # Fall back to free-text entry so the app stays usable when the server is down.
    MODEL = st.sidebar.text_input("Model (server unreachable)", "qwen3.5-35b-a3b")
    st.sidebar.warning("Could not fetch models from server.")
# ---------------------------------------------------------------------------
# Sidebar — LLM Parameters
# ---------------------------------------------------------------------------
st.sidebar.markdown("---")
st.sidebar.header("LLM Parameters")
# Sampling controls; every chat/generation request below reads these values.
thinking_mode = st.sidebar.toggle("Thinking Mode", value=False,
    help="Enable chain-of-thought reasoning. Better for complex tasks, slower for simple ones.")
temperature = st.sidebar.slider("Temperature", 0.0, 2.0, 0.7, 0.05,
    help="Lower = deterministic, higher = creative.")
max_tokens = st.sidebar.slider("Max Tokens", 256, 16384, 4096, 256,
    help="Maximum length of the response.")
top_p = st.sidebar.slider("Top P", 0.0, 1.0, 0.95, 0.05,
    help="Nucleus sampling: only consider tokens within this cumulative probability.")
presence_penalty = st.sidebar.slider("Presence Penalty", 0.0, 2.0, 0.0, 0.1,
    help="Penalize repeated topics. Higher values encourage the model to talk about new topics.")
# Map file extensions to markdown fence tags / syntax-highlighting names.
LANG_MAP = {
    ".py": "python", ".tex": "latex", ".js": "javascript",
    ".html": "html", ".css": "css", ".sh": "bash",
    ".json": "json", ".yaml": "yaml", ".yml": "yaml",
}
MAX_CONTEXT = 32768  # assumed size of the model's context window, in tokens


def extract_code(text: str, lang: str = "") -> str:
    """Pull the most relevant fenced code block out of markdown text.

    Preference order:
    1. the longest block whose fence tag equals *lang* (case-insensitive),
    2. otherwise the longest fenced block of any language,
    3. otherwise the whole text, stripped, when no fence is present.

    Picking the longest block avoids false matches on short snippets
    such as ``pip install`` or run commands.
    """
    fenced = re.findall(r"```(\w*)\n(.*?)```", text, re.DOTALL)
    if not fenced:
        return text.strip()
    wanted = lang.lower()
    preferred = [body for tag, body in fenced if tag.lower() == wanted]
    candidates = preferred if preferred else [body for _, body in fenced]
    return max(candidates, key=len).strip()


def estimate_tokens(messages: list[dict]) -> int:
    """Crude token estimate for a chat history: ~4 characters per token."""
    total_chars = sum(len(msg["content"]) for msg in messages)
    return total_chars // 4


def trim_history(messages: list[dict], reserved: int) -> list[dict]:
    """Remove the oldest messages (in place) until the history fits the budget.

    *reserved* tokens are set aside for the upcoming response. The most
    recent message is never removed, even if it alone exceeds the budget.
    """
    budget = MAX_CONTEXT - reserved
    while len(messages) > 1 and estimate_tokens(messages) > budget:
        del messages[0]
    return messages
RUNNABLE_EXTENSIONS = {".py", ".tex"}  # file types the editor can execute
RUN_TIMEOUT = 30  # seconds before a run is aborted


def run_file(file_path: Path) -> dict:
    """Execute a .py or .tex file and collect its output.

    Args:
        file_path: Path to the file, typically inside the workspace.

    Returns:
        dict with keys ``stdout``, ``stderr``, and ``rc`` — the process
        return code, or -1 on timeout / missing interpreter, or 1 for an
        unsupported file type.
    """
    import sys  # local import keeps this block self-contained

    suffix = file_path.suffix
    cwd = file_path.parent.resolve()
    if suffix == ".py":
        # sys.executable is the interpreter (and venv) running this app —
        # unlike a bare "python3", which may resolve to a different
        # environment or not exist at all (e.g. on Windows).
        cmd = [sys.executable, file_path.name]
    elif suffix == ".tex":
        cmd = [
            "pdflatex",
            "-interaction=nonstopmode",  # never stop to prompt on errors
            f"-output-directory={cwd}",
            file_path.name,
        ]
    else:
        return {"stdout": "", "stderr": f"Unsupported file type: {suffix}", "rc": 1}
    try:
        proc = subprocess.run(
            cmd,
            cwd=cwd,
            capture_output=True,
            text=True,
            timeout=RUN_TIMEOUT,
        )
        return {"stdout": proc.stdout, "stderr": proc.stderr, "rc": proc.returncode}
    except subprocess.TimeoutExpired:
        return {"stdout": "", "stderr": f"Timed out after {RUN_TIMEOUT}s", "rc": -1}
    except FileNotFoundError as e:
        # Interpreter/compiler not installed (e.g. no pdflatex on PATH).
        return {"stdout": "", "stderr": str(e), "rc": -1}
# ---------------------------------------------------------------------------
# Sidebar — File Manager
# ---------------------------------------------------------------------------
st.sidebar.markdown("---")
st.sidebar.header("File Manager")
new_filename = st.sidebar.text_input("New file name", placeholder="main.tex")
if st.sidebar.button("Create File") and new_filename:
    # touch() creates an empty file (or updates mtime if it already exists).
    (WORKSPACE / new_filename).touch()
    st.sidebar.success(f"Created {new_filename}")
    st.rerun()
files = sorted(WORKSPACE.iterdir()) if WORKSPACE.exists() else []
file_names = [f.name for f in files if f.is_file()]
# The "(no files)" sentinel keeps the selectbox rendered even when empty;
# downstream code checks for it explicitly before touching the file.
selected_file = st.sidebar.selectbox("Open file", file_names if file_names else ["(no files)"])
# ---------------------------------------------------------------------------
# Main Layout — Two Tabs
# ---------------------------------------------------------------------------
tab_chat, tab_editor = st.tabs(["Chat", "File Editor"])
# ---------------------------------------------------------------------------
# Tab 1: Chat
# ---------------------------------------------------------------------------
with tab_chat:
    st.header(f"Chat with {MODEL}")
    # Chat history lives in session state so it survives Streamlit reruns.
    if "messages" not in st.session_state:
        st.session_state.messages = []
    # Replay the stored conversation on every rerun.
    for msg in st.session_state.messages:
        with st.chat_message(msg["role"]):
            st.markdown(msg["content"])
    if prompt := st.chat_input("Ask anything..."):
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)
        # Drop the oldest messages so the request fits the context window,
        # reserving `max_tokens` of room for the reply.
        st.session_state.messages = trim_history(
            st.session_state.messages, reserved=max_tokens
        )
        with st.chat_message("assistant"):
            placeholder = st.empty()
            full_response = ""
            stream = client.chat.completions.create(
                model=MODEL,
                messages=st.session_state.messages,
                max_tokens=max_tokens,
                temperature=temperature,
                top_p=top_p,
                presence_penalty=presence_penalty,
                stream=True,
                # Forwarded to the server's chat template (sidebar toggle).
                extra_body={"chat_template_kwargs": {"enable_thinking": thinking_mode}},
            )
            # Render tokens incrementally as they arrive.
            for chunk in stream:
                delta = chunk.choices[0].delta.content or ""
                full_response += delta
                # NOTE(review): `+ ""` looks like a leftover streaming cursor
                # (e.g. "▌") that was removed — confirm and simplify.
                placeholder.markdown(full_response + "")
            placeholder.markdown(full_response)
        st.session_state.messages.append({"role": "assistant", "content": full_response})
    if st.session_state.messages:
        # Context-usage meter; warns before trim_history starts dropping turns.
        used = estimate_tokens(st.session_state.messages)
        pct = min(used / MAX_CONTEXT, 1.0)
        label = f"Context: ~{used:,} / {MAX_CONTEXT:,} tokens"
        if pct > 0.8:
            label += " ⚠️ nearing limit — older messages will be trimmed"
        st.progress(pct, text=label)
    col_clear, col_save = st.columns([1, 3])
    with col_clear:
        if st.button("Clear Chat"):
            st.session_state.messages = []
            st.rerun()
    with col_save:
        if selected_file and selected_file != "(no files)":
            if st.button(f"Save code → {selected_file}"):
                # Extract the best-matching code block from the latest message
                # and overwrite the selected workspace file with it.
                last = st.session_state.messages[-1]["content"]
                suffix = Path(selected_file).suffix
                lang = LANG_MAP.get(suffix, "")
                code = extract_code(last, lang)
                (WORKSPACE / selected_file).write_text(code)
                st.success(f"Extracted code saved to workspace/{selected_file}")
# ---------------------------------------------------------------------------
# Tab 2: File Editor
# ---------------------------------------------------------------------------
with tab_editor:
    st.header("File Editor")
    if selected_file and selected_file != "(no files)":
        file_path = WORKSPACE / selected_file
        content = file_path.read_text() if file_path.exists() else ""
        suffix = file_path.suffix
        lang = LANG_MAP.get(suffix, "text")
        runnable = suffix in RUNNABLE_EXTENSIONS
        # Runnable files (.py/.tex) get a side-by-side terminal pane.
        if runnable:
            col_edit, col_term = st.columns([3, 2])
        else:
            col_edit = st.container()
        with col_edit:
            # Read-only highlighted view above the editable text area.
            st.code(content, language=lang if lang != "text" else None, line_numbers=True)
            edited = st.text_area(
                "Edit below:",
                value=content,
                height=400,
                # Keyed on filename + content hash so the widget is recreated
                # (and shows fresh text) whenever the file changes on disk.
                key=f"editor_{selected_file}_{hash(content)}",
            )
            col_save, col_gen = st.columns(2)
            with col_save:
                if st.button("Save File"):
                    file_path.write_text(edited)
                    st.success(f"Saved {selected_file}")
                    st.rerun()
            with col_gen:
                gen_prompt = st.text_input(
                    "Generation instruction",
                    placeholder="e.g. Add error handling / Fix the LaTeX formatting",
                    key="gen_prompt",
                )
                if st.button("Generate with LLM") and gen_prompt:
                    with st.spinner("Generating..."):
                        # Non-streaming request: the whole file is rewritten at once.
                        response = client.chat.completions.create(
                            model=MODEL,
                            messages=[
                                {"role": "system", "content": (
                                    f"You are a coding assistant. The user has a {lang} file. "
                                    "Return ONLY the raw file content inside a single code block. "
                                    "No explanations, no comments about changes."
                                )},
                                {"role": "user", "content": (
                                    f"Here is my {lang} file:\n\n```\n{edited}\n```\n\n"
                                    f"Instruction: {gen_prompt}"
                                )},
                            ],
                            max_tokens=max_tokens,
                            temperature=temperature,
                            top_p=top_p,
                            extra_body={"chat_template_kwargs": {"enable_thinking": thinking_mode}},
                        )
                        result = response.choices[0].message.content
                        # Strip any markdown fencing the model added despite
                        # the system prompt, then overwrite the file.
                        code = extract_code(result, lang)
                        file_path.write_text(code)
                        st.success("File updated by LLM")
                        st.rerun()
        if runnable:
            with col_term:
                run_label = "Compile LaTeX" if suffix == ".tex" else "Run Python"
                st.subheader("Terminal Output")
                if st.button(run_label, type="primary"):
                    # Persist unsaved edits first so the run reflects the editor.
                    file_path.write_text(edited)
                    with st.spinner(f"{'Compiling' if suffix == '.tex' else 'Running'}..."):
                        result = run_file(file_path)
                    # Stash the result so it survives the rerun after the click.
                    st.session_state["last_run"] = result
                result = st.session_state.get("last_run")
                if result:
                    if result["rc"] == 0:
                        st.success(f"Exit code: {result['rc']}")
                    else:
                        st.error(f"Exit code: {result['rc']}")
                    if result["stdout"]:
                        st.text_area(
                            "stdout",
                            value=result["stdout"],
                            height=300,
                            disabled=True,
                            key="run_stdout",
                        )
                    if result["stderr"]:
                        st.text_area(
                            "stderr",
                            value=result["stderr"],
                            height=200,
                            disabled=True,
                            key="run_stderr",
                        )
                    if not result["stdout"] and not result["stderr"]:
                        st.info("No output produced.")
                else:
                    st.caption(
                        f"Click **{run_label}** to execute the file "
                        f"(timeout: {RUN_TIMEOUT}s)."
                    )
    else:
        st.info("Create a file in the sidebar to start editing.")