diff --git a/ExecutionEngine.py b/ExecutionEngine.py
index 90a4166..ff4842f 100644
--- a/ExecutionEngine.py
+++ b/ExecutionEngine.py
@@ -1,6 +1,7 @@
import subprocess
import streamlit as st
from pathlib import Path
+import ast
from DebugLogger import log_error, format_output, format_error
@@ -9,6 +10,13 @@ def run_code(code: Path):
del st.session_state['code_output']
if "code_error" in st.session_state:
del st.session_state['code_error']
+
+ safety = check_code_safety(st.session_state["file_content"])
+
+ if safety is not None:
+ st.session_state['code_error'] = safety
+ return safety
+
try:
sub = subprocess.run(["python", code], capture_output=True, timeout=20, check=True)
st.session_state['code_output'] = format_output(sub.stdout)
@@ -38,6 +46,64 @@ def capture_output():
# errors = "There are no errors"
return st.session_state['code_output'], errors
+
+
+#aus der uebungsloesung
+
+def check_code_safety(code: str) -> str | None:
+ """Statically analyze Python code for forbidden imports and builtins."""
+ try:
+ tree = ast.parse(code)
+ except SyntaxError as e:
+ return f"Syntax error: {e}"
+
+ for node in ast.walk(tree):
+ if isinstance(node, ast.Import):
+ for alias in node.names:
+ top_level = alias.name.split(".")[0]
+ if top_level in BLOCKED_IMPORTS:
+ return (f"Blocked import: '{alias.name}' "
+ f"(module '{top_level}' is forbidden)")
+
+ elif isinstance(node, ast.ImportFrom):
+ if node.module:
+ top_level = node.module.split(".")[0]
+ if top_level in BLOCKED_IMPORTS:
+ return (f"Blocked import: 'from {node.module}' "
+ f"(module '{top_level}' is forbidden)")
+
+ elif isinstance(node, ast.Call):
+ if isinstance(node.func, ast.Name):
+ if node.func.id in BLOCKED_BUILTINS:
+ return f"Blocked builtin: '{node.func.id}()' is forbidden"
+
+ return None
+
+BLOCKED_IMPORTS = {
+ # Filesystem access
+ "os", "pathlib", "shutil", "glob", "tempfile", "fileinput",
+ # Process execution
+ "subprocess", "multiprocessing", "threading",
+ # Network access
+ "socket", "http", "urllib", "requests", "ftplib", "smtplib",
+ "xmlrpc", "asyncio",
+ # System internals
+ "sys", "ctypes", "importlib", "code", "codeop", "compileall",
+ # Serialization exploits
+ "pickle", "shelve", "marshal",
+ # Other dangerous modules
+ "signal", "resource", "pty", "fcntl", "termios",
+ "webbrowser", "antigravity",
+}
+
+BLOCKED_BUILTINS = {
+ "exec", "eval", "compile", "__import__",
+ "open", "breakpoint", "exit", "quit",
+ "getattr", "setattr", "delattr",
+ "globals", "locals", "vars",
+ "memoryview", "type",
+}
+
#qwen
@@ -46,15 +112,15 @@ def capture_output():
# for key in ('code_output', 'code_error'):
# st.session_state.pop(key, None)
- try:
- res = subprocess.run(["python", code], capture_output=True, timeout=20, check=True)
- st.session_state['code_output'] = format_output(res.stdout)
- except subprocess.CalledProcessError as exc:
- st.session_state['code_error'] = format_error(exc)
- log_error(st.session_state['code_error'])
- except subprocess.TimeoutExpired as exc:
- st.session_state['code_error'] = format_error(exc)
- log_error(st.session_state['code_error'])
+ # try:
+ # res = subprocess.run(["python", code], capture_output=True, timeout=20, check=True)
+ # st.session_state['code_output'] = format_output(res.stdout)
+ # except subprocess.CalledProcessError as exc:
+ # st.session_state['code_error'] = format_error(exc)
+ # log_error(st.session_state['code_error'])
+ # except subprocess.TimeoutExpired as exc:
+ # st.session_state['code_error'] = format_error(exc)
+ # log_error(st.session_state['code_error'])
diff --git a/SearchManager.py b/SearchManager.py
index ed63535..da42062 100644
--- a/SearchManager.py
+++ b/SearchManager.py
@@ -1,8 +1,33 @@
+from ddgs import DDGS
+#query = "python programming"
+def perform_search(query: str) -> dict:
+ results = DDGS().text(query, max_results=3)
+ #print(results)
+ return results
-def perform_search(query):
- pass
+#def parse_results(raw_results: dict) -> list:
+# formatted_results = []
+# for result in raw_results:
+# for key in result:
+# if key == "href":
+# formatted_results.append(result.get(key))
+# if key == "body":
+# text = result.get(key)
+# if len(text) > 500:
+# text = text[0:500]
+# formatted_results.append(text)
+
+ return formatted_results
-def parse_results(raw_results):
- pass
\ No newline at end of file
+def parse_results(results: list[dict]) -> list[dict]:
+ return [
+ {
+ "url": r.get("href", ""),
+ "snippet": r.get("body", "")[:500]
+ }
+ for r in results
+ ]
+
+#parse_results(perform_search(query))
diff --git a/SystemPrompter.py b/SystemPrompter.py
index 572d30a..8ce42bc 100644
--- a/SystemPrompter.py
+++ b/SystemPrompter.py
@@ -1,4 +1,5 @@
import streamlit as st
+from SearchManager import perform_search, parse_results
def generate_prompt(user_message: str, file_context=None) -> str:
@@ -18,12 +19,17 @@ def generate_prompt(user_message: str, file_context=None) -> str:
task = (f" {user_message} ")
+ context = ""
+
+ if "internet" in st.session_state:
+ context += (f" {parse_results(perform_search(user_message))} ")
+
if "file_content" in st.session_state:
- context = (f" {st.session_state['file_content']} ")
+ context += (f" {st.session_state['file_content']} ")
else:
context = ""
if "code_error" in st.session_state:
- context += (f" {st.session_state['code_error']} ")
+ context += (f" {st.session_state['code_error']} ")
prompt = system_content + task + context
diff --git a/app.py b/app.py
index eca4e74..9324568 100644
--- a/app.py
+++ b/app.py
@@ -66,7 +66,7 @@ with col_chat:
# Input & Response
message = st.chat_input("How can qwen help you?", key="chat")
- internet = st.toggle("enable internet search")
+ internet = st.toggle("enable internet search", key="internet")
if message:
st.session_state.messages.append({"role": "user", "content": message})
response = send_message(message)