open-webui/backend/utils/utils.py
Anuraag Jain a01b112f7f feat(auth): add auth middleware
- refactored chat routes to use request.user instead of doing authentication in every route
2023-12-28 22:15:54 +02:00

68 lines
1.7 KiB
Python

from datetime import datetime, timedelta, timezone
from typing import Optional, Union

import jwt
import requests
from fastapi.security import HTTPBasicCredentials, HTTPBearer
from passlib.context import CryptContext
from pydantic import BaseModel

import config
# Key handed to jwt.encode / jwt.decode below; sourced from the project's
# config module.
JWT_SECRET_KEY = config.WEBUI_JWT_SECRET_KEY
# Algorithm name passed to jwt.encode when tokens are created.
ALGORITHM = "HS256"
##############
# Auth Utils
##############
# Module-level HTTPBearer instance (not referenced by the functions visible
# in this module; presumably imported by route code -- verify against callers).
bearer_scheme = HTTPBearer()
# passlib context restricted to bcrypt; backs verify_password and
# get_password_hash.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
    """Check ``plain_password`` against ``hashed_password``.

    Returns ``None`` when ``hashed_password`` is falsy (missing or empty);
    otherwise returns whatever ``pwd_context.verify`` reports for the pair.
    """
    # Nothing to compare against: report "no result" rather than calling
    # into the hashing context.
    if not hashed_password:
        return None
    return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
    """Hash ``password`` with the module-level ``pwd_context`` and return it."""
    hashed = pwd_context.hash(password)
    return hashed
def create_token(data: dict, expires_delta: Union[timedelta, None] = None) -> str:
    """Encode ``data`` as a JWT signed with ``JWT_SECRET_KEY``.

    Args:
        data: Claims to embed. The dict is copied, so the caller's object is
            never mutated.
        expires_delta: Lifetime of the token. ``None`` (the default) produces
            a token without an ``exp`` claim; any ``timedelta`` -- including a
            zero one -- sets ``exp`` to now + ``expires_delta``.

    Returns:
        The encoded token produced by ``jwt.encode`` using ``ALGORITHM``.
    """
    payload = data.copy()

    # Compare against None explicitly: timedelta(0) is falsy, and a plain
    # truthiness test would silently issue a token that never expires.
    if expires_delta is not None:
        # Timezone-aware "now"; datetime.utcnow() returns a naive value and is
        # deprecated since Python 3.12.
        payload["exp"] = datetime.now(timezone.utc) + expires_delta

    return jwt.encode(payload, JWT_SECRET_KEY, algorithm=ALGORITHM)
def decode_token(token: str) -> Optional[dict]:
    """Decode ``token`` and return its claims, or ``None`` if it is not valid.

    The signature is verified against ``JWT_SECRET_KEY`` and only
    ``ALGORITHM`` is accepted. Decoding with signature verification switched
    off would accept any forged token, so it must stay enabled here.

    Args:
        token: The encoded JWT.

    Returns:
        The decoded claims dict, or ``None`` when PyJWT rejects the token
        (malformed, bad signature, expired, wrong algorithm, ...).
    """
    try:
        # Pinning `algorithms` prevents the token header from choosing how it
        # gets verified.
        return jwt.decode(token, JWT_SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.PyJWTError:
        return None
def extract_token_from_auth_header(auth_header: str):
    """Return the part of ``auth_header`` that follows the ``"Bearer "`` span.

    The header is not validated: the first ``len("Bearer ")`` characters are
    dropped whatever they are, and the remainder is returned as-is.
    """
    prefix_length = len("Bearer ")
    return auth_header[prefix_length:]
def verify_token(request):
    """Return the verified JWT claims carried by ``request``, or ``None``.

    Reads the ``authorization`` header, expects it to have exactly two
    whitespace-separated parts (``"<scheme> <token>"``; the scheme itself is
    not inspected) and decodes the token. The signature is verified against
    ``JWT_SECRET_KEY`` with ``ALGORITHM`` as the only accepted algorithm, so
    forged or tampered tokens are rejected.

    Args:
        request: Object exposing a mapping-like ``headers`` attribute.

    Returns:
        The decoded claims dict, or ``None`` when the header is missing,
        empty, malformed, or the token is rejected by PyJWT.
    """
    authorization = request.headers.get("authorization")
    if not authorization:
        return None

    # Anything other than "<scheme> <token>" is treated as unauthenticated.
    parts = authorization.split()
    if len(parts) != 2:
        return None
    token = parts[1]

    try:
        # Signature verification stays on; pinning `algorithms` prevents the
        # token header from choosing how it gets verified.
        return jwt.decode(token, JWT_SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.PyJWTError:
        return None