forked from open-webui/open-webui
feat: multi-user support w/ RBAC
This commit is contained in:
parent
31e38df0a5
commit
921eef03b3
21 changed files with 1815 additions and 66 deletions
102
backend/apps/web/models/auths.py
Normal file
102
backend/apps/web/models/auths.py
Normal file
|
@ -0,0 +1,102 @@
|
|||
from pydantic import BaseModel
|
||||
from typing import List, Union, Optional
|
||||
import time
|
||||
import uuid
|
||||
|
||||
|
||||
from apps.web.models.users import UserModel, Users
|
||||
from utils import (
|
||||
verify_password,
|
||||
get_password_hash,
|
||||
bearer_scheme,
|
||||
create_token,
|
||||
)
|
||||
|
||||
import config
|
||||
|
||||
DB = config.DB
|
||||
|
||||
####################
|
||||
# DB MODEL
|
||||
####################
|
||||
|
||||
|
||||
class AuthModel(BaseModel):
|
||||
id: str
|
||||
email: str
|
||||
password: str
|
||||
active: bool = True
|
||||
|
||||
|
||||
####################
|
||||
# Forms
|
||||
####################
|
||||
|
||||
|
||||
class Token(BaseModel):
|
||||
token: str
|
||||
token_type: str
|
||||
|
||||
|
||||
class UserResponse(BaseModel):
|
||||
id: str
|
||||
email: str
|
||||
name: str
|
||||
role: str
|
||||
|
||||
|
||||
class SigninResponse(Token, UserResponse):
|
||||
pass
|
||||
|
||||
|
||||
class SigninForm(BaseModel):
|
||||
email: str
|
||||
password: str
|
||||
|
||||
|
||||
class SignupForm(BaseModel):
|
||||
name: str
|
||||
email: str
|
||||
password: str
|
||||
|
||||
|
||||
class AuthsTable:
|
||||
def __init__(self, db):
|
||||
self.db = db
|
||||
self.table = db.auths
|
||||
|
||||
def insert_new_auth(
|
||||
self, email: str, password: str, name: str, role: str = "user"
|
||||
) -> Optional[UserModel]:
|
||||
print("insert_new_auth")
|
||||
|
||||
id = str(uuid.uuid4())
|
||||
|
||||
auth = AuthModel(
|
||||
**{"id": id, "email": email, "password": password, "active": True}
|
||||
)
|
||||
result = self.table.insert_one(auth.model_dump())
|
||||
user = Users.insert_new_user(id, name, email, role)
|
||||
|
||||
print(result, user)
|
||||
if result and user:
|
||||
return user
|
||||
else:
|
||||
return None
|
||||
|
||||
def authenticate_user(self, email: str, password: str) -> Optional[UserModel]:
|
||||
print("authenticate_user")
|
||||
|
||||
auth = self.table.find_one({"email": email, "active": True})
|
||||
|
||||
if auth:
|
||||
if verify_password(password, auth["password"]):
|
||||
user = self.db.users.find_one({"id": auth["id"]})
|
||||
return UserModel(**user)
|
||||
else:
|
||||
return None
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
Auths = AuthsTable(DB)
|
76
backend/apps/web/models/users.py
Normal file
76
backend/apps/web/models/users.py
Normal file
|
@ -0,0 +1,76 @@
|
|||
from pydantic import BaseModel
|
||||
from typing import List, Union, Optional
|
||||
from pymongo import ReturnDocument
|
||||
import time
|
||||
|
||||
from utils import decode_token
|
||||
from config import DB
|
||||
|
||||
####################
|
||||
# User DB Schema
|
||||
####################
|
||||
|
||||
|
||||
class UserModel(BaseModel):
|
||||
id: str
|
||||
name: str
|
||||
email: str
|
||||
role: str = "user"
|
||||
created_at: int # timestamp in epoch
|
||||
|
||||
|
||||
####################
|
||||
# Forms
|
||||
####################
|
||||
|
||||
|
||||
class UsersTable:
|
||||
def __init__(self, db):
|
||||
self.db = db
|
||||
self.table = db.users
|
||||
|
||||
def insert_new_user(
|
||||
self, id: str, name: str, email: str, role: str = "user"
|
||||
) -> Optional[UserModel]:
|
||||
user = UserModel(
|
||||
**{
|
||||
"id": id,
|
||||
"name": name,
|
||||
"email": email,
|
||||
"role": role,
|
||||
"created_at": int(time.time()),
|
||||
}
|
||||
)
|
||||
result = self.table.insert_one(user.model_dump())
|
||||
|
||||
if result:
|
||||
return user
|
||||
else:
|
||||
return None
|
||||
|
||||
def get_user_by_email(self, email: str) -> Optional[UserModel]:
|
||||
user = self.table.find_one({"email": email}, {"_id": False})
|
||||
|
||||
if user:
|
||||
return UserModel(**user)
|
||||
else:
|
||||
return None
|
||||
|
||||
def get_user_by_token(self, token: str) -> Optional[UserModel]:
|
||||
data = decode_token(token)
|
||||
|
||||
if data != None and "email" in data:
|
||||
return self.get_user_by_email(data["email"])
|
||||
else:
|
||||
return None
|
||||
|
||||
def get_users(self, skip: int = 0, limit: int = 50) -> Optional[UserModel]:
|
||||
return [
|
||||
UserModel(**user)
|
||||
for user in list(self.table.find({}, {"_id": False}))
|
||||
.skip(skip)
|
||||
.limit(limit)
|
||||
]
|
||||
|
||||
|
||||
Users = UsersTable(DB)
|
Loading…
Add table
Add a link
Reference in a new issue