251 lines
8.9 KiB
Python
251 lines
8.9 KiB
Python
from typing import List
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlmodel import Session, select
|
|
from sqlalchemy import func
|
|
from database import get_session
|
|
from models import Show, Tag, EntityTag, Vertical, UserVerticalPreference
|
|
from schemas import ShowCreate, ShowRead, ShowUpdate, TagRead, PaginatedResponse, PaginationMeta
|
|
from auth import get_current_user, get_current_user_optional
|
|
|
|
router = APIRouter(prefix="/shows", tags=["shows"])
|
|
|
|
from services.notification_service import NotificationService
|
|
|
|
def get_notification_service(session: Session = Depends(get_session)) -> NotificationService:
    """FastAPI dependency that wraps the injected database session in a NotificationService."""
    service = NotificationService(session)
    return service
|
|
|
|
@router.post("/", response_model=ShowRead)
def create_show(
    show: ShowCreate,
    session: Session = Depends(get_session),
    current_user = Depends(get_current_user),
    notification_service: NotificationService = Depends(get_notification_service)
):
    """Create a show, persist it, and trigger show alerts.

    Args:
        show: Request payload; validated into a ``Show`` row.
        session: Database session used to add, commit and refresh the row.
        current_user: Resolved through ``get_current_user``. The value is not
            used in the body, so it acts only as a dependency gate
            (presumably rejecting unauthenticated callers -- confirm in ``auth``).
        notification_service: Its ``check_show_alert`` is called with the
            newly stored show.

    Returns:
        The refreshed ``Show`` row (serialized via ``ShowRead``).
    """
    import logging

    db_show = Show.model_validate(show)
    session.add(db_show)
    session.commit()
    session.refresh(db_show)

    # Notifications are best-effort: the show is already committed at this
    # point, so a notification failure must not fail the request. Log the
    # full traceback instead of printing only the message to stdout.
    try:
        notification_service.check_show_alert(db_show)
    except Exception:
        logging.getLogger(__name__).exception("Error sending notifications")

    return db_show
|
|
|
|
@router.get("/", response_model=PaginatedResponse[ShowRead])
def read_shows(
    offset: int = 0,
    limit: int = Query(default=2000, le=5000),
    venue_id: int = None,
    tour_id: int = None,
    year: int = None,
    vertical: str = None,  # Single vertical slug filter
    vertical_id: int = None,  # Vertical ID filter
    vertical_slugs: List[str] = Query(None),
    status: str = Query(default=None, regex="^(past|upcoming)$"),
    tiers: List[str] = Query(None),
    current_user = Depends(get_current_user_optional),
    session: Session = Depends(get_session)
):
    """List shows with optional filters, returning one page plus the total count.

    Args:
        offset: Number of rows to skip.
        limit: Page size; defaults to 2000 and is validated to be at most 5000.
        venue_id: Keep only shows at this venue.
        tour_id: Keep only shows on this tour.
        year: Keep only shows whose ``date`` falls in this year.
        vertical: Keep only shows whose vertical has this slug. Ignored when
            ``vertical_slugs`` is also given.
        vertical_id: Keep only shows with this vertical id (applied in
            addition to any slug filter).
        vertical_slugs: Keep only shows whose vertical slug is in this list.
        status: ``"past"`` keeps shows dated at or before now; ``"upcoming"``
            keeps shows dated after now.
        tiers: Restrict to verticals the current user has a preference for in
            one of these tiers.
        current_user: Optional authenticated user (may be ``None``).
        session: Database session.

    Returns:
        A ``PaginatedResponse`` whose ``meta.total`` is the number of rows
        matching the filters before ``offset``/``limit`` are applied. The
        result is empty when ``tiers`` is given but the caller is anonymous
        or has no verticals in those tiers.
    """
    from datetime import datetime

    from sqlalchemy import extract
    from sqlalchemy.orm import joinedload

    query = select(Show).options(
        joinedload(Show.vertical),
        joinedload(Show.venue),
        joinedload(Show.tour)
    )

    if tiers and current_user:
        prefs = session.exec(
            select(UserVerticalPreference)
            .where(UserVerticalPreference.user_id == current_user.id)
            .where(UserVerticalPreference.tier.in_(tiers))
        ).all()
        allowed_ids = [p.vertical_id for p in prefs]
        # If user selected tiers but has no bands in them, return empty
        if not allowed_ids:
            return PaginatedResponse(data=[], meta=PaginationMeta(total=0, limit=limit, offset=offset))
        query = query.where(Show.vertical_id.in_(allowed_ids))
    elif tiers and not current_user:
        # Anonymous users can't filter by personal tiers
        return PaginatedResponse(data=[], meta=PaginationMeta(total=0, limit=limit, offset=offset))

    if venue_id:
        query = query.where(Show.venue_id == venue_id)
    if tour_id:
        query = query.where(Show.tour_id == tour_id)
    if year:
        query = query.where(extract('year', Show.date) == year)

    # A list of slugs takes precedence over the single-slug filter.
    if vertical_slugs:
        query = query.join(Vertical).where(Vertical.slug.in_(vertical_slugs))
    elif vertical:
        # Single vertical slug filter
        query = query.join(Vertical).where(Vertical.slug == vertical)

    if vertical_id:
        query = query.where(Show.vertical_id == vertical_id)

    if status:
        today = datetime.now()
        if status == "past":
            query = query.where(Show.date <= today)
        elif status == "upcoming":
            query = query.where(Show.date > today)

    # Calculate total count before pagination
    total = session.exec(select(func.count()).select_from(query.subquery())).one()

    # Apply sorting and pagination
    if status == "upcoming":
        query = query.order_by(Show.date.asc())
    else:
        # Default sort by date descending so we get recent shows first
        query = query.order_by(Show.date.desc())

    shows = session.exec(query.offset(offset).limit(limit)).all()

    return PaginatedResponse(
        data=shows,
        meta=PaginationMeta(total=total, limit=limit, offset=offset)
    )
