"""
Streamlit Chat & File Editor for Qwen3.5

A minimal interface to:
1. Chat with the local LLM (OpenAI-compatible API)
2. Edit, save, and generate code / LaTeX files

Usage:
    pip install streamlit openai
    streamlit run app.py
"""
|
|
|
|
import re
|
|
import subprocess
|
|
import streamlit as st
|
|
from openai import OpenAI
|
|
from pathlib import Path
|
|
|
|
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Connection settings live in the sidebar so they can be changed at runtime.
st.sidebar.header("Connection")
API_BASE = st.sidebar.text_input("API Base URL", "http://silicon.fhgr.ch:7080/v1")
API_KEY = st.sidebar.text_input("API Key", "EMPTY", type="password")

# All edited/generated files are kept in a local "workspace" directory.
WORKSPACE = Path("workspace")
WORKSPACE.mkdir(exist_ok=True)

# NOTE(review): the client is rebuilt on every Streamlit rerun with the
# current sidebar values, so changing the URL/key takes effect immediately.
client = OpenAI(base_url=API_BASE, api_key=API_KEY)
|
|
|
|
|
|
@st.cache_data(ttl=30)
def fetch_models(base_url: str, api_key: str) -> list[str]:
    """Return the model IDs advertised by the server's /models endpoint.

    Results are cached for 30 seconds via st.cache_data; any failure
    (unreachable server, bad credentials) yields an empty list.
    """
    try:
        probe = OpenAI(base_url=base_url, api_key=api_key)
        return [model.id for model in probe.models.list().data]
    except Exception:
        # Swallow connection errors: the caller falls back to manual entry.
        return []
|
|
|
|
|
|
available_models = fetch_models(API_BASE, API_KEY)
if available_models:
    # Let the user pick from the models the server actually serves.
    MODEL = st.sidebar.selectbox("Model", available_models)
else:
    # Server unreachable: fall back to a free-text model name.
    MODEL = st.sidebar.text_input("Model (server unreachable)", "qwen3.5-35b-a3b")
    st.sidebar.warning("Could not fetch models from server.")
|
|
|
|
# ---------------------------------------------------------------------------
# Sidebar — LLM Parameters
# ---------------------------------------------------------------------------
st.sidebar.markdown("---")
st.sidebar.header("LLM Parameters")

# Sampling controls; these values are passed straight through to every
# chat-completion call below.
thinking_mode = st.sidebar.toggle("Thinking Mode", value=False,
    help="Enable chain-of-thought reasoning. Better for complex tasks, slower for simple ones.")
temperature = st.sidebar.slider("Temperature", 0.0, 2.0, 0.7, 0.05,
    help="Lower = deterministic, higher = creative.")
max_tokens = st.sidebar.slider("Max Tokens", 256, 16384, 4096, 256,
    help="Maximum length of the response.")
top_p = st.sidebar.slider("Top P", 0.0, 1.0, 0.95, 0.05,
    help="Nucleus sampling: only consider tokens within this cumulative probability.")
presence_penalty = st.sidebar.slider("Presence Penalty", 0.0, 2.0, 0.0, 0.1,
    help="Penalize repeated topics. Higher values encourage the model to talk about new topics.")
|
|
|
|
# Map file suffixes to syntax-highlighting / markdown-fence language tags.
LANG_MAP = {
    ".py": "python", ".tex": "latex", ".js": "javascript",
    ".html": "html", ".css": "css", ".sh": "bash",
    ".json": "json", ".yaml": "yaml", ".yml": "yaml",
}

# Token budget used for history trimming and the context-usage meter.
# NOTE(review): assumes the served model has a 32k context window — confirm.
MAX_CONTEXT = 32768
|
|
|
|
|
|
def extract_code(text: str, lang: str = "") -> str:
    """Extract the best fenced code block from markdown text.

    Strategy:
    1. Prefer blocks tagged with the target language (e.g. ```python),
       picking the longest such block.
    2. Otherwise pick the longest fenced block of any language.
    3. Fall back to the full (stripped) text if no fenced block is found.

    (The previous docstring claimed trivial one-liners were skipped; no
    such filtering exists — longest-block selection is the only heuristic.)

    Args:
        text: Markdown text that may contain ``` fenced code blocks.
        lang: Target language tag to prefer (matched case-insensitively).

    Returns:
        The stripped contents of the chosen block, or the stripped text.
    """
    # A fence is ``` plus an optional language tag, a newline, then the
    # code up to the closing ```. DOTALL lets the body span lines.
    matches = re.findall(r"```(\w*)\n(.*?)```", text, re.DOTALL)
    if not matches:
        # No complete fenced block: assume the whole reply is the code.
        return text.strip()

    lang_lower = lang.lower()
    lang_matches = [code for tag, code in matches if tag.lower() == lang_lower]
    if lang_matches:
        return max(lang_matches, key=len).strip()

    # No block carries the requested tag: take the longest block overall.
    return max((code for _, code in matches), key=len).strip()
