# Forked from open-webui/open-webui
# 172 lines, 4.4 KiB, Python
from pathlib import Path
|
|
import ast
|
|
import builtins
|
|
|
|
|
|
from fastapi import (
|
|
FastAPI,
|
|
Request,
|
|
Depends,
|
|
HTTPException,
|
|
status,
|
|
UploadFile,
|
|
File,
|
|
Form,
|
|
)
|
|
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from apps.functions.security import ALLOWED_MODULES, ALLOWED_BUILTINS, custom_import
|
|
from utils.utils import get_current_user, get_admin_user
|
|
|
|
|
|
from config import FUNCTIONS_DIR
|
|
from constants import ERROR_MESSAGES
|
|
|
|
|
|
from pydantic import BaseModel
|
|
|
|
from typing import Optional
|
|
|
|
# FastAPI application object that the route decorators in this module attach to.
app = FastAPI()

# CORS is configured to accept any origin, method and header.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive — confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
|
@app.get("/")
async def get_status():
    """Report that the application is up by returning ``{"status": True}``."""
    payload = {"status": True}
    return payload
|
|
|
|
|
|
class FunctionForm(BaseModel):
    """Request body carrying a function's name and its source text."""

    # Used to build the "<name>.py" file name under FUNCTIONS_DIR.
    name: str
    # Text written verbatim into that file.
    content: str
|
|
|
|
|
|
@app.post("/add")
def add_function(
    form_data: FunctionForm,
    user=Depends(get_admin_user),
):
    """Create a new function file named ``<name>.py`` inside ``FUNCTIONS_DIR``.

    Args:
        form_data: Name of the function and the source text to store.
        user: Resolved by the ``get_admin_user`` dependency; not used directly.

    Returns:
        ``True`` when the file was created. ``False`` when the name is empty
        or contains a path separator, when the function already exists, or
        when writing fails (the error is printed).
    """
    try:
        name = form_data.name
        # The name comes straight from the request body. Refuse anything that
        # is not a single path component so the write cannot leave
        # FUNCTIONS_DIR (e.g. "../../main").
        if not name or "/" in name or "\\" in name:
            raise Exception("Invalid function name")

        target = Path(FUNCTIONS_DIR) / f"{name}.py"
        try:
            # Mode "x" creates the file and fails if it already exists, which
            # closes the gap between an existence check and the write.
            with open(target, "x") as file:
                file.write(form_data.content)
        except FileExistsError:
            raise Exception("Function already exists") from None

        return target.is_file()
    except Exception as e:
        print(e)
        return False
|
|
|
|
|
|
@app.post("/update")
def update_function(
    form_data: FunctionForm,
    user=Depends(get_admin_user),
):
    """Overwrite the existing function file ``<name>.py`` in ``FUNCTIONS_DIR``.

    Args:
        form_data: Name of the function and the replacement source text.
        user: Resolved by the ``get_admin_user`` dependency; not used directly.

    Returns:
        ``True`` when the file was rewritten. ``False`` when the name is empty
        or contains a path separator, when no such function file exists, or
        when writing fails (the error is printed).
    """
    try:
        name = form_data.name
        # The name comes straight from the request body. Refuse anything that
        # is not a single path component so an update cannot overwrite a file
        # outside FUNCTIONS_DIR.
        if not name or "/" in name or "\\" in name:
            raise Exception("Invalid function name")

        target = Path(FUNCTIONS_DIR) / f"{name}.py"
        # is_file() (rather than exists()) so a directory of the same name is
        # reported as a missing function instead of failing inside open().
        if not target.is_file():
            raise Exception("Function does not exist")

        with open(target, "w") as file:
            file.write(form_data.content)

        return target.is_file()
    except Exception as e:
        print(e)
        return False
