246 lines
8.3 KiB
Python
246 lines
8.3 KiB
Python
from typing import List
|
|
from datetime import datetime
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlmodel import Session, select, func
|
|
from database import get_session
|
|
from models import Song, User, Tag, EntityTag, Show, Performance, Rating
|
|
from schemas import SongCreate, SongRead, SongReadWithStats, SongUpdate, TagRead, PerformanceReadWithShow, PaginatedResponse, PaginationMeta
|
|
from auth import get_current_user
|
|
from services.stats import get_song_stats
|
|
|
|
router = APIRouter(prefix="/songs", tags=["songs"])
|
|
|
|
@router.post("/", response_model=SongRead)
|
|
def create_song(song: SongCreate, session: Session = Depends(get_session), current_user = Depends(get_current_user)):
|
|
db_song = Song.model_validate(song)
|
|
session.add(db_song)
|
|
session.commit()
|
|
session.refresh(db_song)
|
|
return db_song
|
|
|
|
@router.get("/", response_model=PaginatedResponse[SongRead])
|
|
def read_songs(
|
|
offset: int = 0,
|
|
limit: int = Query(default=100, le=1000),
|
|
vertical: str = Query(default=None, description="Filter by vertical slug"),
|
|
sort: str = Query(default=None, regex="^(times_played)$"),
|
|
session: Session = Depends(get_session)
|
|
):
|
|
query = select(Song)
|
|
|
|
if vertical:
|
|
from models import Vertical
|
|
vertical_entity = session.exec(select(Vertical).where(Vertical.slug == vertical)).first()
|
|
if vertical_entity:
|
|
query = query.where(Song.vertical_id == vertical_entity.id)
|
|
else:
|
|
return PaginatedResponse(data=[], meta=PaginationMeta(total=0, limit=limit, offset=offset))
|
|
|
|
if sort == "times_played":
|
|
query = query.outerjoin(Performance).group_by(Song.id)
|
|
|
|
# Calculate total count before pagination
|
|
total = session.exec(select(func.count()).select_from(query.subquery())).one()
|
|
|
|
if sort == "times_played":
|
|
query = query.order_by(func.count(Performance.id).desc())
|
|
|
|
songs = session.exec(query.offset(offset).limit(limit)).all()
|
|
|
|
return PaginatedResponse(
|
|
data=songs,
|
|
meta=PaginationMeta(total=total, limit=limit, offset=offset)
|
|
)
|
|
|
|
@router.get("/{slug}", response_model=SongReadWithStats)
|
|
def read_song(slug: str, session: Session = Depends(get_session)):
|
|
from sqlalchemy.orm import joinedload
|
|
song = session.exec(
|
|
select(Song)
|
|
.where(Song.slug == slug)
|
|
.options(
|
|
joinedload(Song.artist),
|
|
joinedload(Song.vertical)
|
|
)
|
|
).first()
|
|
|
|
if not song:
|
|
raise HTTPException(status_code=404, detail="Song not found")
|
|
|
|
song_id = song.id # Use actual ID for lookups
|
|
stats = get_song_stats(session, song_id)
|
|
|
|
tags = session.exec(
|
|
select(Tag)
|
|
.join(EntityTag, Tag.id == EntityTag.tag_id)
|
|
.where(EntityTag.entity_type == "song")
|
|
.where(EntityTag.entity_id == song_id)
|
|
).all()
|
|
|
|
# Fetch performances with video links
|
|
from sqlalchemy.orm import selectinload, joinedload
|
|
from models import VideoPerformance
|
|
from models import Video, VideoPlatform
|
|
|
|
perfs = session.exec(
|
|
select(Performance)
|
|
.join(Show)
|
|
.options(
|
|
selectinload(Performance.video_links).joinedload(VideoPerformance.video)
|
|
)
|
|
.where(Performance.song_id == song_id)
|
|
.order_by(Show.date.desc())
|
|
).all()
|
|
|
|
# Calculate ratings
|
|
perf_ids = [p.id for p in perfs]
|
|
rating_stats = {}
|
|
if perf_ids:
|
|
results = session.exec(
|
|
select(Rating.performance_id, func.avg(Rating.score), func.count(Rating.id))
|
|
.where(Rating.performance_id.in_(perf_ids))
|
|
.group_by(Rating.performance_id)
|
|
).all()
|
|
for r in results:
|
|
rating_stats[r[0]] = {"avg": float(r[1]) if r[1] else 0.0, "count": r[2]}
|
|
|
|
perf_dtos = []
|
|
for p in perfs:
|
|
# Lazy load show/venue (could be optimized)
|
|
venue_name = "Unknown"
|
|
venue_city = ""
|
|
venue_state = ""
|
|
artist_name = None
|
|
artist_slug = None
|
|
show_date = datetime.now()
|
|
show_slug = None
|
|
|
|
if p.show:
|
|
show_date = p.show.date
|
|
show_slug = p.show.slug
|
|
if p.show.venue:
|
|
venue_name = p.show.venue.name
|
|
venue_city = p.show.venue.city
|
|
venue_state = p.show.venue.state
|
|
if p.show.vertical:
|
|
artist_name = p.show.vertical.name
|
|
artist_slug = p.show.vertical.slug
|
|
|
|
r_stats = rating_stats.get(p.id, {"avg": 0.0, "count": 0})
|
|
|
|
# Backfill youtube_link
|
|
youtube_link = p.youtube_link
|
|
if not youtube_link and p.video_links:
|
|
for link in p.video_links:
|
|
if link.video and link.video.platform == VideoPlatform.YOUTUBE:
|
|
youtube_link = link.video.url
|
|
break
|
|
|
|
perf_dtos.append(PerformanceReadWithShow(
|
|
**p.model_dump(exclude={"youtube_link"}),
|
|
youtube_link=youtube_link,
|
|
show_date=show_date,
|
|
show_slug=show_slug,
|
|
venue_name=venue_name,
|
|
venue_city=venue_city,
|
|
venue_state=venue_state,
|
|
artist_name=artist_name,
|
|
artist_slug=artist_slug,
|
|
avg_rating=r_stats["avg"],
|
|
total_reviews=r_stats["count"]
|
|
))
|
|
|
|
# Calculate artist distribution
|
|
from collections import Counter
|
|
artist_dist = Counter(p.artist_name for p in perf_dtos if p.artist_name)
|
|
|
|
# Merge song data with stats
|
|
song_with_stats = SongReadWithStats(
|
|
**song.model_dump(),
|
|
**stats
|
|
)
|
|
song_with_stats.artist_distribution = artist_dist
|
|
song_with_stats.tags = tags
|
|
song_with_stats.performances = perf_dtos
|
|
return song_with_stats
|
|
|
|
@router.patch("/{song_id}", response_model=SongRead)
|
|
def update_song(song_id: int, song: SongUpdate, session: Session = Depends(get_session), current_user = Depends(get_current_user)):
|
|
db_song = session.get(Song, song_id)
|
|
if not db_song:
|
|
raise HTTPException(status_code=404, detail="Song not found")
|
|
song_data = song.model_dump(exclude_unset=True)
|
|
db_song.sqlmodel_update(song_data)
|
|
session.add(db_song)
|
|
session.commit()
|
|
session.refresh(db_song)
|
|
return db_song
|
|
|
|
|
|
@router.get("/{slug}/versions")
|
|
def get_song_versions(slug: str, session: Session = Depends(get_session)):
|
|
"""Get all versions of a song across different bands (via SongCanon)"""
|
|
from models import SongCanon, Vertical
|
|
|
|
# Find the song by slug
|
|
song = session.exec(select(Song).where(Song.slug == slug)).first()
|
|
if not song:
|
|
raise HTTPException(status_code=404, detail="Song not found")
|
|
|
|
# If no canon link, return empty
|
|
if not song.canon_id:
|
|
return {
|
|
"song": {
|
|
"id": song.id,
|
|
"title": song.title,
|
|
"slug": song.slug,
|
|
"vertical_id": song.vertical_id,
|
|
},
|
|
"canon": None,
|
|
"other_versions": []
|
|
}
|
|
|
|
# Get the canon entry
|
|
canon = session.get(SongCanon, song.canon_id)
|
|
|
|
# Get all other versions (same canon, different song)
|
|
other_songs = session.exec(
|
|
select(Song)
|
|
.where(Song.canon_id == song.canon_id)
|
|
.where(Song.id != song.id)
|
|
).all()
|
|
|
|
other_versions = []
|
|
for s in other_songs:
|
|
vertical = session.get(Vertical, s.vertical_id)
|
|
# Get play count for this version
|
|
play_count = session.exec(
|
|
select(func.count(Performance.id))
|
|
.where(Performance.song_id == s.id)
|
|
).one()
|
|
|
|
other_versions.append({
|
|
"id": s.id,
|
|
"title": s.title,
|
|
"slug": s.slug,
|
|
"vertical_id": s.vertical_id,
|
|
"vertical_name": vertical.name if vertical else "Unknown",
|
|
"vertical_slug": vertical.slug if vertical else "unknown",
|
|
"play_count": play_count,
|
|
})
|
|
|
|
return {
|
|
"song": {
|
|
"id": song.id,
|
|
"title": song.title,
|
|
"slug": song.slug,
|
|
"vertical_id": song.vertical_id,
|
|
},
|
|
"canon": {
|
|
"id": canon.id,
|
|
"title": canon.title,
|
|
"slug": canon.slug,
|
|
"original_artist": canon.original_artist,
|
|
} if canon else None,
|
|
"other_versions": other_versions
|
|
}
|