""" Streamlit Chat & File Editor for Qwen3.5 A minimal interface to: 1. Chat with the local LLM (OpenAI-compatible API) 2. Edit, save, and generate code / LaTeX files Usage: pip install streamlit openai streamlit run app.py """ import re import subprocess import streamlit as st from openai import OpenAI from pathlib import Path # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- st.sidebar.header("Connection") API_BASE = st.sidebar.text_input("API Base URL", "http://silicon.fhgr.ch:7080/v1") API_KEY = st.sidebar.text_input("API Key", "EMPTY", type="password") WORKSPACE = Path("workspace") WORKSPACE.mkdir(exist_ok=True) client = OpenAI(base_url=API_BASE, api_key=API_KEY) @st.cache_data(ttl=30) def fetch_models(base_url: str, api_key: str) -> list[str]: """Fetch available model IDs from the vLLM server.""" try: c = OpenAI(base_url=base_url, api_key=api_key) return [m.id for m in c.models.list().data] except Exception: return [] available_models = fetch_models(API_BASE, API_KEY) if available_models: MODEL = st.sidebar.selectbox("Model", available_models) else: MODEL = st.sidebar.text_input("Model (server unreachable)", "qwen3.5-35b-a3b") st.sidebar.warning("Could not fetch models from server.") # --------------------------------------------------------------------------- # Sidebar — LLM Parameters # --------------------------------------------------------------------------- st.sidebar.markdown("---") st.sidebar.header("LLM Parameters") thinking_mode = st.sidebar.toggle("Thinking Mode", value=False, help="Enable chain-of-thought reasoning. Better for complex tasks, slower for simple ones.") temperature = st.sidebar.slider("Temperature", 0.0, 2.0, 0.7, 0.05, help="Lower = deterministic, higher = creative.") max_tokens = st.sidebar.slider("Max Tokens", 256, 16384, 4096, 256, help="Maximum length of the response.") top_p = st.sidebar.slider("Top P", 0.0, 1.0, 0.95, 0.05, help="Nucleus sampling: only consider tokens within this cumulative probability.") presence_penalty = st.sidebar.slider("Presence Penalty", 0.0, 2.0, 0.0, 0.1, help="Penalize repeated topics. Higher values encourage the model to talk about new topics.") LANG_MAP = { ".py": "python", ".tex": "latex", ".js": "javascript", ".html": "html", ".css": "css", ".sh": "bash", ".json": "json", ".yaml": "yaml", ".yml": "yaml", } MAX_CONTEXT = 32768 def extract_code(text: str, lang: str = "") -> str: """Extract the best code block from markdown text. Strategy: 1. Prefer blocks tagged with the target language (e.g. ```python) 2. Among candidates, pick the longest block (skip trivial one-liners) 3. Fall back to the longest block of any language 4. Fall back to the full text if no fenced block is found """ tagged_pattern = r"```(\w*)\n(.*?)```" matches = re.findall(tagged_pattern, text, re.DOTALL) if not matches: return text.strip() lang_lower = lang.lower() lang_matches = [code for tag, code in matches if tag.lower() == lang_lower] if lang_matches: return max(lang_matches, key=len).strip() all_blocks = [code for _, code in matches] return max(all_blocks, key=len).strip() def estimate_tokens(messages: list[dict]) -> int: """Rough token estimate: ~4 characters per token.""" return sum(len(m["content"]) for m in messages) // 4 def trim_history(messages: list[dict], reserved: int) -> list[dict]: """Drop oldest message pairs to fit within context budget. Always keeps the latest user message.""" budget = MAX_CONTEXT - reserved while len(messages) > 1 and estimate_tokens(messages) > budget: messages.pop(0) return messages RUNNABLE_EXTENSIONS = {".py", ".tex"} RUN_TIMEOUT = 30 def run_file(file_path: Path) -> dict: """Execute a .py or .tex file and return stdout, stderr, and return code.""" suffix = file_path.suffix cwd = file_path.parent.resolve() if suffix == ".py": cmd = ["python3", file_path.name] elif suffix == ".tex": cmd = [ "pdflatex", "-interaction=nonstopmode", f"-output-directory={cwd}", file_path.name, ] else: return {"stdout": "", "stderr": f"Unsupported file type: {suffix}", "rc": 1} try: proc = subprocess.run( cmd, cwd=cwd, capture_output=True, text=True, timeout=RUN_TIMEOUT, ) return {"stdout": proc.stdout, "stderr": proc.stderr, "rc": proc.returncode} except subprocess.TimeoutExpired: return {"stdout": "", "stderr": f"Timed out after {RUN_TIMEOUT}s", "rc": -1} except FileNotFoundError as e: return {"stdout": "", "stderr": str(e), "rc": -1} # --------------------------------------------------------------------------- # Sidebar — File Manager # --------------------------------------------------------------------------- st.sidebar.markdown("---") st.sidebar.header("File Manager") new_filename = st.sidebar.text_input("New file name", placeholder="main.tex") if st.sidebar.button("Create File") and new_filename: (WORKSPACE / new_filename).touch() st.sidebar.success(f"Created {new_filename}") st.rerun() files = sorted(WORKSPACE.iterdir()) if WORKSPACE.exists() else [] file_names = [f.name for f in files if f.is_file()] selected_file = st.sidebar.selectbox("Open file", file_names if file_names else ["(no files)"]) # --------------------------------------------------------------------------- # Main Layout — Two Tabs # --------------------------------------------------------------------------- tab_chat, tab_editor = st.tabs(["Chat", "File Editor"]) # --------------------------------------------------------------------------- # Tab 1: Chat # --------------------------------------------------------------------------- with tab_chat: st.header(f"Chat with {MODEL}") if "messages" not in st.session_state: st.session_state.messages = [] for msg in st.session_state.messages: with st.chat_message(msg["role"]): st.markdown(msg["content"]) if prompt := st.chat_input("Ask anything..."): st.session_state.messages.append({"role": "user", "content": prompt}) with st.chat_message("user"): st.markdown(prompt) st.session_state.messages = trim_history( st.session_state.messages, reserved=max_tokens ) with st.chat_message("assistant"): placeholder = st.empty() full_response = "" stream = client.chat.completions.create( model=MODEL, messages=st.session_state.messages, max_tokens=max_tokens, temperature=temperature, top_p=top_p, presence_penalty=presence_penalty, stream=True, extra_body={"chat_template_kwargs": {"enable_thinking": thinking_mode}}, ) for chunk in stream: delta = chunk.choices[0].delta.content or "" full_response += delta placeholder.markdown(full_response + "▌") placeholder.markdown(full_response) st.session_state.messages.append({"role": "assistant", "content": full_response}) if st.session_state.messages: used = estimate_tokens(st.session_state.messages) pct = min(used / MAX_CONTEXT, 1.0) label = f"Context: ~{used:,} / {MAX_CONTEXT:,} tokens" if pct > 0.8: label += " ⚠️ nearing limit — older messages will be trimmed" st.progress(pct, text=label) col_clear, col_save = st.columns([1, 3]) with col_clear: if st.button("Clear Chat"): st.session_state.messages = [] st.rerun() with col_save: if selected_file and selected_file != "(no files)": if st.button(f"Save code → {selected_file}"): last = st.session_state.messages[-1]["content"] suffix = Path(selected_file).suffix lang = LANG_MAP.get(suffix, "") code = extract_code(last, lang) (WORKSPACE / selected_file).write_text(code) st.success(f"Extracted code saved to workspace/{selected_file}") # --------------------------------------------------------------------------- # Tab 2: File Editor # --------------------------------------------------------------------------- with tab_editor: st.header("File Editor") if selected_file and selected_file != "(no files)": file_path = WORKSPACE / selected_file content = file_path.read_text() if file_path.exists() else "" suffix = file_path.suffix lang = LANG_MAP.get(suffix, "text") runnable = suffix in RUNNABLE_EXTENSIONS if runnable: col_edit, col_term = st.columns([3, 2]) else: col_edit = st.container() with col_edit: st.code(content, language=lang if lang != "text" else None, line_numbers=True) edited = st.text_area( "Edit below:", value=content, height=400, key=f"editor_{selected_file}_{hash(content)}", ) col_save, col_gen = st.columns(2) with col_save: if st.button("Save File"): file_path.write_text(edited) st.success(f"Saved {selected_file}") st.rerun() with col_gen: gen_prompt = st.text_input( "Generation instruction", placeholder="e.g. Add error handling / Fix the LaTeX formatting", key="gen_prompt", ) if st.button("Generate with LLM") and gen_prompt: with st.spinner("Generating..."): response = client.chat.completions.create( model=MODEL, messages=[ {"role": "system", "content": ( f"You are a coding assistant. The user has a {lang} file. " "Return ONLY the raw file content inside a single code block. " "No explanations, no comments about changes." )}, {"role": "user", "content": ( f"Here is my {lang} file:\n\n```\n{edited}\n```\n\n" f"Instruction: {gen_prompt}" )}, ], max_tokens=max_tokens, temperature=temperature, top_p=top_p, extra_body={"chat_template_kwargs": {"enable_thinking": thinking_mode}}, ) result = response.choices[0].message.content code = extract_code(result, lang) file_path.write_text(code) st.success("File updated by LLM") st.rerun() if runnable: with col_term: run_label = "Compile LaTeX" if suffix == ".tex" else "Run Python" st.subheader("Terminal Output") if st.button(run_label, type="primary"): file_path.write_text(edited) with st.spinner(f"{'Compiling' if suffix == '.tex' else 'Running'}..."): result = run_file(file_path) st.session_state["last_run"] = result result = st.session_state.get("last_run") if result: if result["rc"] == 0: st.success(f"Exit code: {result['rc']}") else: st.error(f"Exit code: {result['rc']}") if result["stdout"]: st.text_area( "stdout", value=result["stdout"], height=300, disabled=True, key="run_stdout", ) if result["stderr"]: st.text_area( "stderr", value=result["stderr"], height=200, disabled=True, key="run_stderr", ) if not result["stdout"] and not result["stderr"]: st.info("No output produced.") else: st.caption( f"Click **{run_label}** to execute the file " f"(timeout: {RUN_TIMEOUT}s)." ) else: st.info("Create a file in the sidebar to start editing.")