fediversion/backend/routers/playlists.py
fullsizemalt b38da24055
Some checks failed
Deploy Fediversion / deploy (push) Failing after 1s
feat: Cross-band milestone - Festivals, Playlists, Musicians, Venue Timeline
Sprint 2: Added 54 musicians with 78 band memberships
- Phish, Widespread Panic, Umphreys McGee core members
- Notable sit-in artists (Karl Denson, Branford Marsalis, Derek/Susan Trucks)
- Toy Factory Project supergroup (Oteil, Marcus King, Charlie Starr)

Sprint 4: Festival entity for multi-band events
- Festival and ShowFestival models
- /festivals API with list, detail, by-band endpoints

Sprint 5: User Playlists for curated collections
- UserPlaylist and PlaylistPerformance models
- Full CRUD /playlists API

Sprint 6: Venue Timeline endpoint
- /venues/{slug}/timeline for chronological cross-band history

Blockers (need production data):
- Venue linking script (no venues in local DB)
- Canon song linking (no songs in local DB)
2025-12-28 23:34:05 -08:00

317 lines
9.5 KiB
Python

"""
User Playlist API - curated collections of performances.
"""
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlmodel import Session, select
from pydantic import BaseModel
from datetime import datetime
from database import get_session
from models import UserPlaylist, PlaylistPerformance, Performance, Show, Song, User
from auth import get_current_user
from slugify import generate_slug
router = APIRouter(prefix="/playlists", tags=["playlists"])
class PlaylistCreate(BaseModel):
name: str
description: Optional[str] = None
is_public: bool = True
class PlaylistUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
is_public: Optional[bool] = None
class PerformanceInPlaylist(BaseModel):
performance_id: int
position: int
notes: Optional[str] = None
song_title: str
show_date: str
show_slug: Optional[str]
class PlaylistRead(BaseModel):
id: int
name: str
slug: str
description: Optional[str]
is_public: bool
user_id: int
username: Optional[str]
created_at: str
performance_count: int
class PlaylistDetailRead(BaseModel):
id: int
name: str
slug: str
description: Optional[str]
is_public: bool
user_id: int
username: Optional[str]
created_at: str
performances: List[PerformanceInPlaylist]
@router.post("/", response_model=PlaylistRead)
def create_playlist(
playlist: PlaylistCreate,
session: Session = Depends(get_session),
current_user: User = Depends(get_current_user)
):
"""Create a new playlist"""
slug = generate_slug(playlist.name)
# Make slug unique per user
existing = session.exec(
select(UserPlaylist)
.where(UserPlaylist.user_id == current_user.id)
.where(UserPlaylist.slug == slug)
).first()
if existing:
slug = f"{slug}-{int(datetime.utcnow().timestamp())}"
db_playlist = UserPlaylist(
user_id=current_user.id,
name=playlist.name,
slug=slug,
description=playlist.description,
is_public=playlist.is_public
)
session.add(db_playlist)
session.commit()
session.refresh(db_playlist)
return PlaylistRead(
id=db_playlist.id,
name=db_playlist.name,
slug=db_playlist.slug,
description=db_playlist.description,
is_public=db_playlist.is_public,
user_id=db_playlist.user_id,
username=current_user.username,
created_at=db_playlist.created_at.isoformat(),
performance_count=0
)
@router.get("/", response_model=List[PlaylistRead])
def list_playlists(
user_id: Optional[int] = None,
limit: int = Query(default=20, le=100),
offset: int = 0,
session: Session = Depends(get_session)
):
"""List public playlists, optionally filtered by user"""
query = select(UserPlaylist).where(UserPlaylist.is_public == True)
if user_id:
query = query.where(UserPlaylist.user_id == user_id)
query = query.order_by(UserPlaylist.created_at.desc()).offset(offset).limit(limit)
playlists = session.exec(query).all()
result = []
for p in playlists:
user = session.get(User, p.user_id)
perf_count = len(session.exec(
select(PlaylistPerformance).where(PlaylistPerformance.playlist_id == p.id)
).all())
result.append(PlaylistRead(
id=p.id,
name=p.name,
slug=p.slug,
description=p.description,
is_public=p.is_public,
user_id=p.user_id,
username=user.username if user else None,
created_at=p.created_at.isoformat(),
performance_count=perf_count
))
return result
@router.get("/mine", response_model=List[PlaylistRead])
def list_my_playlists(
session: Session = Depends(get_session),
current_user: User = Depends(get_current_user)
):
"""List current user's playlists (public and private)"""
playlists = session.exec(
select(UserPlaylist)
.where(UserPlaylist.user_id == current_user.id)
.order_by(UserPlaylist.created_at.desc())
).all()
result = []
for p in playlists:
perf_count = len(session.exec(
select(PlaylistPerformance).where(PlaylistPerformance.playlist_id == p.id)
).all())
result.append(PlaylistRead(
id=p.id,
name=p.name,
slug=p.slug,
description=p.description,
is_public=p.is_public,
user_id=p.user_id,
username=current_user.username,
created_at=p.created_at.isoformat(),
performance_count=perf_count
))
return result
@router.get("/{playlist_id}", response_model=PlaylistDetailRead)
def get_playlist(
playlist_id: int,
session: Session = Depends(get_session)
):
"""Get playlist details with performances"""
playlist = session.get(UserPlaylist, playlist_id)
if not playlist:
raise HTTPException(status_code=404, detail="Playlist not found")
if not playlist.is_public:
raise HTTPException(status_code=403, detail="This playlist is private")
user = session.get(User, playlist.user_id)
# Get performances
playlist_perfs = session.exec(
select(PlaylistPerformance)
.where(PlaylistPerformance.playlist_id == playlist_id)
.order_by(PlaylistPerformance.position)
).all()
performances = []
for pp in playlist_perfs:
perf = session.get(Performance, pp.performance_id)
if perf:
song = session.get(Song, perf.song_id) if perf.song_id else None
show = session.get(Show, perf.show_id) if perf.show_id else None
performances.append(PerformanceInPlaylist(
performance_id=pp.performance_id,
position=pp.position,
notes=pp.notes,
song_title=song.title if song else "Unknown",
show_date=show.date.strftime("%Y-%m-%d") if show and show.date else "Unknown",
show_slug=show.slug if show else None
))
return PlaylistDetailRead(
id=playlist.id,
name=playlist.name,
slug=playlist.slug,
description=playlist.description,
is_public=playlist.is_public,
user_id=playlist.user_id,
username=user.username if user else None,
created_at=playlist.created_at.isoformat(),
performances=performances
)
@router.post("/{playlist_id}/performances/{performance_id}")
def add_to_playlist(
playlist_id: int,
performance_id: int,
notes: Optional[str] = None,
session: Session = Depends(get_session),
current_user: User = Depends(get_current_user)
):
"""Add a performance to a playlist"""
playlist = session.get(UserPlaylist, playlist_id)
if not playlist:
raise HTTPException(status_code=404, detail="Playlist not found")
if playlist.user_id != current_user.id:
raise HTTPException(status_code=403, detail="Not your playlist")
# Check if already in playlist
existing = session.exec(
select(PlaylistPerformance)
.where(PlaylistPerformance.playlist_id == playlist_id)
.where(PlaylistPerformance.performance_id == performance_id)
).first()
if existing:
raise HTTPException(status_code=400, detail="Already in playlist")
# Get next position
all_perfs = session.exec(
select(PlaylistPerformance).where(PlaylistPerformance.playlist_id == playlist_id)
).all()
next_position = len(all_perfs) + 1
pp = PlaylistPerformance(
playlist_id=playlist_id,
performance_id=performance_id,
position=next_position,
notes=notes
)
session.add(pp)
session.commit()
return {"message": "Added to playlist", "position": next_position}
@router.delete("/{playlist_id}/performances/{performance_id}")
def remove_from_playlist(
playlist_id: int,
performance_id: int,
session: Session = Depends(get_session),
current_user: User = Depends(get_current_user)
):
"""Remove a performance from a playlist"""
playlist = session.get(UserPlaylist, playlist_id)
if not playlist:
raise HTTPException(status_code=404, detail="Playlist not found")
if playlist.user_id != current_user.id:
raise HTTPException(status_code=403, detail="Not your playlist")
pp = session.exec(
select(PlaylistPerformance)
.where(PlaylistPerformance.playlist_id == playlist_id)
.where(PlaylistPerformance.performance_id == performance_id)
).first()
if not pp:
raise HTTPException(status_code=404, detail="Not in playlist")
session.delete(pp)
session.commit()
return {"message": "Removed from playlist"}
@router.delete("/{playlist_id}")
def delete_playlist(
playlist_id: int,
session: Session = Depends(get_session),
current_user: User = Depends(get_current_user)
):
"""Delete a playlist"""
playlist = session.get(UserPlaylist, playlist_id)
if not playlist:
raise HTTPException(status_code=404, detail="Playlist not found")
if playlist.user_id != current_user.id:
raise HTTPException(status_code=403, detail="Not your playlist")
# Delete all playlist performances first
perfs = session.exec(
select(PlaylistPerformance).where(PlaylistPerformance.playlist_id == playlist_id)
).all()
for p in perfs:
session.delete(p)
session.delete(playlist)
session.commit()
return {"message": "Playlist deleted"}