|
|
|
|
@router.get("/recent", response_model=List[ShowRead])
def read_recent_shows(
    limit: int = Query(default=10, le=50),
    tiers: List[str] = Query(None),
    current_user = Depends(get_current_user_optional),
    session: Session = Depends(get_session)
):
    """Return shows dated at or before now, newest first.

    ``limit`` defaults to 10 and is validated to be at most 50. When both
    ``tiers`` and an authenticated user are present, results are restricted
    to verticals the user has a preference for in one of those tiers (an
    empty list is returned if there are none). When ``tiers`` is given
    without an authenticated user, the tier filter is not applied.
    """
    from datetime import datetime
    from sqlalchemy.orm import joinedload

    eager_options = (
        joinedload(Show.vertical),
        joinedload(Show.venue),
        joinedload(Show.tour),
    )
    stmt = select(Show).options(*eager_options).where(Show.date <= datetime.now())

    if tiers and current_user:
        pref_stmt = (
            select(UserVerticalPreference)
            .where(UserVerticalPreference.user_id == current_user.id)
            .where(UserVerticalPreference.tier.in_(tiers))
        )
        vertical_ids = [pref.vertical_id for pref in session.exec(pref_stmt).all()]
        if not vertical_ids:
            # The user has no verticals in the requested tiers.
            return []
        stmt = stmt.where(Show.vertical_id.in_(vertical_ids))

    stmt = stmt.order_by(Show.date.desc()).limit(limit)
    return session.exec(stmt).all()
|
|
|
|
@router.get("/upcoming", response_model=List[ShowRead])
def read_upcoming_shows(
    limit: int = Query(default=50, le=100),
    tiers: List[str] = Query(None),
    current_user = Depends(get_current_user_optional),
    session: Session = Depends(get_session)
):
    """Return shows dated after now, soonest first.

    ``limit`` defaults to 50 and is validated to be at most 100. When both
    ``tiers`` and an authenticated user are present, results are restricted
    to verticals the user has a preference for in one of those tiers (an
    empty list is returned if there are none). When ``tiers`` is given
    without an authenticated user, the tier filter is not applied.
    """
    from datetime import datetime
    from sqlalchemy.orm import joinedload

    eager_options = (
        joinedload(Show.vertical),
        joinedload(Show.venue),
        joinedload(Show.tour),
    )
    stmt = select(Show).options(*eager_options).where(Show.date > datetime.now())

    if tiers and current_user:
        pref_stmt = (
            select(UserVerticalPreference)
            .where(UserVerticalPreference.user_id == current_user.id)
            .where(UserVerticalPreference.tier.in_(tiers))
        )
        vertical_ids = [pref.vertical_id for pref in session.exec(pref_stmt).all()]
        if not vertical_ids:
            # The user has no verticals in the requested tiers.
            return []
        stmt = stmt.where(Show.vertical_id.in_(vertical_ids))

    stmt = stmt.order_by(Show.date.asc()).limit(limit)
    return session.exec(stmt).all()
