""" User Playlist API - curated collections of performances. """ from typing import List, Optional from fastapi import APIRouter, Depends, HTTPException, Query from sqlmodel import Session, select from pydantic import BaseModel from datetime import datetime from database import get_session from models import UserPlaylist, PlaylistPerformance, Performance, Show, Song, User from auth import get_current_user from slugify import generate_slug router = APIRouter(prefix="/playlists", tags=["playlists"]) class PlaylistCreate(BaseModel): name: str description: Optional[str] = None is_public: bool = True class PlaylistUpdate(BaseModel): name: Optional[str] = None description: Optional[str] = None is_public: Optional[bool] = None class PerformanceInPlaylist(BaseModel): performance_id: int position: int notes: Optional[str] = None song_title: str show_date: str show_slug: Optional[str] class PlaylistRead(BaseModel): id: int name: str slug: str description: Optional[str] is_public: bool user_id: int username: Optional[str] created_at: str performance_count: int class PlaylistDetailRead(BaseModel): id: int name: str slug: str description: Optional[str] is_public: bool user_id: int username: Optional[str] created_at: str performances: List[PerformanceInPlaylist] @router.post("/", response_model=PlaylistRead) def create_playlist( playlist: PlaylistCreate, session: Session = Depends(get_session), current_user: User = Depends(get_current_user) ): """Create a new playlist""" slug = generate_slug(playlist.name) # Make slug unique per user existing = session.exec( select(UserPlaylist) .where(UserPlaylist.user_id == current_user.id) .where(UserPlaylist.slug == slug) ).first() if existing: slug = f"{slug}-{int(datetime.utcnow().timestamp())}" db_playlist = UserPlaylist( user_id=current_user.id, name=playlist.name, slug=slug, description=playlist.description, is_public=playlist.is_public ) session.add(db_playlist) session.commit() session.refresh(db_playlist) return PlaylistRead( id=db_playlist.id, name=db_playlist.name, slug=db_playlist.slug, description=db_playlist.description, is_public=db_playlist.is_public, user_id=db_playlist.user_id, username=current_user.username, created_at=db_playlist.created_at.isoformat(), performance_count=0 ) @router.get("/", response_model=List[PlaylistRead]) def list_playlists( user_id: Optional[int] = None, limit: int = Query(default=20, le=100), offset: int = 0, session: Session = Depends(get_session) ): """List public playlists, optionally filtered by user""" query = select(UserPlaylist).where(UserPlaylist.is_public == True) if user_id: query = query.where(UserPlaylist.user_id == user_id) query = query.order_by(UserPlaylist.created_at.desc()).offset(offset).limit(limit) playlists = session.exec(query).all() result = [] for p in playlists: user = session.get(User, p.user_id) perf_count = len(session.exec( select(PlaylistPerformance).where(PlaylistPerformance.playlist_id == p.id) ).all()) result.append(PlaylistRead( id=p.id, name=p.name, slug=p.slug, description=p.description, is_public=p.is_public, user_id=p.user_id, username=user.username if user else None, created_at=p.created_at.isoformat(), performance_count=perf_count )) return result @router.get("/mine", response_model=List[PlaylistRead]) def list_my_playlists( session: Session = Depends(get_session), current_user: User = Depends(get_current_user) ): """List current user's playlists (public and private)""" playlists = session.exec( select(UserPlaylist) .where(UserPlaylist.user_id == current_user.id) .order_by(UserPlaylist.created_at.desc()) ).all() result = [] for p in playlists: perf_count = len(session.exec( select(PlaylistPerformance).where(PlaylistPerformance.playlist_id == p.id) ).all()) result.append(PlaylistRead( id=p.id, name=p.name, slug=p.slug, description=p.description, is_public=p.is_public, user_id=p.user_id, username=current_user.username, created_at=p.created_at.isoformat(), performance_count=perf_count )) return result @router.get("/{playlist_id}", response_model=PlaylistDetailRead) def get_playlist( playlist_id: int, session: Session = Depends(get_session) ): """Get playlist details with performances""" playlist = session.get(UserPlaylist, playlist_id) if not playlist: raise HTTPException(status_code=404, detail="Playlist not found") if not playlist.is_public: raise HTTPException(status_code=403, detail="This playlist is private") user = session.get(User, playlist.user_id) # Get performances playlist_perfs = session.exec( select(PlaylistPerformance) .where(PlaylistPerformance.playlist_id == playlist_id) .order_by(PlaylistPerformance.position) ).all() performances = [] for pp in playlist_perfs: perf = session.get(Performance, pp.performance_id) if perf: song = session.get(Song, perf.song_id) if perf.song_id else None show = session.get(Show, perf.show_id) if perf.show_id else None performances.append(PerformanceInPlaylist( performance_id=pp.performance_id, position=pp.position, notes=pp.notes, song_title=song.title if song else "Unknown", show_date=show.date.strftime("%Y-%m-%d") if show and show.date else "Unknown", show_slug=show.slug if show else None )) return PlaylistDetailRead( id=playlist.id, name=playlist.name, slug=playlist.slug, description=playlist.description, is_public=playlist.is_public, user_id=playlist.user_id, username=user.username if user else None, created_at=playlist.created_at.isoformat(), performances=performances ) @router.post("/{playlist_id}/performances/{performance_id}") def add_to_playlist( playlist_id: int, performance_id: int, notes: Optional[str] = None, session: Session = Depends(get_session), current_user: User = Depends(get_current_user) ): """Add a performance to a playlist""" playlist = session.get(UserPlaylist, playlist_id) if not playlist: raise HTTPException(status_code=404, detail="Playlist not found") if playlist.user_id != current_user.id: raise HTTPException(status_code=403, detail="Not your playlist") # Check if already in playlist existing = session.exec( select(PlaylistPerformance) .where(PlaylistPerformance.playlist_id == playlist_id) .where(PlaylistPerformance.performance_id == performance_id) ).first() if existing: raise HTTPException(status_code=400, detail="Already in playlist") # Get next position all_perfs = session.exec( select(PlaylistPerformance).where(PlaylistPerformance.playlist_id == playlist_id) ).all() next_position = len(all_perfs) + 1 pp = PlaylistPerformance( playlist_id=playlist_id, performance_id=performance_id, position=next_position, notes=notes ) session.add(pp) session.commit() return {"message": "Added to playlist", "position": next_position} @router.delete("/{playlist_id}/performances/{performance_id}") def remove_from_playlist( playlist_id: int, performance_id: int, session: Session = Depends(get_session), current_user: User = Depends(get_current_user) ): """Remove a performance from a playlist""" playlist = session.get(UserPlaylist, playlist_id) if not playlist: raise HTTPException(status_code=404, detail="Playlist not found") if playlist.user_id != current_user.id: raise HTTPException(status_code=403, detail="Not your playlist") pp = session.exec( select(PlaylistPerformance) .where(PlaylistPerformance.playlist_id == playlist_id) .where(PlaylistPerformance.performance_id == performance_id) ).first() if not pp: raise HTTPException(status_code=404, detail="Not in playlist") session.delete(pp) session.commit() return {"message": "Removed from playlist"} @router.delete("/{playlist_id}") def delete_playlist( playlist_id: int, session: Session = Depends(get_session), current_user: User = Depends(get_current_user) ): """Delete a playlist""" playlist = session.get(UserPlaylist, playlist_id) if not playlist: raise HTTPException(status_code=404, detail="Playlist not found") if playlist.user_id != current_user.id: raise HTTPException(status_code=403, detail="Not your playlist") # Delete all playlist performances first perfs = session.exec( select(PlaylistPerformance).where(PlaylistPerformance.playlist_id == playlist_id) ).all() for p in perfs: session.delete(p) session.delete(playlist) session.commit() return {"message": "Playlist deleted"}