feat: edit user support

This commit is contained in:
Timothy J. Baek 2024-01-05 20:59:56 -08:00
parent 0262be4724
commit fb0c64379d
9 changed files with 371 additions and 43 deletions

View file

@ -75,26 +75,20 @@ class SignupForm(BaseModel):
class AuthsTable:
def __init__(self, db):
self.db = db
self.db.create_tables([Auth])
def insert_new_auth(self,
email: str,
password: str,
name: str,
role: str = "pending") -> Optional[UserModel]:
def insert_new_auth(
self, email: str, password: str, name: str, role: str = "pending"
) -> Optional[UserModel]:
print("insert_new_auth")
id = str(uuid.uuid4())
auth = AuthModel(**{
"id": id,
"email": email,
"password": password,
"active": True
})
auth = AuthModel(
**{"id": id, "email": email, "password": password, "active": True}
)
result = Auth.create(**auth.model_dump())
user = Users.insert_new_user(id, name, email, role)
@ -104,8 +98,7 @@ class AuthsTable:
else:
return None
def authenticate_user(self, email: str,
password: str) -> Optional[UserModel]:
def authenticate_user(self, email: str, password: str) -> Optional[UserModel]:
print("authenticate_user", email)
try:
auth = Auth.get(Auth.email == email, Auth.active == True)
@ -129,6 +122,15 @@ class AuthsTable:
except:
return False
def update_email_by_id(self, id: str, email: str) -> bool:
try:
query = Auth.update(email=email).where(Auth.id == id)
result = query.execute()
return True if result == 1 else False
except:
return False
def delete_auth_by_id(self, id: str) -> bool:
try:
# Delete User
@ -137,8 +139,7 @@ class AuthsTable:
if result:
# Delete Auth
query = Auth.delete().where(Auth.id == id)
query.execute(
) # Remove the rows, return number of rows removed.
query.execute() # Remove the rows, return number of rows removed.
return True
else:

View file

@ -44,17 +44,21 @@ class UserRoleUpdateForm(BaseModel):
role: str
class UsersTable:
class UserUpdateForm(BaseModel):
name: str
email: str
profile_image_url: str
password: Optional[str] = None
class UsersTable:
def __init__(self, db):
self.db = db
self.db.create_tables([User])
def insert_new_user(self,
id: str,
name: str,
email: str,
role: str = "pending") -> Optional[UserModel]:
def insert_new_user(
self, id: str, name: str, email: str, role: str = "pending"
) -> Optional[UserModel]:
user = UserModel(
**{
"id": id,
@ -63,7 +67,8 @@ class UsersTable:
"role": role,
"profile_image_url": get_gravatar_url(email),
"timestamp": int(time.time()),
})
}
)
result = User.create(**user.model_dump())
if result:
return user
@ -93,8 +98,7 @@ class UsersTable:
def get_num_users(self) -> Optional[int]:
return User.select().count()
def update_user_role_by_id(self, id: str,
role: str) -> Optional[UserModel]:
def update_user_role_by_id(self, id: str, role: str) -> Optional[UserModel]:
try:
query = User.update(role=role).where(User.id == id)
query.execute()
@ -104,6 +108,16 @@ class UsersTable:
except:
return None
def update_user_by_id(self, id: str, updated: dict) -> Optional[UserModel]:
try:
query = User.update(**updated).where(User.id == id)
query.execute()
user = User.get(User.id == id)
return UserModel(**model_to_dict(user))
except:
return None
def delete_user_by_id(self, id: str) -> bool:
try:
# Delete User Chats
@ -112,8 +126,7 @@ class UsersTable:
if result:
# Delete User
query = User.delete().where(User.id == id)
query.execute(
) # Remove the rows, return number of rows removed.
query.execute() # Remove the rows, return number of rows removed.
return True
else:

View file

@ -8,10 +8,10 @@ from pydantic import BaseModel
import time
import uuid
from apps.web.models.users import UserModel, UserRoleUpdateForm, Users
from apps.web.models.users import UserModel, UserUpdateForm, UserRoleUpdateForm, Users
from apps.web.models.auths import Auths
from utils.utils import get_current_user
from utils.utils import get_current_user, get_password_hash
from constants import ERROR_MESSAGES
router = APIRouter()
@ -22,9 +22,7 @@ router = APIRouter()
@router.get("/", response_model=List[UserModel])
async def get_users(skip: int = 0,
limit: int = 50,
user=Depends(get_current_user)):
async def get_users(skip: int = 0, limit: int = 50, user=Depends(get_current_user)):
if user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
@ -34,25 +32,58 @@ async def get_users(skip: int = 0,
############################
# UpdateUserRole
# UpdateUserById
############################
@router.post("/update/role", response_model=Optional[UserModel])
async def update_user_role(form_data: UserRoleUpdateForm,
user=Depends(get_current_user)):
if user.role != "admin":
@router.post("/{user_id}/update", response_model=Optional[UserModel])
async def update_user_by_id(
user_id: str, form_data: UserUpdateForm, session_user=Depends(get_current_user)
):
if session_user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
if user.id != form_data.id:
return Users.update_user_role_by_id(form_data.id, form_data.role)
user = Users.get_user_by_id(user_id)
if user:
if form_data.email != user.email:
email_user = Users.get_user_by_email(form_data.email)
if email_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.EMAIL_TAKEN,
)
if form_data.password:
hashed = get_password_hash(form_data.password)
print(hashed)
Auths.update_user_password_by_id(user_id, hashed)
Auths.update_email_by_id(user_id, form_data.email)
updated_user = Users.update_user_by_id(
user_id,
{
"name": form_data.name,
"email": form_data.email,
"profile_image_url": form_data.profile_image_url,
},
)
if updated_user:
return updated_user
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.DEFAULT(),
)
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=ERROR_MESSAGES.ACTION_PROHIBITED,
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.USER_NOT_FOUND,
)