192 lines
7 KiB
Python
192 lines
7 KiB
Python
from typing import List
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlmodel import Session, select
|
|
from database import get_session
|
|
from models import Show, Tag, EntityTag, Vertical, UserVerticalPreference
|
|
from schemas import ShowCreate, ShowRead, ShowUpdate, TagRead
|
|
from auth import get_current_user, get_current_user_optional
|
|
|
|
router = APIRouter(prefix="/shows", tags=["shows"])
|
|
|
|
from services.notification_service import NotificationService
|
|
|
|
def get_notification_service(session: Session = Depends(get_session)) -> NotificationService:
    """Build a NotificationService around the injected database session.

    Intended for use as a FastAPI dependency (see ``create_show``).

    Args:
        session: Database session supplied by the ``get_session`` dependency.

    Returns:
        A new ``NotificationService`` constructed with ``session``.
    """
    service = NotificationService(session)
    return service
|
|
|
|
@router.post("/", response_model=ShowRead)
def create_show(
    show: ShowCreate,
    session: Session = Depends(get_session),
    current_user = Depends(get_current_user),
    notification_service: NotificationService = Depends(get_notification_service)
):
    """Persist a new show and trigger show-alert notifications.

    Args:
        show: Payload validated against ``ShowCreate``.
        session: Database session from the ``get_session`` dependency.
        current_user: Result of the ``get_current_user`` dependency. It is not
            read in the body; presumably it is declared only so the endpoint
            requires authentication (confirm against ``auth``).
        notification_service: Service used to send alerts for the new show.

    Returns:
        The stored ``Show`` row, refreshed from the database after commit.
    """
    import logging

    db_show = Show.model_validate(show)
    session.add(db_show)
    session.commit()
    session.refresh(db_show)

    # Notifications are best-effort: the show is already committed, so a
    # failure here must not turn a successful creation into an error response.
    # Log with the traceback instead of printing so the failure is diagnosable.
    try:
        notification_service.check_show_alert(db_show)
    except Exception:
        logging.getLogger(__name__).exception("Error sending notifications")

    return db_show
|
|
|
|
@router.get("/", response_model=List[ShowRead])
def read_shows(
    offset: int = 0,
    limit: int = Query(default=2000, ge=0, le=5000),
    venue_id: int = None,
    tour_id: int = None,
    year: int = None,
    vertical_slugs: List[str] = Query(None),
    status: str = Query(default=None, regex="^(past|upcoming)$"),
    tiers: List[str] = Query(None),
    current_user = Depends(get_current_user_optional),
    session: Session = Depends(get_session)
):
    """List shows, optionally filtered and paginated.

    Args:
        offset: Number of rows to skip.
        limit: Maximum number of rows to return (0..5000). The lower bound is
            enforced so that a negative value cannot bypass the upper cap or
            reach the database as an invalid LIMIT.
        venue_id: If truthy, only shows at this venue.
        tour_id: If truthy, only shows on this tour.
        year: If truthy, only shows whose ``date`` falls in this year.
        vertical_slugs: If given, only shows whose vertical has one of these
            slugs.
        status: ``"past"`` (date <= now) or ``"upcoming"`` (date > now).
        tiers: If given, only shows whose vertical the current user has a
            ``UserVerticalPreference`` for, with a tier in this list.
        current_user: Optional user from ``get_current_user_optional``.
        session: Database session from the ``get_session`` dependency.

    Returns:
        A list of ``Show`` rows with their ``vertical`` eagerly loaded, or an
        empty list when a tier filter is requested but cannot match anything.
    """
    from datetime import datetime

    from sqlalchemy import extract
    from sqlalchemy.orm import joinedload

    query = select(Show).options(joinedload(Show.vertical))

    if tiers:
        # Anonymous users can't filter by personal tiers
        if not current_user:
            return []

        tier_prefs = session.exec(
            select(UserVerticalPreference)
            .where(UserVerticalPreference.user_id == current_user.id)
            .where(UserVerticalPreference.tier.in_(tiers))
        ).all()
        allowed_ids = [pref.vertical_id for pref in tier_prefs]

        # If user selected tiers but has no bands in them, return empty
        if not allowed_ids:
            return []

        query = query.where(Show.vertical_id.in_(allowed_ids))

    if venue_id:
        query = query.where(Show.venue_id == venue_id)
    if tour_id:
        query = query.where(Show.tour_id == tour_id)
    if year:
        query = query.where(extract('year', Show.date) == year)

    if vertical_slugs:
        query = query.join(Vertical).where(Vertical.slug.in_(vertical_slugs))

    if status:
        # Take a single timestamp so both branches compare against the same
        # instant.
        now = datetime.now()
        if status == "past":
            query = query.where(Show.date <= now)
        elif status == "upcoming":
            query = query.where(Show.date > now)

    # NOTE(review): no ORDER BY is applied, so which rows land on a given
    # offset/limit page depends on the database's default row order.
    shows = session.exec(query.offset(offset).limit(limit)).all()
    return shows
