"""Authentication API endpoints.

Job ID: MTAD-IMPL-2025-11-18-CL
"""

import hashlib
import uuid
from datetime import datetime, timedelta
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from app.config import settings
from app.database import get_db
from app.models import User, Profile, RefreshToken, AuthAuditLog
from app.schemas.auth import (
    UserRegisterRequest,
    UserRegisterResponse,
    UserLoginRequest,
    TokenResponse,
    RefreshTokenRequest,
    UserResponse,
)

router = APIRouter()
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")

# auto_error=False so a missing/malformed Authorization header is reported by
# get_current_user_id as a 401 instead of HTTPBearer's own error response.
_bearer_scheme = HTTPBearer(auto_error=False)

# Refresh tokens issued before the switch to SHA-256 digests were stored as
# Argon2 hashes; those rows are recognised by this prefix.
_LEGACY_TOKEN_HASH_PREFIX = "$argon2"


def hash_password(password: str) -> str:
    """Hash a password with the module's passlib context (Argon2).

    Args:
        password: Plain-text password.

    Returns:
        The encoded hash string produced by ``pwd_context.hash``.
    """
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plain-text password against a stored hash.

    Args:
        plain_password: Candidate password supplied by the caller.
        hashed_password: Hash previously produced by ``hash_password``.

    Returns:
        True when the password matches the hash, False otherwise.
    """
    return pwd_context.verify(plain_password, hashed_password)


def create_access_token(user_id: str) -> str:
    """Create a signed JWT access token for a user.

    The token carries ``sub`` (the user id), ``iat``, ``exp`` and
    ``type="access"``, and is signed with ``settings.secret_key`` using
    ``settings.algorithm``.

    Args:
        user_id: Identifier stored in the ``sub`` claim.

    Returns:
        The encoded JWT string.
    """
    # Read the clock once so ``iat`` and ``exp`` are derived from the same
    # instant.
    issued_at = datetime.utcnow()
    claims = {
        "sub": user_id,
        "exp": issued_at + timedelta(minutes=settings.access_token_expire_minutes),
        "iat": issued_at,
        "type": "access",
    }
    return jwt.encode(claims, settings.secret_key, algorithm=settings.algorithm)


def _hash_refresh_token(token: str) -> str:
    """Return the hex SHA-256 digest under which a refresh token is stored."""
    return hashlib.sha256(token.encode("utf-8")).hexdigest()


def create_refresh_token(user_id: str, db: Session) -> tuple[str, str]:
    """Create and persist a refresh token for a user.

    The token value is a random UUID4 string. Only its SHA-256 digest is
    stored: the value is already high-entropy, so a slow password hash adds
    nothing, and a deterministic digest lets ``/refresh`` find the row with a
    single equality lookup.

    This function commits the session.

    Args:
        user_id: Owner of the new token.
        db: Active database session.

    Returns:
        A ``(token, token_hash)`` tuple: the value to hand to the client and
        the digest that was stored.
    """
    token_value = str(uuid.uuid4())
    token_hash = _hash_refresh_token(token_value)
    expires_at = datetime.utcnow() + timedelta(days=settings.refresh_token_expire_days)

    db.add(
        RefreshToken(
            id=str(uuid.uuid4()),
            user_id=user_id,
            token_hash=token_hash,
            expires_at=expires_at,
        )
    )
    db.commit()
    return token_value, token_hash


def _find_active_refresh_token(
    token: str, db: Session, now: datetime
) -> Optional[RefreshToken]:
    """Return the unexpired, unrevoked row matching ``token``, or None."""
    active_tokens = db.query(RefreshToken).filter(
        RefreshToken.expires_at > now,
        RefreshToken.revoked_at.is_(None),
    )

    # Fast path: tokens stored as a SHA-256 digest are found by equality.
    matched = active_tokens.filter(
        RefreshToken.token_hash == _hash_refresh_token(token)
    ).first()
    if matched is not None:
        return matched

    # Legacy path: rows still holding an Argon2 hash can only be matched by
    # verifying each one. The scan is restricted to those rows, so it shrinks
    # to nothing as they expire or are rotated.
    legacy_tokens = active_tokens.filter(
        RefreshToken.token_hash.startswith(_LEGACY_TOKEN_HASH_PREFIX)
    ).all()
    for candidate in legacy_tokens:
        if verify_password(token, candidate.token_hash):
            return candidate
    return None


def _record_auth_event(
    db: Session, event_type: str, user_id: Optional[str] = None
) -> None:
    """Add an AuthAuditLog row to the session without committing.

    ``user_id`` is only passed to the model when it is known.
    """
    fields = {"id": str(uuid.uuid4()), "event_type": event_type}
    if user_id is not None:
        fields["user_id"] = user_id
    db.add(AuthAuditLog(**fields))


def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 response carrying a Bearer challenge header."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


def get_current_user_id(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(_bearer_scheme),
) -> str:
    """Resolve the caller's user id from a Bearer access token.

    The token is decoded with ``settings.secret_key`` and
    ``settings.algorithm``; it must carry ``type == "access"`` and a
    non-empty ``sub`` claim, as written by ``create_access_token``.

    Args:
        credentials: Parsed ``Authorization: Bearer`` header, or None when
            the header is absent or not a Bearer credential.

    Returns:
        The ``sub`` claim of the verified token.

    Raises:
        HTTPException: 401 when the header is missing, the token fails
            verification, or the claims are not those of an access token.
    """
    if credentials is None:
        raise _unauthorized("Not authenticated")

    try:
        claims = jwt.decode(
            credentials.credentials,
            settings.secret_key,
            algorithms=[settings.algorithm],
        )
    except JWTError:
        raise _unauthorized("Invalid or expired token") from None

    subject = claims.get("sub")
    if claims.get("type") != "access" or not subject:
        raise _unauthorized("Invalid or expired token")
    return subject


@router.post("/signup", response_model=TokenResponse, status_code=status.HTTP_201_CREATED)
async def signup(request: UserRegisterRequest, db: Session = Depends(get_db)):
    """Register a new user and return an initial token pair.

    Creates the User, its Profile and a ``signup`` audit entry in one commit,
    then issues an access token and a refresh token.

    Args:
        request: Registration payload (``email``, ``password``,
            ``display_name``).
        db: Active database session.

    Returns:
        A TokenResponse with ``access_token`` and ``refresh_token``.

    Raises:
        HTTPException: 409 when the email is already registered.
    """
    email_taken = HTTPException(
        status_code=status.HTTP_409_CONFLICT,
        detail="Email already registered",
    )

    if db.query(User).filter(User.email == request.email).first():
        raise email_taken

    user_id = str(uuid.uuid4())
    db.add(
        User(
            id=user_id,
            email=request.email,
            password_hash=hash_password(request.password),
        )
    )
    db.add(
        Profile(
            id=str(uuid.uuid4()),
            user_id=user_id,
            display_name=request.display_name,
        )
    )
    _record_auth_event(db, "signup", user_id=user_id)

    try:
        db.commit()
    except IntegrityError:
        # A concurrent signup can insert the same email between the check
        # above and this commit. NOTE(review): this relies on a unique
        # constraint on User.email -- confirm in the model.
        db.rollback()
        if db.query(User).filter(User.email == request.email).first():
            raise email_taken from None
        raise

    access_token = create_access_token(user_id)
    refresh_token, _ = create_refresh_token(user_id, db)
    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token,
    )


@router.post("/login", response_model=TokenResponse)
async def login(request: UserLoginRequest, db: Session = Depends(get_db)):
    """Authenticate a user by email and password and return a token pair.

    Args:
        request: Login payload (``email``, ``password``).
        db: Active database session.

    Returns:
        A TokenResponse with ``access_token`` and ``refresh_token``.

    Raises:
        HTTPException: 423 when the account's ``locked_until`` is in the
            future; 401 when the email is unknown or the password is wrong.
    """
    user = db.query(User).filter(User.email == request.email).first()

    # The lock is checked before the password so that a locked account gives
    # the same answer for right and wrong guesses; otherwise the 401/423
    # difference would still let a caller test passwords during the lock.
    if user is not None and user.locked_until and user.locked_until > datetime.utcnow():
        _record_auth_event(db, "login_fail", user_id=user.id)
        db.commit()
        raise HTTPException(
            status_code=status.HTTP_423_LOCKED,
            detail="Account temporarily locked",
        )

    if user is None or not verify_password(request.password, user.password_hash):
        # Attribute the failure to the account when the email is known.
        # NOTE(review): nothing in this module increments
        # failed_login_attempts or sets locked_until -- confirm where the
        # lockout policy is applied.
        _record_auth_event(
            db, "login_fail", user_id=user.id if user is not None else None
        )
        db.commit()
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid email or password",
        )

    user.failed_login_attempts = 0
    _record_auth_event(db, "login_success", user_id=user.id)
    db.commit()

    access_token = create_access_token(user.id)
    refresh_token, _ = create_refresh_token(user.id, db)
    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token,
    )


@router.post("/refresh", response_model=TokenResponse)
async def refresh(request: RefreshTokenRequest, db: Session = Depends(get_db)):
    """Exchange a valid refresh token for a new token pair.

    The presented token is revoked and a new refresh token is issued
    alongside a new access token.

    Args:
        request: Payload carrying ``refresh_token``.
        db: Active database session.

    Returns:
        A TokenResponse with the new ``access_token`` and ``refresh_token``.

    Raises:
        HTTPException: 401 when no unexpired, unrevoked token matches.
    """
    now = datetime.utcnow()
    stored_token = _find_active_refresh_token(request.refresh_token, db, now)
    if stored_token is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token",
        )

    user_id = stored_token.user_id
    stored_token.revoked_at = now
    db.commit()

    access_token = create_access_token(user_id)
    new_refresh_token, _ = create_refresh_token(user_id, db)
    return TokenResponse(
        access_token=access_token,
        refresh_token=new_refresh_token,
    )


@router.post("/logout")
async def logout(
    current_user_id: str = Depends(get_current_user_id),
    db: Session = Depends(get_db),
):
    """Revoke all of the authenticated user's refresh tokens.

    Args:
        current_user_id: User id taken from the verified Bearer access
            token, not from client-supplied request parameters.
        db: Active database session.

    Returns:
        A dict with a confirmation ``message``.
    """
    db.query(RefreshToken).filter(
        RefreshToken.user_id == current_user_id,
        RefreshToken.revoked_at.is_(None),
    ).update({"revoked_at": datetime.utcnow()})

    _record_auth_event(db, "logout", user_id=current_user_id)
    db.commit()
    return {"message": "Logged out successfully"}


@router.get("/me", response_model=UserResponse)
async def get_current_user(
    current_user_id: str = Depends(get_current_user_id),
    db: Session = Depends(get_db),
):
    """Return the authenticated user's record.

    Args:
        current_user_id: User id taken from the verified Bearer access
            token, not from client-supplied request parameters.
        db: Active database session.

    Returns:
        The matching User, serialised through UserResponse.

    Raises:
        HTTPException: 404 when no user has that id.
    """
    user = db.query(User).filter(User.id == current_user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    return user