ai_se/ExecutionEngine.py

128 lines
4.2 KiB
Python

import subprocess
import streamlit as st
from pathlib import Path
import ast
from DebugLogger import log_error, format_output, format_error
def run_code(code: Path):
    """Run the Python script at ``code`` and record the result in the session.

    Any previous ``code_output`` / ``code_error`` entries are removed from
    ``st.session_state`` first.  The text in
    ``st.session_state["file_content"]`` is then checked with
    ``check_code_safety``; if it is rejected, the rejection message is stored
    under ``code_error`` and returned without executing anything.
    NOTE(review): the safety check inspects ``file_content`` rather than the
    file at ``code`` -- presumably both hold the same text; verify in caller.

    Otherwise the script is executed in a child interpreter with a 20 second
    timeout.  On success the formatted stdout is stored under
    ``code_output``; on a non-zero exit status or a timeout the formatted
    error is stored under ``code_error`` and logged.

    Args:
        code: Path of the script to execute.

    Returns:
        The safety-check message when the code was rejected, otherwise None.
    """
    # Local import so this block does not depend on a file-level ``import sys``.
    import sys

    # Start from a clean slate so stale results from an earlier run never leak.
    for key in ("code_output", "code_error"):
        if key in st.session_state:
            del st.session_state[key]

    safety = check_code_safety(st.session_state["file_content"])
    if safety is not None:
        st.session_state['code_error'] = safety
        return safety

    # Use the interpreter running this app instead of whatever "python" happens
    # to be on PATH (it may be missing or a different version). sys.executable
    # can be empty/None in unusual embeddings, hence the fallback.
    interpreter = sys.executable or "python"
    try:
        sub = subprocess.run(
            [interpreter, code], capture_output=True, timeout=20, check=True
        )
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as exc:
        # Both failure modes are reported the same way: format, store, log.
        st.session_state['code_error'] = format_error(exc)
        log_error(st.session_state['code_error'])
    else:
        st.session_state['code_output'] = format_output(sub.stdout)
    return None
# TODO: is this function still needed? run_code already does this.
def capture_output():
    """Return the stored run result as an ``(output, errors)`` tuple.

    Reads ``code_output`` and ``code_error`` from ``st.session_state``.
    A run normally leaves only one of the two keys set, so a missing key is
    reported as ``None`` instead of raising ``KeyError``.

    Returns:
        A 2-tuple ``(code_output, code_error)``; either element is ``None``
        when the corresponding key is absent from the session state.
    """
    state = st.session_state
    output = state['code_output'] if 'code_output' in state else None
    errors = state['code_error'] if 'code_error' in state else None
    return output, errors
# Taken from the exercise solution.
def check_code_safety(code: str) -> str | None:
"""Statically analyze Python code for forbidden imports and builtins."""
try:
tree = ast.parse(code)
except SyntaxError as e:
return f"Syntax error: {e}"
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
top_level = alias.name.split(".")[0]
if top_level in BLOCKED_IMPORTS:
return (f"Blocked import: '{alias.name}' "
f"(module '{top_level}' is forbidden)")
elif isinstance(node, ast.ImportFrom):
if node.module:
top_level = node.module.split(".")[0]
if top_level in BLOCKED_IMPORTS:
return (f"Blocked import: 'from {node.module}' "
f"(module '{top_level}' is forbidden)")
elif isinstance(node, ast.Call):
if isinstance(node.func, ast.Name):
if node.func.id in BLOCKED_BUILTINS:
return f"Blocked builtin: '{node.func.id}()' is forbidden"
return None
# Top-level module names that check_code_safety refuses to let code import.
BLOCKED_IMPORTS = {
    # --- files and directories ---
    "fileinput", "glob", "os", "pathlib", "shutil", "tempfile",
    # --- spawning processes / threads ---
    "multiprocessing", "subprocess", "threading",
    # --- networking ---
    "asyncio", "ftplib", "http", "requests", "smtplib", "socket",
    "urllib", "xmlrpc",
    # --- interpreter and system internals ---
    "code", "codeop", "compileall", "ctypes", "importlib", "sys",
    # --- serialization formats ---
    "marshal", "pickle", "shelve",
    # --- miscellaneous ---
    "antigravity", "fcntl", "pty", "resource", "signal", "termios",
    "webbrowser",
}

# Builtin function names that check_code_safety refuses to let code call
# directly by name.
BLOCKED_BUILTINS = {
    "__import__", "compile", "eval", "exec",
    "breakpoint", "exit", "open", "quit",
    "delattr", "getattr", "setattr",
    "globals", "locals", "vars",
    "memoryview", "type",
}
#qwen
#def run_code(code: Path):
# for key in ('code_output', 'code_error'):
# st.session_state.pop(key, None)
# try:
# res = subprocess.run(["python", code], capture_output=True, timeout=20, check=True)
# st.session_state['code_output'] = format_output(res.stdout)
# except subprocess.CalledProcessError as exc:
# st.session_state['code_error'] = format_error(exc)
# log_error(st.session_state['code_error'])
# except subprocess.TimeoutExpired as exc:
# st.session_state['code_error'] = format_error(exc)
# log_error(st.session_state['code_error'])