"""Authentication routes: user registration, token login, and current-user lookup."""

from datetime import timedelta
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlmodel import Session, select

from auth import (
    ACCESS_TOKEN_EXPIRE_MINUTES,
    create_access_token,
    get_current_user,
    get_password_hash,
    verify_password,
)
from database import get_session
from models import Profile, User
from schemas import Token, UserCreate, UserRead

router = APIRouter(prefix="/auth", tags=["auth"])


@router.post("/register", response_model=UserRead)
def register(user_in: UserCreate, session: Session = Depends(get_session)) -> User:
    """Create a new user together with a default profile.

    The user row and its profile are written in a single transaction so a
    failure while creating the profile cannot leave behind a committed user
    that has no profile (which would also block any retry with
    "Email already registered").

    Args:
        user_in: Registration payload; ``email``, ``password`` and
            ``username`` are read from it.
        session: Database session supplied by ``get_session``.

    Returns:
        The newly created ``User``, serialized through ``UserRead``.

    Raises:
        HTTPException: 400 if a user with the same email already exists.
    """
    existing = session.exec(
        select(User).where(User.email == user_in.email)
    ).first()
    if existing:
        raise HTTPException(status_code=400, detail="Email already registered")

    db_user = User(
        email=user_in.email,
        hashed_password=get_password_hash(user_in.password),
    )
    session.add(db_user)

    try:
        # Flush instead of committing: the INSERT is sent so db_user.id is
        # available for the profile (presumably a database-generated key),
        # but the transaction stays open until both rows are written.
        session.flush()

        # Default profile: the display name starts out equal to the username.
        profile = Profile(
            user_id=db_user.id,
            username=user_in.username,
            display_name=user_in.username,
        )
        session.add(profile)
        session.commit()
    except Exception:
        # Undo the partial work (including the flushed user row) and leave
        # the session usable before propagating the original error.
        session.rollback()
        raise

    # Reload the instance after the commit so the returned object carries
    # current column values rather than expired attributes.
    session.refresh(db_user)
    return db_user


@router.post("/token", response_model=Token)
def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    session: Session = Depends(get_session),
) -> dict[str, str]:
    """Exchange login form credentials for a bearer access token.

    Args:
        form_data: OAuth2 password form; its ``username`` field is matched
            against ``User.email``.
        session: Database session supplied by ``get_session``.

    Returns:
        A mapping with ``access_token`` and ``token_type`` ("bearer").

    Raises:
        HTTPException: 401 if no user matches the email or the password
            fails verification.
    """
    user = session.exec(
        select(User).where(User.email == form_data.username)
    ).first()
    # The same error is raised for an unknown email and for a wrong password.
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # The token payload identifies the user by email under the "sub" key.
    access_token = create_access_token(
        data={"sub": user.email},
        expires_delta=access_token_expires,
    )
    return {"access_token": access_token, "token_type": "bearer"}


@router.get("/users/me", response_model=UserRead)
def read_users_me(
    current_user: Annotated[User, Depends(get_current_user)],
) -> User:
    """Return the user resolved by the ``get_current_user`` dependency."""
    return current_user