From 0fdcd8d54793f4b7db9e1d6340cf9591b327c671 Mon Sep 17 00:00:00 2001 From: Franziska Gerold Date: Wed, 27 May 2026 11:48:57 +0200 Subject: [PATCH] milestone 3 - ExecutionsEngine: added safety, SearchManager: added manual internet search --- ExecutionEngine.py | 84 +++++++++++++++++++++++++++++++++++++++++----- SearchManager.py | 33 +++++++++++++++--- SystemPrompter.py | 10 ++++-- app.py | 2 +- 4 files changed, 113 insertions(+), 16 deletions(-) diff --git a/ExecutionEngine.py b/ExecutionEngine.py index 90a4166..ff4842f 100644 --- a/ExecutionEngine.py +++ b/ExecutionEngine.py @@ -1,6 +1,7 @@ import subprocess import streamlit as st from pathlib import Path +import ast from DebugLogger import log_error, format_output, format_error @@ -9,6 +10,13 @@ def run_code(code: Path): del st.session_state['code_output'] if "code_error" in st.session_state: del st.session_state['code_error'] + + safety = check_code_safety(st.session_state["file_content"]) + + if safety is not None: + st.session_state['code_error'] = safety + return safety + try: sub = subprocess.run(["python", code], capture_output=True, timeout=20, check=True) st.session_state['code_output'] = format_output(sub.stdout) @@ -38,6 +46,64 @@ def capture_output(): # errors = "There are no errors" return st.session_state['code_output'], errors + + +#aus der uebungsloesung + +def check_code_safety(code: str) -> str | None: + """Statically analyze Python code for forbidden imports and builtins.""" + try: + tree = ast.parse(code) + except SyntaxError as e: + return f"Syntax error: {e}" + + for node in ast.walk(tree): + if isinstance(node, ast.Import): + for alias in node.names: + top_level = alias.name.split(".")[0] + if top_level in BLOCKED_IMPORTS: + return (f"Blocked import: '{alias.name}' " + f"(module '{top_level}' is forbidden)") + + elif isinstance(node, ast.ImportFrom): + if node.module: + top_level = node.module.split(".")[0] + if top_level in BLOCKED_IMPORTS: + return (f"Blocked import: 'from {node.module}' " + f"(module '{top_level}' is forbidden)") + + elif isinstance(node, ast.Call): + if isinstance(node.func, ast.Name): + if node.func.id in BLOCKED_BUILTINS: + return f"Blocked builtin: '{node.func.id}()' is forbidden" + + return None + +BLOCKED_IMPORTS = { + # Filesystem access + "os", "pathlib", "shutil", "glob", "tempfile", "fileinput", + # Process execution + "subprocess", "multiprocessing", "threading", + # Network access + "socket", "http", "urllib", "requests", "ftplib", "smtplib", + "xmlrpc", "asyncio", + # System internals + "sys", "ctypes", "importlib", "code", "codeop", "compileall", + # Serialization exploits + "pickle", "shelve", "marshal", + # Other dangerous modules + "signal", "resource", "pty", "fcntl", "termios", + "webbrowser", "antigravity", +} + +BLOCKED_BUILTINS = { + "exec", "eval", "compile", "__import__", + "open", "breakpoint", "exit", "quit", + "getattr", "setattr", "delattr", + "globals", "locals", "vars", + "memoryview", "type", +} + #qwen @@ -46,15 +112,15 @@ def capture_output(): # for key in ('code_output', 'code_error'): # st.session_state.pop(key, None) - try: - res = subprocess.run(["python", code], capture_output=True, timeout=20, check=True) - st.session_state['code_output'] = format_output(res.stdout) - except subprocess.CalledProcessError as exc: - st.session_state['code_error'] = format_error(exc) - log_error(st.session_state['code_error']) - except subprocess.TimeoutExpired as exc: - st.session_state['code_error'] = format_error(exc) - log_error(st.session_state['code_error']) + # try: + # res = subprocess.run(["python", code], capture_output=True, timeout=20, check=True) + # st.session_state['code_output'] = format_output(res.stdout) + # except subprocess.CalledProcessError as exc: + # st.session_state['code_error'] = format_error(exc) + # log_error(st.session_state['code_error']) + # except subprocess.TimeoutExpired as exc: + # st.session_state['code_error'] = format_error(exc) + # log_error(st.session_state['code_error']) diff --git a/SearchManager.py b/SearchManager.py index ed63535..da42062 100644 --- a/SearchManager.py +++ b/SearchManager.py @@ -1,8 +1,33 @@ +from ddgs import DDGS +#query = "python programming" +def perform_search(query: str) -> dict: + results = DDGS().text(query, max_results=3) + #print(results) + return results -def perform_search(query): - pass +#def parse_results(raw_results: dict) -> list: +# formatted_results = [] +# for result in raw_results: +# for key in result: +# if key == "href": +# formatted_results.append(result.get(key)) +# if key == "body": +# text = result.get(key) +# if len(text) > 500: +# text = text[0:500] +# formatted_results.append(text) + + return formatted_results -def parse_results(raw_results): - pass \ No newline at end of file +def parse_results(results: list[dict]) -> list[dict]: + return [ + { + "url": r.get("href", ""), + "snippet": r.get("body", "")[:500] + } + for r in results + ] + +#parse_results(perform_search(query)) diff --git a/SystemPrompter.py b/SystemPrompter.py index 572d30a..8ce42bc 100644 --- a/SystemPrompter.py +++ b/SystemPrompter.py @@ -1,4 +1,5 @@ import streamlit as st +from SearchManager import perform_search, parse_results def generate_prompt(user_message: str, file_context=None) -> str: @@ -18,12 +19,17 @@ def generate_prompt(user_message: str, file_context=None) -> str: task = (f" {user_message} ") + context = "" + + if "internet" in st.session_state: + context += (f" {parse_results(perform_search(user_message))} ") + if "file_content" in st.session_state: - context = (f" {st.session_state['file_content']} ") + context += (f" {st.session_state['file_content']} ") else: context = "" if "code_error" in st.session_state: - context += (f" {st.session_state['code_error']} ") + context += (f" {st.session_state['code_error']} ") prompt = system_content + task + context diff --git a/app.py b/app.py index eca4e74..9324568 100644 --- a/app.py +++ b/app.py @@ -66,7 +66,7 @@ with col_chat: # Input & Response message = st.chat_input("How can qwen help you?", key="chat") - internet = st.toggle("enable internet search") + internet = st.toggle("enable internet search", key="internet") if message: st.session_state.messages.append({"role": "user", "content": message}) response = send_message(message)