|
|
|
|
@router.get("/{slug}", response_model=ShowRead)
def read_show(slug: str, session: Session = Depends(get_session)):
    """Return one show by slug, with tags, vertical and ordered performances.

    Args:
        slug: Value matched against ``Show.slug``.
        session: Database session.

    Returns:
        A ``ShowRead`` built from the show, with ``tags``, ``vertical`` and
        ``performances`` filled in. Performances are sorted by ``position``;
        each keeps only nicknames whose ``status`` is ``"approved"`` and, when
        it has no ``youtube_link``, takes the URL of its first linked video on
        the YouTube platform.

    Raises:
        HTTPException: 404 when no show has the given slug.
    """
    from sqlalchemy.orm import selectinload, joinedload
    from models import Performance, VideoPerformance, VideoPlatform

    # Eager load performances -> video links -> video in as few queries as possible.
    show = session.exec(
        select(Show)
        .options(
            selectinload(Show.performances)
            .selectinload(Performance.video_links)
            .joinedload(VideoPerformance.video)
        )
        .where(Show.slug == slug)
    ).first()

    if not show:
        raise HTTPException(status_code=404, detail="Show not found")

    tags = session.exec(
        select(Tag)
        .join(EntityTag, Tag.id == EntityTag.tag_id)
        .where(EntityTag.entity_type == "show")
        .where(EntityTag.entity_id == show.id)
    ).all()

    show_data = ShowRead.model_validate(show)
    show_data.tags = tags

    # Get vertical for band name
    vertical = session.get(Vertical, show.vertical_id)
    show_data.vertical = vertical

    # Sort performances by position
    sorted_perfs = sorted(show.performances, key=lambda p: p.position)

    # The loop below overwrites attributes on session-attached Performance
    # objects purely to shape the response. Suppress autoflush so that a query
    # issued while the loop runs (e.g. loading the next performance's
    # nicknames, if that relationship is lazily loaded) cannot flush these
    # response-only edits to the database.
    # NOTE(review): this only covers queries issued inside the loop; the edits
    # stay pending on the session afterwards, so this session must not be
    # committed -- confirm against get_session.
    with session.no_autoflush:
        # Process performances: Filter nicknames and populate video links
        for perf in sorted_perfs:
            perf.nicknames = [n for n in perf.nicknames if n.status == "approved"]

            # Backfill youtube_link from Video entity if not present
            if not perf.youtube_link and perf.video_links:
                for link in perf.video_links:
                    if link.video and link.video.platform == VideoPlatform.YOUTUBE:
                        perf.youtube_link = link.video.url
                        break

    show_data.performances = sorted_perfs

    return show_data
|
|
|
|
@router.patch("/{show_id}", response_model=ShowRead)
def update_show(
    show_id: int,
    show: ShowUpdate,
    session: Session = Depends(get_session),
    current_user = Depends(get_current_user),
):
    """Apply a partial update to the show with the given id and return the stored row.

    Only fields explicitly set on the ``ShowUpdate`` payload are written.
    Raises a 404 ``HTTPException`` when no show has that id.
    """
    existing = session.get(Show, show_id)
    if not existing:
        raise HTTPException(status_code=404, detail="Show not found")

    # exclude_unset keeps omitted fields from overwriting stored values.
    changes = show.model_dump(exclude_unset=True)
    existing.sqlmodel_update(changes)

    session.add(existing)
    session.commit()
    session.refresh(existing)
    return existing
|