54 lines
2.2 KiB
Python
54 lines
2.2 KiB
Python
from datetime import timedelta
|
|
from typing import Annotated
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
|
from sqlmodel import Session, select
|
|
from database import get_session
|
|
from models import User, Profile
|
|
from schemas import UserCreate, Token, UserRead
|
|
from auth import verify_password, get_password_hash, create_access_token, ACCESS_TOKEN_EXPIRE_MINUTES, get_current_user
|
|
|
|
# Router for the authentication endpoints defined in this module; it is
# created with the "/auth" prefix and the "auth" tag.
router = APIRouter(
    prefix="/auth",
    tags=["auth"],
)
|
|
|
|
@router.post("/register", response_model=UserRead)
def register(user_in: UserCreate, session: Session = Depends(get_session)):
    """Register a new user together with a default profile.

    The user row and its profile row are written in a single transaction, so
    a failure while creating the profile cannot leave behind a committed user
    that has no profile.

    Args:
        user_in: Registration payload; ``email``, ``password`` and
            ``username`` are read from it.
        session: Database session supplied by the ``get_session`` dependency.

    Returns:
        The newly created ``User`` (serialized through ``UserRead`` by the
        route's ``response_model``).

    Raises:
        HTTPException: 400 if a user with the same email already exists.
    """
    existing_user = session.exec(
        select(User).where(User.email == user_in.email)
    ).first()
    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )

    db_user = User(
        email=user_in.email,
        hashed_password=get_password_hash(user_in.password),
    )
    try:
        session.add(db_user)
        # flush() emits the INSERT without committing, so db_user.id can be
        # used for the profile while both rows stay in one transaction.
        # NOTE(review): assumes User.id is populated on insert -- confirm
        # against the model definition.
        session.flush()

        # Default profile: the username doubles as the initial display name.
        profile = Profile(
            user_id=db_user.id,
            username=user_in.username,
            display_name=user_in.username,
        )
        session.add(profile)
        session.commit()
    except Exception:
        # Undo the partial work (including the flushed user row) and let the
        # original error propagate unchanged.
        session.rollback()
        raise

    # With SQLAlchemy's default expire_on_commit behaviour the commit above
    # expires db_user's attributes; reload them before returning the object.
    session.refresh(db_user)
    return db_user
|
|
|
|
@router.post("/token", response_model=Token)
def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    session: Session = Depends(get_session),
):
    """Exchange form credentials for a bearer access token.

    Args:
        form_data: Submitted OAuth2 password form; its ``username`` field is
            compared against ``User.email``.
        session: Database session supplied by the ``get_session`` dependency.

    Returns:
        A dict with ``access_token`` and ``token_type`` ("bearer").

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when no
            user matches or the password does not verify.
    """
    lookup = select(User).where(User.email == form_data.username)
    account = session.exec(lookup).first()

    # Short-circuits: the password is only verified when a user was found.
    credentials_ok = bool(account) and verify_password(
        form_data.password, account.hashed_password
    )
    if not credentials_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # The token's "sub" entry carries the user's email.
    token = create_access_token(
        data={"sub": account.email},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {"access_token": token, "token_type": "bearer"}
|
|
|
|
@router.get("/users/me", response_model=UserRead)
def read_users_me(
    current_user: Annotated[User, Depends(get_current_user)],
):
    """Return the user resolved by the ``get_current_user`` dependency.

    The handler does no work of its own: the dependency supplies the user
    and it is returned unchanged (serialized through ``UserRead`` by the
    route's ``response_model``).
    """
    return current_user
|