- Fork elmeg-demo codebase for multi-band support - Add data importer infrastructure with base class - Create band-specific importers: - phish.py: Phish.net API v5 - grateful_dead.py: Grateful Stats API - setlistfm.py: Dead & Company, Billy Strings (Setlist.fm) - Add spec-kit configuration for Gemini - Update README with supported bands and architecture
213 lines
6.6 KiB
Python
213 lines
6.6 KiB
Python
"""
|
|
Sequences Router - API endpoints for managing song sequences.
|
|
"""
|
|
from typing import List, Optional
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from sqlmodel import Session, select
|
|
from pydantic import BaseModel
|
|
from database import get_session
|
|
from models import Sequence, SequenceSong, Song
|
|
from slugify import generate_slug as slugify
|
|
from auth import get_current_user
|
|
from models import User
|
|
|
|
router = APIRouter(prefix="/sequences", tags=["sequences"])
|
|
|
|
# --- Schemas ---
|
|
class SequenceRead(BaseModel):
|
|
id: int
|
|
name: str
|
|
slug: str
|
|
description: Optional[str] = None
|
|
notes: Optional[str] = None
|
|
songs: List[dict] = [] # [{position, song_id, song_title}]
|
|
|
|
class SequenceCreate(BaseModel):
|
|
name: str
|
|
description: Optional[str] = None
|
|
notes: Optional[str] = None
|
|
song_ids: List[int] = [] # Ordered list of song IDs
|
|
|
|
class SequenceUpdate(BaseModel):
|
|
name: Optional[str] = None
|
|
description: Optional[str] = None
|
|
notes: Optional[str] = None
|
|
song_ids: Optional[List[int]] = None
|
|
|
|
# --- Public Endpoints ---
|
|
|
|
@router.get("", response_model=List[SequenceRead])
|
|
def list_sequences(
|
|
search: Optional[str] = None,
|
|
limit: int = 100,
|
|
session: Session = Depends(get_session)
|
|
):
|
|
"""List all sequences"""
|
|
query = select(Sequence)
|
|
if search:
|
|
query = query.where(Sequence.name.icontains(search))
|
|
query = query.order_by(Sequence.name).limit(limit)
|
|
sequences = session.exec(query).all()
|
|
|
|
result = []
|
|
for seq in sequences:
|
|
# Get songs for this sequence
|
|
songs_query = select(SequenceSong).where(SequenceSong.sequence_id == seq.id).order_by(SequenceSong.position)
|
|
seq_songs = session.exec(songs_query).all()
|
|
|
|
song_list = []
|
|
for ss in seq_songs:
|
|
song = session.get(Song, ss.song_id)
|
|
song_list.append({
|
|
"position": ss.position,
|
|
"song_id": ss.song_id,
|
|
"song_title": song.title if song else "Unknown"
|
|
})
|
|
|
|
result.append({
|
|
"id": seq.id,
|
|
"name": seq.name,
|
|
"slug": seq.slug,
|
|
"description": seq.description,
|
|
"notes": seq.notes,
|
|
"songs": song_list
|
|
})
|
|
|
|
return result
|
|
|
|
@router.get("/{slug}")
|
|
def get_sequence(slug: str, session: Session = Depends(get_session)):
|
|
"""Get sequence details with songs"""
|
|
sequence = session.exec(select(Sequence).where(Sequence.slug == slug)).first()
|
|
if not sequence:
|
|
raise HTTPException(status_code=404, detail="Sequence not found")
|
|
|
|
# Get songs
|
|
songs_query = select(SequenceSong).where(SequenceSong.sequence_id == sequence.id).order_by(SequenceSong.position)
|
|
seq_songs = session.exec(songs_query).all()
|
|
|
|
song_list = []
|
|
for ss in seq_songs:
|
|
song = session.get(Song, ss.song_id)
|
|
song_list.append({
|
|
"position": ss.position,
|
|
"song_id": ss.song_id,
|
|
"song_title": song.title if song else "Unknown",
|
|
"song_slug": song.slug if song else None
|
|
})
|
|
|
|
return {
|
|
"id": sequence.id,
|
|
"name": sequence.name,
|
|
"slug": sequence.slug,
|
|
"description": sequence.description,
|
|
"notes": sequence.notes,
|
|
"songs": song_list
|
|
}
|
|
|
|
# --- Admin Endpoints ---
|
|
|
|
@router.post("", response_model=SequenceRead)
|
|
def create_sequence(
|
|
data: SequenceCreate,
|
|
session: Session = Depends(get_session),
|
|
user: User = Depends(get_current_user)
|
|
):
|
|
"""Create a new sequence"""
|
|
if user.role != "admin":
|
|
raise HTTPException(status_code=403, detail="Admin access required")
|
|
|
|
slug = slugify(data.name)
|
|
|
|
# Check for existing
|
|
existing = session.exec(select(Sequence).where(Sequence.slug == slug)).first()
|
|
if existing:
|
|
raise HTTPException(status_code=400, detail="Sequence with this name already exists")
|
|
|
|
sequence = Sequence(
|
|
name=data.name,
|
|
slug=slug,
|
|
description=data.description,
|
|
notes=data.notes
|
|
)
|
|
session.add(sequence)
|
|
session.commit()
|
|
session.refresh(sequence)
|
|
|
|
# Add songs
|
|
for i, song_id in enumerate(data.song_ids, start=1):
|
|
ss = SequenceSong(sequence_id=sequence.id, song_id=song_id, position=i)
|
|
session.add(ss)
|
|
session.commit()
|
|
|
|
return {
|
|
"id": sequence.id,
|
|
"name": sequence.name,
|
|
"slug": sequence.slug,
|
|
"description": sequence.description,
|
|
"notes": sequence.notes,
|
|
"songs": [{"position": i+1, "song_id": sid, "song_title": ""} for i, sid in enumerate(data.song_ids)]
|
|
}
|
|
|
|
@router.patch("/{sequence_id}")
|
|
def update_sequence(
|
|
sequence_id: int,
|
|
data: SequenceUpdate,
|
|
session: Session = Depends(get_session),
|
|
user: User = Depends(get_current_user)
|
|
):
|
|
"""Update a sequence"""
|
|
if user.role != "admin":
|
|
raise HTTPException(status_code=403, detail="Admin access required")
|
|
|
|
sequence = session.get(Sequence, sequence_id)
|
|
if not sequence:
|
|
raise HTTPException(status_code=404, detail="Sequence not found")
|
|
|
|
if data.name is not None:
|
|
sequence.name = data.name
|
|
sequence.slug = slugify(data.name)
|
|
if data.description is not None:
|
|
sequence.description = data.description
|
|
if data.notes is not None:
|
|
sequence.notes = data.notes
|
|
|
|
session.add(sequence)
|
|
|
|
# Update songs if provided
|
|
if data.song_ids is not None:
|
|
# Delete existing
|
|
existing = session.exec(select(SequenceSong).where(SequenceSong.sequence_id == sequence_id)).all()
|
|
for e in existing:
|
|
session.delete(e)
|
|
|
|
# Add new
|
|
for i, song_id in enumerate(data.song_ids, start=1):
|
|
ss = SequenceSong(sequence_id=sequence_id, song_id=song_id, position=i)
|
|
session.add(ss)
|
|
|
|
session.commit()
|
|
return {"message": "Sequence updated", "id": sequence_id}
|
|
|
|
@router.delete("/{sequence_id}")
|
|
def delete_sequence(
|
|
sequence_id: int,
|
|
session: Session = Depends(get_session),
|
|
user: User = Depends(get_current_user)
|
|
):
|
|
"""Delete a sequence"""
|
|
if user.role != "admin":
|
|
raise HTTPException(status_code=403, detail="Admin access required")
|
|
|
|
sequence = session.get(Sequence, sequence_id)
|
|
if not sequence:
|
|
raise HTTPException(status_code=404, detail="Sequence not found")
|
|
|
|
# Delete songs first
|
|
existing = session.exec(select(SequenceSong).where(SequenceSong.sequence_id == sequence_id)).all()
|
|
for e in existing:
|
|
session.delete(e)
|
|
|
|
session.delete(sequence)
|
|
session.commit()
|
|
return {"message": "Sequence deleted"}
|