from typing import List from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, Query from sqlmodel import Session, select, func from database import get_session from models import Song, User, Tag, EntityTag, Show, Performance, Rating from schemas import SongCreate, SongRead, SongReadWithStats, SongUpdate, TagRead, PerformanceReadWithShow, PaginatedResponse, PaginationMeta from auth import get_current_user from services.stats import get_song_stats router = APIRouter(prefix="/songs", tags=["songs"]) @router.post("/", response_model=SongRead) def create_song(song: SongCreate, session: Session = Depends(get_session), current_user = Depends(get_current_user)): db_song = Song.model_validate(song) session.add(db_song) session.commit() session.refresh(db_song) return db_song @router.get("/", response_model=PaginatedResponse[SongRead]) def read_songs( offset: int = 0, limit: int = Query(default=100, le=1000), vertical: str = Query(default=None, description="Filter by vertical slug"), sort: str = Query(default=None, regex="^(times_played)$"), session: Session = Depends(get_session) ): query = select(Song) if vertical: from models import Vertical vertical_entity = session.exec(select(Vertical).where(Vertical.slug == vertical)).first() if vertical_entity: query = query.where(Song.vertical_id == vertical_entity.id) else: return PaginatedResponse(data=[], meta=PaginationMeta(total=0, limit=limit, offset=offset)) if sort == "times_played": query = query.outerjoin(Performance).group_by(Song.id) # Calculate total count before pagination total = session.exec(select(func.count()).select_from(query.subquery())).one() if sort == "times_played": query = query.order_by(func.count(Performance.id).desc()) songs = session.exec(query.offset(offset).limit(limit)).all() return PaginatedResponse( data=songs, meta=PaginationMeta(total=total, limit=limit, offset=offset) ) @router.get("/{slug}", response_model=SongReadWithStats) def read_song(slug: str, session: Session = Depends(get_session)): song = session.exec(select(Song).where(Song.slug == slug)).first() if not song: raise HTTPException(status_code=404, detail="Song not found") song_id = song.id # Use actual ID for lookups stats = get_song_stats(session, song_id) tags = session.exec( select(Tag) .join(EntityTag, Tag.id == EntityTag.tag_id) .where(EntityTag.entity_type == "song") .where(EntityTag.entity_id == song_id) ).all() # Fetch performances with video links from sqlalchemy.orm import selectinload, joinedload from models import VideoPerformance from models import Video, VideoPlatform perfs = session.exec( select(Performance) .join(Show) .options( selectinload(Performance.video_links).joinedload(VideoPerformance.video) ) .where(Performance.song_id == song_id) .order_by(Show.date.desc()) ).all() # Calculate ratings perf_ids = [p.id for p in perfs] rating_stats = {} if perf_ids: results = session.exec( select(Rating.performance_id, func.avg(Rating.score), func.count(Rating.id)) .where(Rating.performance_id.in_(perf_ids)) .group_by(Rating.performance_id) ).all() for r in results: rating_stats[r[0]] = {"avg": float(r[1]) if r[1] else 0.0, "count": r[2]} perf_dtos = [] for p in perfs: # Lazy load show/venue (could be optimized) venue_name = "Unknown" venue_city = "" venue_state = "" show_date = datetime.now() show_slug = None if p.show: show_date = p.show.date show_slug = p.show.slug if p.show.venue: venue_name = p.show.venue.name venue_city = p.show.venue.city venue_state = p.show.venue.state r_stats = rating_stats.get(p.id, {"avg": 0.0, "count": 0}) # Backfill youtube_link youtube_link = p.youtube_link if not youtube_link and p.video_links: for link in p.video_links: if link.video and link.video.platform == VideoPlatform.YOUTUBE: youtube_link = link.video.url break perf_dtos.append(PerformanceReadWithShow( **p.model_dump(), youtube_link=youtube_link, show_date=show_date, show_slug=show_slug, venue_name=venue_name, venue_city=venue_city, venue_state=venue_state, avg_rating=r_stats["avg"], total_reviews=r_stats["count"] )) # Merge song data with stats song_with_stats = SongReadWithStats( **song.model_dump(), **stats ) song_with_stats.tags = tags song_with_stats.performances = perf_dtos return song_with_stats @router.patch("/{song_id}", response_model=SongRead) def update_song(song_id: int, song: SongUpdate, session: Session = Depends(get_session), current_user = Depends(get_current_user)): db_song = session.get(Song, song_id) if not db_song: raise HTTPException(status_code=404, detail="Song not found") song_data = song.model_dump(exclude_unset=True) db_song.sqlmodel_update(song_data) session.add(db_song) session.commit() session.refresh(db_song) return db_song @router.get("/{slug}/versions") def get_song_versions(slug: str, session: Session = Depends(get_session)): """Get all versions of a song across different bands (via SongCanon)""" from models import SongCanon, Vertical # Find the song by slug song = session.exec(select(Song).where(Song.slug == slug)).first() if not song: raise HTTPException(status_code=404, detail="Song not found") # If no canon link, return empty if not song.canon_id: return { "song": { "id": song.id, "title": song.title, "slug": song.slug, "vertical_id": song.vertical_id, }, "canon": None, "other_versions": [] } # Get the canon entry canon = session.get(SongCanon, song.canon_id) # Get all other versions (same canon, different song) other_songs = session.exec( select(Song) .where(Song.canon_id == song.canon_id) .where(Song.id != song.id) ).all() other_versions = [] for s in other_songs: vertical = session.get(Vertical, s.vertical_id) # Get play count for this version play_count = session.exec( select(func.count(Performance.id)) .where(Performance.song_id == s.id) ).one() other_versions.append({ "id": s.id, "title": s.title, "slug": s.slug, "vertical_id": s.vertical_id, "vertical_name": vertical.name if vertical else "Unknown", "vertical_slug": vertical.slug if vertical else "unknown", "play_count": play_count, }) return { "song": { "id": song.id, "title": song.title, "slug": song.slug, "vertical_id": song.vertical_id, }, "canon": { "id": canon.id, "title": canon.title, "slug": canon.slug, "original_artist": canon.original_artist, } if canon else None, "other_versions": other_versions }