|
|
|
|
|
|
def estimate_tokens(messages: list[dict]) -> int:
    """Crude token count for a chat history: ~4 characters per token."""
    total_chars = sum(len(message["content"]) for message in messages)
    return total_chars // 4
|
|
|
|
|
|
def trim_history(messages: list[dict], reserved: int) -> list[dict]:
    """Drop the oldest messages (in place) until the history fits the budget.

    The budget is MAX_CONTEXT minus ``reserved`` (tokens set aside for the
    upcoming completion). Messages are popped one at a time from the front —
    not in user/assistant pairs — and the latest message is always kept,
    even if it alone exceeds the budget.

    Args:
        messages: Chat history; mutated in place and also returned.
        reserved: Token count reserved for the model's response.

    Returns:
        The same (possibly shortened) list object.
    """
    budget = MAX_CONTEXT - reserved
    while len(messages) > 1 and estimate_tokens(messages) > budget:
        messages.pop(0)  # oldest first
    return messages
|
|
|
|
|
|
# File types the editor can execute, and the per-run wall-clock limit.
RUNNABLE_EXTENSIONS = {".py", ".tex"}
RUN_TIMEOUT = 30


def run_file(file_path: Path) -> dict:
    """Execute a .py or .tex file and capture its output.

    Returns a dict with keys "stdout", "stderr", and "rc" (return code).
    An unsupported suffix yields rc == 1; a timeout or a missing
    interpreter/compiler on PATH yields rc == -1.
    """
    workdir = file_path.parent.resolve()
    suffix = file_path.suffix

    # Build the command line for the file type.
    if suffix == ".py":
        command = ["python3", file_path.name]
    elif suffix == ".tex":
        command = [
            "pdflatex",
            "-interaction=nonstopmode",
            f"-output-directory={workdir}",
            file_path.name,
        ]
    else:
        return {"stdout": "", "stderr": f"Unsupported file type: {suffix}", "rc": 1}

    try:
        completed = subprocess.run(
            command,
            cwd=workdir,
            capture_output=True,
            text=True,
            timeout=RUN_TIMEOUT,
        )
    except subprocess.TimeoutExpired:
        return {"stdout": "", "stderr": f"Timed out after {RUN_TIMEOUT}s", "rc": -1}
    except FileNotFoundError as exc:
        # python3 / pdflatex not installed or not on PATH.
        return {"stdout": "", "stderr": str(exc), "rc": -1}
    return {"stdout": completed.stdout, "stderr": completed.stderr, "rc": completed.returncode}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Sidebar — File Manager
# ---------------------------------------------------------------------------
st.sidebar.markdown("---")
st.sidebar.header("File Manager")

new_filename = st.sidebar.text_input("New file name", placeholder="main.tex")
if st.sidebar.button("Create File") and new_filename:
    # Security fix: strip any directory components so untrusted input
    # cannot escape the workspace (e.g. "../../etc/x" becomes "x").
    safe_name = Path(new_filename).name
    (WORKSPACE / safe_name).touch()
    st.sidebar.success(f"Created {safe_name}")
    st.rerun()

# Regular files in the workspace, offered in the "Open file" selector.
files = sorted(WORKSPACE.iterdir()) if WORKSPACE.exists() else []
file_names = [f.name for f in files if f.is_file()]
selected_file = st.sidebar.selectbox("Open file", file_names if file_names else ["(no files)"])

