""" Analytics API - Charts, Trends, Velocity, Gap Analysis. Deep insights into song performance patterns and band statistics. """ from typing import List, Optional from datetime import datetime, timedelta from fastapi import APIRouter, Depends, HTTPException, Query from sqlmodel import Session, select, func, desc from pydantic import BaseModel from database import get_session from models import Song, Show, Performance, Vertical router = APIRouter(prefix="/analytics", tags=["analytics"]) class SongGapAnalysis(BaseModel): """Gap analysis for a song - days since last played""" song_id: int song_title: str song_slug: str last_played: Optional[str] days_since_played: Optional[int] total_plays: int average_gap_days: Optional[float] class SongTrend(BaseModel): """Play count trend over time periods""" period: str # "2024-Q1", "2024-06", etc. play_count: int class SongVelocity(BaseModel): """Song velocity - frequency and recency metrics""" song_id: int song_title: str song_slug: str plays_last_30_days: int plays_last_90_days: int plays_last_year: int total_plays: int velocity_score: float # Higher = more frequently played recently class BandStats(BaseModel): """Aggregate statistics for a band""" vertical_id: int vertical_name: str vertical_slug: str total_shows: int total_songs: int total_performances: int unique_songs_played: int avg_songs_per_show: float first_show: Optional[str] last_show: Optional[str] class MonthlyActivity(BaseModel): """Monthly show/performance counts""" month: str show_count: int performance_count: int @router.get("/gaps/{vertical_slug}", response_model=List[SongGapAnalysis]) def get_song_gaps( vertical_slug: str, min_plays: int = Query(default=5, description="Minimum plays to include"), limit: int = Query(default=50, le=200), session: Session = Depends(get_session) ): """ Get gap analysis for songs - how long since each song was last played. Useful for identifying songs that are "due" to be played. """ vertical = session.exec( select(Vertical).where(Vertical.slug == vertical_slug) ).first() if not vertical: raise HTTPException(status_code=404, detail="Band not found") # Get all songs for this vertical with play counts songs = session.exec( select(Song).where(Song.vertical_id == vertical.id) ).all() today = datetime.now().date() results = [] for song in songs: # Get performances for this song performances = session.exec( select(Performance) .join(Show) .where(Performance.song_id == song.id) .where(Show.date.isnot(None)) .order_by(Show.date.desc()) ).all() if len(performances) < min_plays: continue # Get show dates for gap calculation show_dates = [] for perf in performances: show = session.get(Show, perf.show_id) if show and show.date: show_dates.append(show.date.date() if hasattr(show.date, 'date') else show.date) if not show_dates: continue show_dates.sort(reverse=True) last_played = show_dates[0] days_since = (today - last_played).days # Calculate average gap between plays avg_gap = None if len(show_dates) > 1: gaps = [(show_dates[i] - show_dates[i+1]).days for i in range(len(show_dates)-1)] avg_gap = sum(gaps) / len(gaps) results.append(SongGapAnalysis( song_id=song.id, song_title=song.title, song_slug=song.slug or "", last_played=last_played.strftime("%Y-%m-%d"), days_since_played=days_since, total_plays=len(performances), average_gap_days=round(avg_gap, 1) if avg_gap else None )) # Sort by days since played (longest gaps first) results.sort(key=lambda x: x.days_since_played or 0, reverse=True) return results[:limit] @router.get("/velocity/{vertical_slug}", response_model=List[SongVelocity]) def get_song_velocity( vertical_slug: str, limit: int = Query(default=50, le=200), session: Session = Depends(get_session) ): """ Get song velocity - which songs are hot right now vs cooling down. Higher velocity score = more frequently played recently. """ vertical = session.exec( select(Vertical).where(Vertical.slug == vertical_slug) ).first() if not vertical: raise HTTPException(status_code=404, detail="Band not found") today = datetime.now() thirty_days_ago = today - timedelta(days=30) ninety_days_ago = today - timedelta(days=90) one_year_ago = today - timedelta(days=365) songs = session.exec( select(Song).where(Song.vertical_id == vertical.id) ).all() results = [] for song in songs: # Get all performances with show dates performances = session.exec( select(Performance, Show) .join(Show) .where(Performance.song_id == song.id) .where(Show.date.isnot(None)) ).all() if not performances: continue plays_30 = 0 plays_90 = 0 plays_year = 0 total = len(performances) for perf, show in performances: if show.date >= thirty_days_ago: plays_30 += 1 if show.date >= ninety_days_ago: plays_90 += 1 if show.date >= one_year_ago: plays_year += 1 # Velocity score: weighted recent plays (30d = 3x, 90d = 2x, year = 1x) velocity = (plays_30 * 3) + (plays_90 * 2) + plays_year results.append(SongVelocity( song_id=song.id, song_title=song.title, song_slug=song.slug or "", plays_last_30_days=plays_30, plays_last_90_days=plays_90, plays_last_year=plays_year, total_plays=total, velocity_score=velocity )) # Sort by velocity (hottest songs first) results.sort(key=lambda x: x.velocity_score, reverse=True) return results[:limit] @router.get("/trends/{vertical_slug}") def get_show_trends( vertical_slug: str, period: str = Query(default="month", description="month or quarter"), session: Session = Depends(get_session) ): """ Get show activity trends over time - monthly or quarterly aggregates. """ vertical = session.exec( select(Vertical).where(Vertical.slug == vertical_slug) ).first() if not vertical: raise HTTPException(status_code=404, detail="Band not found") shows = session.exec( select(Show) .where(Show.vertical_id == vertical.id) .where(Show.date.isnot(None)) .order_by(Show.date) ).all() # Group by period trends = {} for show in shows: if period == "quarter": q = (show.date.month - 1) // 3 + 1 key = f"{show.date.year}-Q{q}" else: key = show.date.strftime("%Y-%m") if key not in trends: trends[key] = {"shows": 0, "performances": 0} trends[key]["shows"] += 1 # Count performances in this show perf_count = len(session.exec( select(Performance).where(Performance.show_id == show.id) ).all()) trends[key]["performances"] += perf_count return { "vertical": vertical.name, "period_type": period, "trends": [ {"period": k, "shows": v["shows"], "performances": v["performances"]} for k, v in sorted(trends.items()) ] } @router.get("/stats/{vertical_slug}", response_model=BandStats) def get_band_stats( vertical_slug: str, session: Session = Depends(get_session) ): """Get aggregate statistics for a band.""" vertical = session.exec( select(Vertical).where(Vertical.slug == vertical_slug) ).first() if not vertical: raise HTTPException(status_code=404, detail="Band not found") # Total shows shows = session.exec( select(Show) .where(Show.vertical_id == vertical.id) .order_by(Show.date) ).all() # Total unique songs songs = session.exec( select(Song).where(Song.vertical_id == vertical.id) ).all() # Total performances show_ids = [s.id for s in shows] total_perfs = 0 unique_songs_played = set() if show_ids: all_perfs = session.exec( select(Performance).where(Performance.show_id.in_(show_ids)) ).all() total_perfs = len(all_perfs) unique_songs_played = set(p.song_id for p in all_perfs if p.song_id) # Date range dated_shows = [s for s in shows if s.date] first_show = min(s.date for s in dated_shows).strftime("%Y-%m-%d") if dated_shows else None last_show = max(s.date for s in dated_shows).strftime("%Y-%m-%d") if dated_shows else None avg_songs = total_perfs / len(shows) if shows else 0 return BandStats( vertical_id=vertical.id, vertical_name=vertical.name, vertical_slug=vertical.slug, total_shows=len(shows), total_songs=len(songs), total_performances=total_perfs, unique_songs_played=len(unique_songs_played), avg_songs_per_show=round(avg_songs, 1), first_show=first_show, last_show=last_show ) @router.get("/bustouts/{vertical_slug}") def get_bustouts( vertical_slug: str, days: int = Query(default=365, description="Look back period in days"), gap_threshold: int = Query(default=180, description="Minimum gap days to count as bustout"), session: Session = Depends(get_session) ): """ Find bustouts - songs that returned after a long gap. """ vertical = session.exec( select(Vertical).where(Vertical.slug == vertical_slug) ).first() if not vertical: raise HTTPException(status_code=404, detail="Band not found") cutoff_date = datetime.now() - timedelta(days=days) songs = session.exec( select(Song).where(Song.vertical_id == vertical.id) ).all() bustouts = [] for song in songs: # Get performances ordered by date perfs_with_shows = session.exec( select(Performance, Show) .join(Show) .where(Performance.song_id == song.id) .where(Show.date.isnot(None)) .order_by(Show.date) ).all() if len(perfs_with_shows) < 2: continue # Look for gaps > threshold followed by a play in the period for i in range(1, len(perfs_with_shows)): prev_show = perfs_with_shows[i-1][1] curr_show = perfs_with_shows[i][1] gap = (curr_show.date - prev_show.date).days if gap >= gap_threshold and curr_show.date >= cutoff_date: bustouts.append({ "song_title": song.title, "song_slug": song.slug, "bustout_date": curr_show.date.strftime("%Y-%m-%d"), "show_slug": curr_show.slug, "gap_days": gap, "previous_play": prev_show.date.strftime("%Y-%m-%d") }) # Sort by gap (biggest bustouts first) bustouts.sort(key=lambda x: x["gap_days"], reverse=True) return {"vertical": vertical.name, "threshold_days": gap_threshold, "bustouts": bustouts} @router.get("/debut-songs/{vertical_slug}") def get_debut_songs( vertical_slug: str, days: int = Query(default=365, description="Look back period"), session: Session = Depends(get_session) ): """Find songs that debuted (first ever play) within the period.""" vertical = session.exec( select(Vertical).where(Vertical.slug == vertical_slug) ).first() if not vertical: raise HTTPException(status_code=404, detail="Band not found") cutoff_date = datetime.now() - timedelta(days=days) songs = session.exec( select(Song).where(Song.vertical_id == vertical.id) ).all() debuts = [] for song in songs: # Find first performance first_perf = session.exec( select(Performance, Show) .join(Show) .where(Performance.song_id == song.id) .where(Show.date.isnot(None)) .order_by(Show.date) ).first() if first_perf: perf, show = first_perf if show.date >= cutoff_date: # Count total plays total = len(session.exec( select(Performance).where(Performance.song_id == song.id) ).all()) debuts.append({ "song_title": song.title, "song_slug": song.slug, "debut_date": show.date.strftime("%Y-%m-%d"), "show_slug": show.slug, "times_played_since": total }) # Sort by debut date (newest first) debuts.sort(key=lambda x: x["debut_date"], reverse=True) return {"vertical": vertical.name, "period_days": days, "debuts": debuts}