323 lines
11 KiB
Python
323 lines
11 KiB
Python
from typing import List
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlmodel import Session, select
|
|
from sqlalchemy import func
|
|
from database import get_session
|
|
from models import Show, Tag, EntityTag, Vertical, UserVerticalPreference
|
|
from schemas import ShowCreate, ShowRead, ShowUpdate, TagRead, PaginatedResponse, PaginationMeta, VerticalSimple, VenueRead, TourRead
|
|
from auth import get_current_user, get_current_user_optional
|
|
|
|
router = APIRouter(prefix="/shows", tags=["shows"])
|
|
|
|
from services.notification_service import NotificationService
|
|
|
|
def get_notification_service(session: Session = Depends(get_session)) -> NotificationService:
|
|
return NotificationService(session)
|
|
|
|
@router.post("/", response_model=ShowRead)
|
|
def create_show(
|
|
show: ShowCreate,
|
|
session: Session = Depends(get_session),
|
|
current_user = Depends(get_current_user),
|
|
notification_service: NotificationService = Depends(get_notification_service)
|
|
):
|
|
db_show = Show.model_validate(show)
|
|
session.add(db_show)
|
|
session.commit()
|
|
session.refresh(db_show)
|
|
|
|
# Trigger notifications
|
|
try:
|
|
notification_service.check_show_alert(db_show)
|
|
except Exception as e:
|
|
print(f"Error sending notifications: {e}")
|
|
|
|
return db_show
|
|
|
|
def serialize_show(show: Show) -> dict:
|
|
"""
|
|
Robustly serialize a Show object to a dictionary.
|
|
Returning a dict breaks the link to SQLAlchemy ORM objects completely,
|
|
preventing Pydantic from triggering lazy loads or infinite recursion during validation.
|
|
"""
|
|
try:
|
|
# Base fields
|
|
data = {
|
|
"id": show.id,
|
|
"date": show.date,
|
|
"slug": show.slug,
|
|
"vertical_id": show.vertical_id,
|
|
"venue_id": show.venue_id,
|
|
"tour_id": show.tour_id,
|
|
"notes": show.notes,
|
|
"bandcamp_link": show.bandcamp_link,
|
|
"nugs_link": show.nugs_link,
|
|
"youtube_link": show.youtube_link,
|
|
"vertical": None,
|
|
"venue": None,
|
|
"tour": None,
|
|
"tags": [],
|
|
"performances": []
|
|
}
|
|
|
|
# Manually map relationships if present
|
|
if show.vertical:
|
|
try:
|
|
data["vertical"] = {
|
|
"id": show.vertical.id,
|
|
"name": show.vertical.name,
|
|
"slug": show.vertical.slug,
|
|
"description": show.vertical.description,
|
|
"logo_url": show.vertical.logo_url,
|
|
"accent_color": show.vertical.accent_color
|
|
}
|
|
except Exception as e:
|
|
print(f"Error serializing vertical for show {show.id}: {e}")
|
|
|
|
if show.venue:
|
|
try:
|
|
data["venue"] = {
|
|
"id": show.venue.id,
|
|
"name": show.venue.name,
|
|
"slug": show.venue.slug,
|
|
"city": show.venue.city,
|
|
"state": show.venue.state,
|
|
"country": show.venue.country,
|
|
"capacity": show.venue.capacity,
|
|
"notes": show.venue.notes
|
|
}
|
|
except Exception as e:
|
|
print(f"Error serializing venue for show {show.id}: {e}")
|
|
|
|
if show.tour:
|
|
try:
|
|
data["tour"] = {
|
|
"id": show.tour.id,
|
|
"name": show.tour.name,
|
|
"slug": show.tour.slug,
|
|
"start_date": show.tour.start_date,
|
|
"end_date": show.tour.end_date,
|
|
"notes": show.tour.notes
|
|
}
|
|
except Exception as e:
|
|
print(f"Error serializing tour for show {show.id}: {e}")
|
|
|
|
return data
|
|
|
|
except Exception as e:
|
|
print(f"CRITICAL Error serializing show {show.id}: {e}")
|
|
# Return minimal valid dict
|
|
return {
|
|
"id": show.id,
|
|
"date": show.date,
|
|
"vertical_id": show.vertical_id,
|
|
"slug": show.slug or "",
|
|
"tags": [],
|
|
"performances": []
|
|
}
|
|
|
|
@router.get("/", response_model=PaginatedResponse[ShowRead])
|
|
def read_shows(
|
|
offset: int = 0,
|
|
limit: int = Query(default=2000, le=5000),
|
|
venue_id: int = None,
|
|
tour_id: int = None,
|
|
year: int = None,
|
|
vertical: str = None, # Single vertical slug filter
|
|
vertical_id: int = None, # Vertical ID filter
|
|
vertical_slugs: List[str] = Query(None),
|
|
status: str = Query(default=None, regex="^(past|upcoming)$"),
|
|
tiers: List[str] = Query(None),
|
|
current_user = Depends(get_current_user_optional),
|
|
session: Session = Depends(get_session)
|
|
):
|
|
from sqlalchemy.orm import joinedload
|
|
from datetime import datetime
|
|
|
|
query = select(Show).options(
|
|
joinedload(Show.vertical),
|
|
joinedload(Show.venue),
|
|
joinedload(Show.tour)
|
|
)
|
|
|
|
if tiers and current_user:
|
|
prefs = session.exec(
|
|
select(UserVerticalPreference)
|
|
.where(UserVerticalPreference.user_id == current_user.id)
|
|
.where(UserVerticalPreference.tier.in_(tiers))
|
|
).all()
|
|
allowed_ids = [p.vertical_id for p in prefs]
|
|
# If user selected tiers but has no bands in them, return empty
|
|
if not allowed_ids:
|
|
return PaginatedResponse(data=[], meta=PaginationMeta(total=0, limit=limit, offset=offset))
|
|
query = query.where(Show.vertical_id.in_(allowed_ids))
|
|
elif tiers and not current_user:
|
|
# Anonymous users can't filter by personal tiers
|
|
return PaginatedResponse(data=[], meta=PaginationMeta(total=0, limit=limit, offset=offset))
|
|
|
|
if venue_id:
|
|
query = query.where(Show.venue_id == venue_id)
|
|
if tour_id:
|
|
query = query.where(Show.tour_id == tour_id)
|
|
if year:
|
|
from sqlalchemy import extract
|
|
query = query.where(extract('year', Show.date) == year)
|
|
|
|
if vertical_slugs:
|
|
query = query.join(Vertical).where(Vertical.slug.in_(vertical_slugs))
|
|
elif vertical:
|
|
# Single vertical slug filter
|
|
query = query.join(Vertical).where(Vertical.slug == vertical)
|
|
|
|
if vertical_id:
|
|
query = query.where(Show.vertical_id == vertical_id)
|
|
|
|
if status:
|
|
today = datetime.now()
|
|
if status == "past":
|
|
query = query.where(Show.date <= today)
|
|
elif status == "upcoming":
|
|
query = query.where(Show.date > today)
|
|
|
|
# Calculate total count before pagination
|
|
total = session.exec(select(func.count()).select_from(query.subquery())).one()
|
|
|
|
# Apply sorting and pagination
|
|
if status == "upcoming":
|
|
query = query.order_by(Show.date.asc())
|
|
else:
|
|
# Default sort by date descending so we get recent shows first
|
|
query = query.order_by(Show.date.desc())
|
|
|
|
shows = session.exec(query.offset(offset).limit(limit)).all()
|
|
|
|
# Serialize robustly
|
|
serialized_shows = [serialize_show(s) for s in shows]
|
|
|
|
return PaginatedResponse(
|
|
data=serialized_shows,
|
|
meta=PaginationMeta(total=total, limit=limit, offset=offset)
|
|
)
|
|
|
|
@router.get("/recent", response_model=List[ShowRead])
|
|
def read_recent_shows(
|
|
limit: int = Query(default=10, le=50),
|
|
tiers: List[str] = Query(None),
|
|
current_user = Depends(get_current_user_optional),
|
|
session: Session = Depends(get_session)
|
|
):
|
|
"""Get the most recent shows ordered by date descending"""
|
|
from datetime import datetime
|
|
from sqlalchemy.orm import joinedload
|
|
query = select(Show).options(
|
|
joinedload(Show.vertical),
|
|
joinedload(Show.venue),
|
|
joinedload(Show.tour)
|
|
).where(Show.date <= datetime.now())
|
|
|
|
if tiers and current_user:
|
|
prefs = session.exec(
|
|
select(UserVerticalPreference)
|
|
.where(UserVerticalPreference.user_id == current_user.id)
|
|
.where(UserVerticalPreference.tier.in_(tiers))
|
|
).all()
|
|
allowed_ids = [p.vertical_id for p in prefs]
|
|
if not allowed_ids:
|
|
return []
|
|
query = query.where(Show.vertical_id.in_(allowed_ids))
|
|
|
|
query = query.order_by(Show.date.desc()).limit(limit)
|
|
shows = session.exec(query).all()
|
|
return shows
|
|
|
|
@router.get("/upcoming", response_model=List[ShowRead])
|
|
def read_upcoming_shows(
|
|
limit: int = Query(default=50, le=100),
|
|
tiers: List[str] = Query(None),
|
|
current_user = Depends(get_current_user_optional),
|
|
session: Session = Depends(get_session)
|
|
):
|
|
"""Get upcoming shows ordered by date ascending"""
|
|
from datetime import datetime
|
|
from sqlalchemy.orm import joinedload
|
|
query = select(Show).options(
|
|
joinedload(Show.vertical),
|
|
joinedload(Show.venue),
|
|
joinedload(Show.tour)
|
|
).where(Show.date > datetime.now())
|
|
|
|
if tiers and current_user:
|
|
prefs = session.exec(
|
|
select(UserVerticalPreference)
|
|
.where(UserVerticalPreference.user_id == current_user.id)
|
|
.where(UserVerticalPreference.tier.in_(tiers))
|
|
).all()
|
|
allowed_ids = [p.vertical_id for p in prefs]
|
|
if not allowed_ids:
|
|
return []
|
|
query = query.where(Show.vertical_id.in_(allowed_ids))
|
|
|
|
query = query.order_by(Show.date.asc()).limit(limit)
|
|
shows = session.exec(query).all()
|
|
return shows
|
|
|
|
@router.get("/{slug}", response_model=ShowRead)
|
|
def read_show(slug: str, session: Session = Depends(get_session)):
|
|
from sqlalchemy.orm import selectinload, joinedload
|
|
from models import Performance, VideoPerformance, Video, VideoPlatform
|
|
|
|
# Eager load relationships clearly
|
|
show = session.exec(
|
|
select(Show)
|
|
.options(
|
|
selectinload(Show.performances).selectinload(Performance.video_links).joinedload(VideoPerformance.video)
|
|
)
|
|
.where(Show.slug == slug)
|
|
).first()
|
|
|
|
if not show:
|
|
raise HTTPException(status_code=404, detail="Show not found")
|
|
|
|
tags = session.exec(
|
|
select(Tag)
|
|
.join(EntityTag, Tag.id == EntityTag.tag_id)
|
|
.where(EntityTag.entity_type == "show")
|
|
.where(EntityTag.entity_id == show.id)
|
|
).all()
|
|
|
|
show_data = ShowRead.model_validate(show)
|
|
show_data.tags = tags
|
|
|
|
# Get vertical for band name
|
|
vertical = session.get(Vertical, show.vertical_id)
|
|
show_data.vertical = vertical
|
|
|
|
# Sort performances by position
|
|
sorted_perfs = sorted(show.performances, key=lambda p: p.position)
|
|
|
|
# Process performances: Filter nicknames and populate video links
|
|
for perf in sorted_perfs:
|
|
perf.nicknames = [n for n in perf.nicknames if n.status == "approved"]
|
|
|
|
# Backfill youtube_link from Video entity if not present
|
|
if not perf.youtube_link and perf.video_links:
|
|
for link in perf.video_links:
|
|
if link.video and link.video.platform == VideoPlatform.YOUTUBE:
|
|
perf.youtube_link = link.video.url
|
|
break
|
|
|
|
show_data.performances = sorted_perfs
|
|
|
|
return show_data
|
|
|
|
@router.patch("/{show_id}", response_model=ShowRead)
|
|
def update_show(show_id: int, show: ShowUpdate, session: Session = Depends(get_session), current_user = Depends(get_current_user)):
|
|
db_show = session.get(Show, show_id)
|
|
if not db_show:
|
|
raise HTTPException(status_code=404, detail="Show not found")
|
|
show_data = show.model_dump(exclude_unset=True)
|
|
db_show.sqlmodel_update(show_data)
|
|
session.add(db_show)
|
|
session.commit()
|
|
session.refresh(db_show)
|
|
return db_show
|