structure auth

This commit is contained in:
Robin COuret
2026-03-05 22:38:21 +01:00
parent 01f9e9f05e
commit a243149bf1
27 changed files with 68 additions and 52 deletions

Binary file not shown.

View File

@@ -0,0 +1,48 @@
from src.app.config import settings
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from argon2 import PasswordHasher
from argon2 import PasswordHasher
import jwt
from jwt.exceptions import InvalidTokenError
from src.app.models.user import User
from src.app.data.user import get_user
from .schemas import TokenData
from .security import verify_password
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/token")
password_hasher = PasswordHasher()
def authenticate_user(username: str, password: str):
user: User = get_user(username)
if not user:
# Add timing to prevent attack
password_hasher.hash(password)
return False
if not verify_password(password, user.hashed_password):
return False
return user
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
username = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except InvalidTokenError:
raise credentials_exception
user = get_user(username=token_data.username)
if user is None:
raise credentials_exception
return user

View File

@@ -0,0 +1,8 @@
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None

View File

@@ -0,0 +1,33 @@
from src.app.config import settings
from datetime import timedelta, datetime, timezone
from argon2 import PasswordHasher
from argon2.exceptions import (
VerifyMismatchError,
VerificationError,
InvalidHashError,
)
import jwt
from jwt.exceptions import InvalidTokenError
password_hasher = PasswordHasher()
def verify_password(plain_password: str, hashed_password: str) -> bool:
try:
return password_hasher.verify(hashed_password, plain_password)
except (VerifyMismatchError, VerificationError, InvalidHashError):
return False
def hash_password(password: str) -> str:
return password_hasher.hash(password)
def create_access_token(data: dict):
expire = datetime.now(timezone.utc) + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode = data.copy()
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
#def create_refresh_token(data: dict) -> str:
#def verify_token(token: str, token_type: str = "access") -> Optional[dict]: