forked from open-webui/open-webui
		
	
		
			
				
	
	
		
			132 lines
		
	
	
	
		
			3.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			132 lines
		
	
	
	
		
			3.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from pydantic import BaseModel
 | 
						|
from peewee import *
 | 
						|
from playhouse.shortcuts import model_to_dict
 | 
						|
from typing import List, Union, Optional
 | 
						|
import time
 | 
						|
 | 
						|
from utils.utils import decode_token
 | 
						|
from utils.misc import get_gravatar_url
 | 
						|
 | 
						|
from apps.web.internal.db import DB
 | 
						|
from apps.web.models.chats import Chats
 | 
						|
 | 
						|
 | 
						|
####################
 | 
						|
# User DB Schema
 | 
						|
####################
 | 
						|
 | 
						|
 | 
						|
class User(Model):
 | 
						|
    id = CharField(unique=True)
 | 
						|
    name = CharField()
 | 
						|
    email = CharField()
 | 
						|
    role = CharField()
 | 
						|
    profile_image_url = CharField()
 | 
						|
    timestamp = DateField()
 | 
						|
 | 
						|
    class Meta:
 | 
						|
        database = DB
 | 
						|
 | 
						|
 | 
						|
class UserModel(BaseModel):
 | 
						|
    id: str
 | 
						|
    name: str
 | 
						|
    email: str
 | 
						|
    role: str = "pending"
 | 
						|
    profile_image_url: str = "/user.png"
 | 
						|
    timestamp: int  # timestamp in epoch
 | 
						|
 | 
						|
 | 
						|
####################
 | 
						|
# Forms
 | 
						|
####################
 | 
						|
 | 
						|
 | 
						|
class UserRoleUpdateForm(BaseModel):
 | 
						|
    id: str
 | 
						|
    role: str
 | 
						|
 | 
						|
 | 
						|
class UsersTable:
 | 
						|
    def __init__(self, db):
 | 
						|
        self.db = db
 | 
						|
        self.db.create_tables([User])
 | 
						|
 | 
						|
    def insert_new_user(
 | 
						|
        self, id: str, name: str, email: str, role: str = "pending"
 | 
						|
    ) -> Optional[UserModel]:
 | 
						|
        user = UserModel(
 | 
						|
            **{
 | 
						|
                "id": id,
 | 
						|
                "name": name,
 | 
						|
                "email": email,
 | 
						|
                "role": role,
 | 
						|
                "profile_image_url": get_gravatar_url(email),
 | 
						|
                "timestamp": int(time.time()),
 | 
						|
            }
 | 
						|
        )
 | 
						|
        result = User.create(**user.model_dump())
 | 
						|
        if result:
 | 
						|
            return user
 | 
						|
        else:
 | 
						|
            return None
 | 
						|
 | 
						|
    def get_user_by_id(self, id: str) -> Optional[UserModel]:
 | 
						|
        try:
 | 
						|
            user = User.get(User.id == id)
 | 
						|
            return UserModel(**model_to_dict(user))
 | 
						|
        except:
 | 
						|
            return None
 | 
						|
 | 
						|
    def get_user_by_email(self, email: str) -> Optional[UserModel]:
 | 
						|
        try:
 | 
						|
            user = User.get(User.email == email)
 | 
						|
            return UserModel(**model_to_dict(user))
 | 
						|
        except:
 | 
						|
            return None
 | 
						|
 | 
						|
    def get_user_by_token(self, token: str) -> Optional[UserModel]:
 | 
						|
        data = decode_token(token)
 | 
						|
 | 
						|
        if data != None and "email" in data:
 | 
						|
            return self.get_user_by_email(data["email"])
 | 
						|
        else:
 | 
						|
            return None
 | 
						|
 | 
						|
    def get_users(self, skip: int = 0, limit: int = 50) -> List[UserModel]:
 | 
						|
        return [
 | 
						|
            UserModel(**model_to_dict(user))
 | 
						|
            for user in User.select().limit(limit).offset(skip)
 | 
						|
        ]
 | 
						|
 | 
						|
    def get_num_users(self) -> Optional[int]:
 | 
						|
        return User.select().count()
 | 
						|
 | 
						|
    def update_user_role_by_id(self, id: str, role: str) -> Optional[UserModel]:
 | 
						|
        try:
 | 
						|
            query = User.update(role=role).where(User.id == id)
 | 
						|
            query.execute()
 | 
						|
 | 
						|
            user = User.get(User.id == id)
 | 
						|
            return UserModel(**model_to_dict(user))
 | 
						|
        except:
 | 
						|
            return None
 | 
						|
 | 
						|
    def delete_user_by_id(self, id: str) -> bool:
 | 
						|
        try:
 | 
						|
            # Delete User Chats
 | 
						|
            result = Chats.delete_chats_by_user_id(id)
 | 
						|
 | 
						|
            if result:
 | 
						|
                # Delete User
 | 
						|
                query = User.delete().where(User.id == id)
 | 
						|
                query.execute()  # Remove the rows, return number of rows removed.
 | 
						|
 | 
						|
                return True
 | 
						|
            else:
 | 
						|
                return False
 | 
						|
        except:
 | 
						|
            return False
 | 
						|
 | 
						|
 | 
						|
Users = UsersTable(DB)
 |