# ---------------------------------------------------------------------------
# Main Layout — Two Tabs
# ---------------------------------------------------------------------------
tab_chat, tab_editor = st.tabs(["Chat", "File Editor"])
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tab 1: Chat
|
|
# ---------------------------------------------------------------------------
|
|
# ---------------------------------------------------------------------------
# Tab 1: Chat
# ---------------------------------------------------------------------------
with tab_chat:
    st.header(f"Chat with {MODEL}")

    # Chat history lives in session state so it survives Streamlit reruns.
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Replay the conversation so far.
    for msg in st.session_state.messages:
        with st.chat_message(msg["role"]):
            st.markdown(msg["content"])

    if prompt := st.chat_input("Ask anything..."):
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)

        # Make room for the response: drop the oldest messages until the
        # history plus the reserved completion budget fits the context.
        st.session_state.messages = trim_history(
            st.session_state.messages, reserved=max_tokens
        )

        with st.chat_message("assistant"):
            placeholder = st.empty()
            full_response = ""

            stream = client.chat.completions.create(
                model=MODEL,
                messages=st.session_state.messages,
                max_tokens=max_tokens,
                temperature=temperature,
                top_p=top_p,
                presence_penalty=presence_penalty,
                stream=True,
                extra_body={"chat_template_kwargs": {"enable_thinking": thinking_mode}},
            )
            # Render incrementally with a cursor glyph while streaming.
            for chunk in stream:
                delta = chunk.choices[0].delta.content or ""
                full_response += delta
                placeholder.markdown(full_response + "▌")
            placeholder.markdown(full_response)

        st.session_state.messages.append({"role": "assistant", "content": full_response})

    # Context-usage meter with a warning as the window fills up.
    if st.session_state.messages:
        used = estimate_tokens(st.session_state.messages)
        pct = min(used / MAX_CONTEXT, 1.0)
        label = f"Context: ~{used:,} / {MAX_CONTEXT:,} tokens"
        if pct > 0.8:
            label += " ⚠️ nearing limit — older messages will be trimmed"
        st.progress(pct, text=label)

    col_clear, col_save = st.columns([1, 3])
    with col_clear:
        if st.button("Clear Chat"):
            st.session_state.messages = []
            st.rerun()
    with col_save:
        if selected_file and selected_file != "(no files)":
            if st.button(f"Save code → {selected_file}"):
                # Fix: guard against an empty history — clicking this button
                # before any exchange used to raise IndexError on messages[-1].
                if st.session_state.messages:
                    last = st.session_state.messages[-1]["content"]
                    suffix = Path(selected_file).suffix
                    lang = LANG_MAP.get(suffix, "")
                    code = extract_code(last, lang)
                    (WORKSPACE / selected_file).write_text(code)
                    st.success(f"Extracted code saved to workspace/{selected_file}")
                else:
                    st.warning("No messages to save yet.")
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tab 2: File Editor
|
|
# ---------------------------------------------------------------------------
|
|
# ---------------------------------------------------------------------------
# Tab 2: File Editor
# ---------------------------------------------------------------------------
with tab_editor:
    st.header("File Editor")

    if selected_file and selected_file != "(no files)":
        file_path = WORKSPACE / selected_file
        content = file_path.read_text() if file_path.exists() else ""
        suffix = file_path.suffix
        lang = LANG_MAP.get(suffix, "text")
        runnable = suffix in RUNNABLE_EXTENSIONS

        # Runnable files get a second column for terminal output.
        if runnable:
            col_edit, col_term = st.columns([3, 2])
        else:
            col_edit = st.container()

        with col_edit:
            # Read-only highlighted preview above the editable text area.
            st.code(content, language=lang if lang != "text" else None, line_numbers=True)

            # The widget key embeds hash(content) so the text area is
            # recreated (and refreshed) whenever the on-disk content changes.
            edited = st.text_area(
                "Edit below:",
                value=content,
                height=400,
                key=f"editor_{selected_file}_{hash(content)}",
            )

            col_save, col_gen = st.columns(2)

            with col_save:
                if st.button("Save File"):
                    file_path.write_text(edited)
                    st.success(f"Saved {selected_file}")
                    st.rerun()

            with col_gen:
                gen_prompt = st.text_input(
                    "Generation instruction",
                    placeholder="e.g. Add error handling / Fix the LaTeX formatting",
                    key="gen_prompt",
                )
                if st.button("Generate with LLM") and gen_prompt:
                    with st.spinner("Generating..."):
                        # Non-streaming call: ask the model to rewrite the
                        # whole file per the instruction.
                        response = client.chat.completions.create(
                            model=MODEL,
                            messages=[
                                {"role": "system", "content": (
                                    f"You are a coding assistant. The user has a {lang} file. "
                                    "Return ONLY the raw file content inside a single code block. "
                                    "No explanations, no comments about changes."
                                )},
                                {"role": "user", "content": (
                                    f"Here is my {lang} file:\n\n```\n{edited}\n```\n\n"
                                    f"Instruction: {gen_prompt}"
                                )},
                            ],
                            max_tokens=max_tokens,
                            temperature=temperature,
                            top_p=top_p,
                            extra_body={"chat_template_kwargs": {"enable_thinking": thinking_mode}},
                        )
                        result = response.choices[0].message.content
                        # Strip markdown fences before writing back to disk.
                        code = extract_code(result, lang)
                        file_path.write_text(code)
                        st.success("File updated by LLM")
                        st.rerun()

        if runnable:
            with col_term:
                run_label = "Compile LaTeX" if suffix == ".tex" else "Run Python"
                st.subheader("Terminal Output")

                if st.button(run_label, type="primary"):
                    # Persist any unsaved edits before executing the file.
                    file_path.write_text(edited)
                    with st.spinner(f"{'Compiling' if suffix == '.tex' else 'Running'}..."):
                        result = run_file(file_path)
                    st.session_state["last_run"] = result

                # Show the most recent run; session state keeps it visible
                # across Streamlit reruns.
                result = st.session_state.get("last_run")
                if result:
                    if result["rc"] == 0:
                        st.success(f"Exit code: {result['rc']}")
                    else:
                        st.error(f"Exit code: {result['rc']}")

                    if result["stdout"]:
                        st.text_area(
                            "stdout",
                            value=result["stdout"],
                            height=300,
                            disabled=True,
                            key="run_stdout",
                        )
                    if result["stderr"]:
                        st.text_area(
                            "stderr",
                            value=result["stderr"],
                            height=200,
                            disabled=True,
                            key="run_stderr",
                        )
                    if not result["stdout"] and not result["stderr"]:
                        st.info("No output produced.")
                else:
                    st.caption(
                        f"Click **{run_label}** to execute the file "
                        f"(timeout: {RUN_TIMEOUT}s)."
                    )
    else:
        st.info("Create a file in the sidebar to start editing.")
|