|
|
|
|
|
|
@app.get("/check/{function}")
def check_function(
    function: str,
    user=Depends(get_admin_user),
):
    """Return the stored source text of the named function.

    Args:
        function: Function name taken from the URL path.
        user: Resolved by the ``get_admin_user`` dependency; not used directly.

    Returns:
        A dict with the requested ``name`` and the file's ``content``.

    Raises:
        HTTPException: 404 when ``<function>.py`` is not a file in
            ``FUNCTIONS_DIR``.
    """
    source_path = Path(f"{FUNCTIONS_DIR}/{function}.py")

    # Happy path first: the file is there, so hand back its text.
    if source_path.is_file():
        return {"name": function, "content": source_path.read_text()}

    raise HTTPException(status_code=404, detail="Function not found")
|
|
|
|
|
|
@app.get("/list")
def list_functions(
    user=Depends(get_admin_user),
):
    """Return the base names of every entry found recursively in ``FUNCTIONS_DIR``.

    Args:
        user: Resolved by the ``get_admin_user`` dependency; not used directly.
    """
    return [entry.name for entry in Path(FUNCTIONS_DIR).rglob("./*")]
|
|
|
|
|
|
def validate_imports(code):
    """Reject source whose import statements name modules outside ALLOWED_MODULES.

    Every ``import`` and ``from ... import`` statement anywhere in the parsed
    tree is inspected, not only those at module level.

    Args:
        code: Python source text to inspect.

    Raises:
        HTTPException: 400 when the source does not parse, when it uses a
            relative import, or when it imports a module that is not in
            ``ALLOWED_MODULES``.
    """
    try:
        tree = ast.parse(code)
    except SyntaxError as e:
        raise HTTPException(status_code=400, detail=f"Syntax error in function: {e}")

    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            module_names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            # A relative import ("from . import x", "from .os import x") either
            # has no module name at all or a name that does not refer to the
            # top-level module of the same spelling, so the allow-list check
            # below would be meaningless for it.
            if node.level or node.module is None:
                raise HTTPException(
                    status_code=400, detail="Relative imports are not allowed"
                )
            module_names = [node.module]
        else:
            continue

        for name in module_names:
            if name not in ALLOWED_MODULES:
                raise HTTPException(
                    status_code=400, detail=f"Import of module {name} is not allowed"
                )
|
|
|
|
|
|
@app.post("/exec/{function}")
def exec_function(
    function: str,
    kwargs: Optional[dict] = None,
    user=Depends(get_current_user),
):
    """Load ``<function>.py`` from ``FUNCTIONS_DIR``, run it, and call its ``main``.

    Args:
        function: Function name taken from the URL path.
        kwargs: Optional dict. When non-empty it is passed to ``main`` as a
            single positional argument (it is not unpacked); otherwise
            ``main`` is called with no arguments.
        user: Resolved by the ``get_current_user`` dependency; not used
            directly.

    Returns:
        Whatever the stored function's ``main`` returns.

    Raises:
        HTTPException: 404 when the function file does not exist; 400 when the
            source fails import validation, raises while being executed,
            does not define a callable ``main``, or ``main`` itself raises.
    """
    filename = f"{FUNCTIONS_DIR}/{function}.py"

    # Check if the function file exists
    if not Path(filename).is_file():
        raise HTTPException(status_code=404, detail="Function not found")

    # Read the code from the file
    with open(filename, "r") as file:
        code = file.read()

    validate_imports(code)

    try:
        # exec() inserts a reference to the full builtins module whenever the
        # globals dict has no "__builtins__" key, and import statements look
        # up __import__ inside "__builtins__". The allow-listed builtins and
        # the custom importer therefore have to live under that key; placing
        # them as plain globals leaves every builtin reachable.
        # NOTE(review): with this in place, stored code that uses `class`
        # statements needs "__build_class__" in ALLOWED_BUILTINS — confirm.
        restricted_builtins = {
            name: getattr(builtins, name) for name in ALLOWED_BUILTINS
        }
        restricted_builtins["__import__"] = custom_import
        namespace = {"__builtins__": restricted_builtins}
        exec(code, namespace)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Function: {e}")

    # Check if the function exists in the namespace
    entry_point = namespace.get("main")
    if not callable(entry_point):
        raise HTTPException(status_code=400, detail="Invalid function")

    try:
        # The kwargs dict is handed over as one positional argument.
        return entry_point(kwargs) if kwargs else entry_point()
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Function: {e}")
|