|
|
|
|
@router.get("/recent", response_model=List[ShowRead])
def read_recent_shows(
    limit: int = Query(default=10, ge=0, le=50),
    tiers: List[str] = Query(None),
    current_user = Depends(get_current_user_optional),
    session: Session = Depends(get_session)
):
    """Get the most recent shows ordered by date descending.

    Only shows whose ``date`` is at or before the current time are returned.

    Args:
        limit: Maximum number of rows to return (0..50). The lower bound is
            enforced so a negative value cannot bypass the upper cap.
        tiers: If given, only shows whose vertical the current user has a
            ``UserVerticalPreference`` for, with a tier in this list.
        current_user: Optional user from ``get_current_user_optional``.
        session: Database session from the ``get_session`` dependency.

    Returns:
        A list of ``Show`` rows with ``vertical`` eagerly loaded. Returns an
        empty list when ``tiers`` is given but the caller is anonymous or has
        no verticals in those tiers, the same rule ``read_shows`` applies.
    """
    from datetime import datetime
    from sqlalchemy.orm import joinedload

    query = (
        select(Show)
        .options(joinedload(Show.vertical))
        .where(Show.date <= datetime.now())
    )

    if tiers:
        # Tiers are per-user preferences, so an anonymous caller cannot match
        # any; return nothing rather than silently ignoring the filter.
        if not current_user:
            return []

        tier_prefs = session.exec(
            select(UserVerticalPreference)
            .where(UserVerticalPreference.user_id == current_user.id)
            .where(UserVerticalPreference.tier.in_(tiers))
        ).all()
        allowed_ids = [pref.vertical_id for pref in tier_prefs]
        if not allowed_ids:
            return []

        query = query.where(Show.vertical_id.in_(allowed_ids))

    query = query.order_by(Show.date.desc()).limit(limit)
    shows = session.exec(query).all()
    return shows
|
|
|
|
@router.get("/upcoming", response_model=List[ShowRead])
def read_upcoming_shows(
    limit: int = Query(default=50, ge=0, le=100),
    tiers: List[str] = Query(None),
    current_user = Depends(get_current_user_optional),
    session: Session = Depends(get_session)
):
    """Get upcoming shows ordered by date ascending.

    Only shows whose ``date`` is strictly after the current time are returned.

    Args:
        limit: Maximum number of rows to return (0..100). The lower bound is
            enforced so a negative value cannot bypass the upper cap.
        tiers: If given, only shows whose vertical the current user has a
            ``UserVerticalPreference`` for, with a tier in this list.
        current_user: Optional user from ``get_current_user_optional``.
        session: Database session from the ``get_session`` dependency.

    Returns:
        A list of ``Show`` rows with ``vertical`` eagerly loaded. Returns an
        empty list when ``tiers`` is given but the caller is anonymous or has
        no verticals in those tiers, the same rule ``read_shows`` applies.
    """
    from datetime import datetime
    from sqlalchemy.orm import joinedload

    query = (
        select(Show)
        .options(joinedload(Show.vertical))
        .where(Show.date > datetime.now())
    )

    if tiers:
        # Tiers are per-user preferences, so an anonymous caller cannot match
        # any; return nothing rather than silently ignoring the filter.
        if not current_user:
            return []

        tier_prefs = session.exec(
            select(UserVerticalPreference)
            .where(UserVerticalPreference.user_id == current_user.id)
            .where(UserVerticalPreference.tier.in_(tiers))
        ).all()
        allowed_ids = [pref.vertical_id for pref in tier_prefs]
        if not allowed_ids:
            return []

        query = query.where(Show.vertical_id.in_(allowed_ids))

    query = query.order_by(Show.date.asc()).limit(limit)
    shows = session.exec(query).all()
    return shows
