open-webui/backend/apps/web/routers/users.py

141 lines
3.9 KiB
Python
Raw Permalink Normal View History

2024-02-14 10:17:43 +01:00
from fastapi import Response, Request
2023-11-19 09:13:59 +01:00
from fastapi import Depends, FastAPI, HTTPException, status
from datetime import datetime, timedelta
from typing import List, Union, Optional
from fastapi import APIRouter
from pydantic import BaseModel
import time
import uuid
import logging
2023-11-19 09:13:59 +01:00
2024-01-06 05:59:56 +01:00
from apps.web.models.users import UserModel, UserUpdateForm, UserRoleUpdateForm, Users
2023-12-29 08:24:51 +01:00
from apps.web.models.auths import Auths
from utils.utils import get_current_user, get_password_hash, get_admin_user
2023-11-19 09:13:59 +01:00
from constants import ERROR_MESSAGES
from config import SRC_LOG_LEVELS
2024-03-31 10:13:39 +02:00
# Module-level logger; its level is taken from the "MODELS" entry of SRC_LOG_LEVELS.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MODELS"])
2023-11-19 09:13:59 +01:00
# Router on which the endpoint functions in this module register via @router.* decorators.
router = APIRouter()
############################
# GetUsers
############################
@router.get("/", response_model=List[UserModel])
async def get_users(skip: int = 0, limit: int = 50, user=Depends(get_admin_user)):
    """Return a page of users.

    Args:
        skip: Forwarded unchanged to ``Users.get_users`` as its first argument.
        limit: Forwarded unchanged to ``Users.get_users`` as its second argument.
        user: Resolved by the ``get_admin_user`` dependency; not used in the body.

    Returns:
        Whatever ``Users.get_users(skip, limit)`` returns, serialized as a
        list of ``UserModel``.
    """
    page = Users.get_users(skip, limit)
    return page
2023-11-19 09:13:59 +01:00
2024-02-14 10:17:43 +01:00
############################
# User Permissions
############################
@router.get("/permissions/user")
async def get_user_permissions(request: Request, user=Depends(get_admin_user)):
    """Return the permissions object currently stored on the application state.

    Args:
        request: Incoming request; used only to reach ``request.app.state``.
        user: Resolved by the ``get_admin_user`` dependency; not used in the body.

    Returns:
        The current value of ``request.app.state.USER_PERMISSIONS``.
    """
    app_state = request.app.state
    return app_state.USER_PERMISSIONS
@router.post("/permissions/user")
async def update_user_permissions(
    request: Request, form_data: dict, user=Depends(get_admin_user)
):
    """Replace the permissions object stored on the application state.

    Args:
        request: Incoming request; used only to reach ``request.app.state``.
        form_data: New permissions mapping, stored as-is (no validation here).
        user: Resolved by the ``get_admin_user`` dependency; not used in the body.

    Returns:
        The value of ``request.app.state.USER_PERMISSIONS`` after the update.
    """
    app_state = request.app.state
    app_state.USER_PERMISSIONS = form_data
    return app_state.USER_PERMISSIONS
2024-01-06 06:02:49 +01:00
############################
# UpdateUserRole
############################
@router.post("/update/role", response_model=Optional[UserModel])
async def update_user_role(form_data: UserRoleUpdateForm, user=Depends(get_admin_user)):
    """Change the role of another user.

    Args:
        form_data: Carries the target user's ``id`` and the new ``role``.
        user: The caller, as resolved by the ``get_admin_user`` dependency.

    Returns:
        The result of ``Users.update_user_role_by_id``.

    Raises:
        HTTPException: 403 when the target is the caller or the user
            returned by ``Users.get_first_user()``.
    """
    target_id = form_data.id

    # `or` short-circuits, so the first-user lookup only happens when the
    # caller is not targeting their own account.
    is_protected = user.id == target_id or target_id == Users.get_first_user().id
    if is_protected:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=ERROR_MESSAGES.ACTION_PROHIBITED,
        )

    return Users.update_user_role_by_id(target_id, form_data.role)
2024-01-06 06:02:49 +01:00
2023-11-19 09:13:59 +01:00
############################
2024-01-06 05:59:56 +01:00
# UpdateUserById
2023-11-19 09:13:59 +01:00
############################
2024-01-06 05:59:56 +01:00
@router.post("/{user_id}/update", response_model=Optional[UserModel])
async def update_user_by_id(
    user_id: str, form_data: UserUpdateForm, session_user=Depends(get_admin_user)
):
    """Update a user's name, email, profile image and (optionally) password.

    Args:
        user_id: Id of the user to update (path parameter).
        form_data: New values; ``password`` is applied only when truthy.
        session_user: Resolved by the ``get_admin_user`` dependency; not used
            in the body.

    Returns:
        The updated user as returned by ``Users.update_user_by_id``.

    Raises:
        HTTPException: 400 with ``USER_NOT_FOUND`` when no user has
            ``user_id``; 400 with ``EMAIL_TAKEN`` when the new email already
            belongs to a user; 400 with ``DEFAULT()`` when the final update
            returns a falsy result.
    """
    user = Users.get_user_by_id(user_id)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.USER_NOT_FOUND,
        )

    # Emails are compared and stored lower-cased; normalize once.
    new_email = form_data.email.lower()

    # Only check for a collision when the email actually changes.
    if new_email != user.email and Users.get_user_by_email(new_email):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.EMAIL_TAKEN,
        )

    if form_data.password:
        hashed = get_password_hash(form_data.password)
        # Never write the hash itself to the logs: it is credential material.
        log.debug("updating password hash for user %s", user_id)
        Auths.update_user_password_by_id(user_id, hashed)

    Auths.update_email_by_id(user_id, new_email)
    updated_user = Users.update_user_by_id(
        user_id,
        {
            "name": form_data.name,
            "email": new_email,
            "profile_image_url": form_data.profile_image_url,
        },
    )
    if updated_user:
        return updated_user

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail=ERROR_MESSAGES.DEFAULT(),
    )
2023-12-29 08:02:49 +01:00
############################
2023-12-29 08:24:51 +01:00
# DeleteUserById
2023-12-29 08:02:49 +01:00
############################
@router.delete("/{user_id}", response_model=bool)
async def delete_user_by_id(user_id: str, user=Depends(get_admin_user)):
    """Delete a user other than the caller or the first-registered user.

    Args:
        user_id: Id of the user to delete (path parameter).
        user: The caller, as resolved by the ``get_admin_user`` dependency.

    Returns:
        ``True`` when ``Auths.delete_auth_by_id`` reports success.

    Raises:
        HTTPException: 403 when the target is the caller or the user returned
            by ``Users.get_first_user()``; 500 when the deletion reports
            failure.
    """
    forbidden = HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=ERROR_MESSAGES.ACTION_PROHIBITED,
    )

    # Callers may not delete their own account.
    if user.id == user_id:
        raise forbidden

    # Mirror update_user_role: the first user is protected from changes by
    # other admins, so it must not be deletable either.
    first_user = Users.get_first_user()
    if first_user is not None and first_user.id == user_id:
        raise forbidden

    if Auths.delete_auth_by_id(user_id):
        return True

    raise HTTPException(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        detail=ERROR_MESSAGES.DELETE_USER_ERROR,
    )