|
|
|
|
@router.get("/{slug}", response_model=ShowRead)
def read_show(slug: str, session: Session = Depends(get_session)):
    """Return one show by slug, with tags, vertical and ordered performances.

    Args:
        slug: Value matched against ``Show.slug``.
        session: Database session from the ``get_session`` dependency.

    Returns:
        A ``ShowRead`` whose ``tags`` are the ``Tag`` rows linked through
        ``EntityTag`` (entity_type ``"show"``), whose ``vertical`` is the
        show's ``Vertical``, and whose ``performances`` are sorted by
        ``position`` with only ``"approved"`` nicknames on each.

    Raises:
        HTTPException: 404 if no show has the given slug.
    """
    from sqlalchemy.orm.attributes import set_committed_value

    show = session.exec(select(Show).where(Show.slug == slug)).first()
    if not show:
        raise HTTPException(status_code=404, detail="Show not found")

    tags = session.exec(
        select(Tag)
        .join(EntityTag, Tag.id == EntityTag.tag_id)
        .where(EntityTag.entity_type == "show")
        .where(EntityTag.entity_id == show.id)
    ).all()

    show_data = ShowRead.model_validate(show)
    show_data.tags = tags

    # Get vertical for band name
    show_data.vertical = session.get(Vertical, show.vertical_id)

    # Sort performances by position
    sorted_perfs = sorted(show.performances, key=lambda p: p.position)

    # Only approved nicknames are exposed. Filtering is done server-side
    # rather than left to the frontend.
    for perf in sorted_perfs:
        approved = [n for n in perf.nicknames if n.status == "approved"]
        # Plain assignment to the relationship would register the dropped
        # nicknames as pending removals on the session; a later flush/commit
        # could then persist that. set_committed_value swaps the loaded
        # collection without recording any change history, so this read-only
        # endpoint leaves the session clean.
        set_committed_value(perf, "nicknames", approved)

    show_data.performances = sorted_perfs

    return show_data
|
|
|
|
@router.patch("/{show_id}", response_model=ShowRead)
def update_show(
    show_id: int,
    show: ShowUpdate,
    session: Session = Depends(get_session),
    current_user = Depends(get_current_user),
):
    """Apply a partial update to an existing show.

    Only fields explicitly present in the request body are written; fields
    left unset in ``show`` keep their stored values.

    Args:
        show_id: Primary key of the show to update.
        show: Partial payload validated against ``ShowUpdate``.
        session: Database session from the ``get_session`` dependency.
        current_user: Result of the ``get_current_user`` dependency. It is not
            read in the body; presumably declared to require authentication.

    Returns:
        The updated ``Show`` row, refreshed from the database after commit.

    Raises:
        HTTPException: 404 if no show exists with ``show_id``.
    """
    db_show = session.get(Show, show_id)
    if not db_show:
        raise HTTPException(status_code=404, detail="Show not found")

    # exclude_unset keeps omitted fields from overwriting stored values.
    changes = show.model_dump(exclude_unset=True)
    db_show.sqlmodel_update(changes)

    session.add(db_show)
    session.commit()
    session.refresh(db_show)
    return db_show
|