fediversion/backend/routers/performances.py
fullsizemalt b4cddf41ea feat: Initialize Fediversion multi-band platform
- Fork elmeg-demo codebase for multi-band support
- Add data importer infrastructure with base class
- Create band-specific importers:
  - phish.py: Phish.net API v5
  - grateful_dead.py: Grateful Stats API
  - setlistfm.py: Dead & Company, Billy Strings (Setlist.fm)
- Add spec-kit configuration for Gemini
- Update README with supported bands and architecture
2025-12-28 12:39:28 -08:00

198 lines
7.4 KiB
Python

from typing import List
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlmodel import Session, select, func
from database import get_session
from models import Performance, PerformanceNickname, Tag, EntityTag, Show, Venue, Rating, Review
from schemas import PerformanceDetailRead, PerformanceNicknameCreate, PerformanceNicknameRead, PerformanceReadWithShow
from auth import get_current_user
router = APIRouter(prefix="/performances", tags=["performances"])
@router.get("/{slug}", response_model=PerformanceDetailRead)
def read_performance(slug: str, session: Session = Depends(get_session)):
performance = session.exec(
select(Performance).where(Performance.slug == slug)
).first()
if not performance:
raise HTTPException(status_code=404, detail="Performance not found")
performance_id = performance.id # Use actual ID for lookups
# --- Calculate Stats & Navigation ---
# Get all performances of this song, ordered by date
# Join Show and Venue for list display
all_perfs_data = session.exec(
select(Performance, Show, Venue)
.join(Show, Performance.show_id == Show.id)
.outerjoin(Venue, Show.venue_id == Venue.id)
.where(Performance.song_id == performance.song_id)
.order_by(Show.date)
).all()
# Find current index
current_index = -1
for i, (p, s, v) in enumerate(all_perfs_data):
if p.id == performance_id:
current_index = i
break
prev_slug = None
next_slug = None
prev_id = None
next_id = None
gap = 0
times_played = current_index + 1 # 1-based count
if current_index > 0:
prev_perf = all_perfs_data[current_index - 1][0]
prev_id = prev_perf.id
prev_slug = prev_perf.slug
# Calculate Gap
prev_date = all_perfs_data[current_index - 1][1].date
current_date = all_perfs_data[current_index][1].date
gap = session.exec(
select(func.count(Show.id))
.where(Show.date > prev_date)
.where(Show.date < current_date)
).one()
if current_index < len(all_perfs_data) - 1:
next_perf = all_perfs_data[current_index + 1][0]
next_id = next_perf.id
next_slug = next_perf.slug
# Fetch ratings AND review scores for all performances of this song
# Combine both Rating and Review tables for comprehensive scores
rating_stats = session.exec(
select(Rating.performance_id, func.avg(Rating.score), func.count(Rating.id))
.where(Rating.performance_id.is_not(None))
.group_by(Rating.performance_id)
).all()
review_stats = session.exec(
select(Review.performance_id, func.avg(Review.score), func.count(Review.id))
.where(Review.performance_id.is_not(None))
.where(Review.score.is_not(None))
.group_by(Review.performance_id)
).all()
# Merge rating and review stats
rating_map = {}
for row in rating_stats:
perf_id = row[0]
rating_map[perf_id] = {"avg": row[1] or 0.0, "count": row[2] or 0}
for row in review_stats:
perf_id = row[0]
if perf_id in rating_map:
# Combine averages weighted by count
existing = rating_map[perf_id]
total_count = existing["count"] + (row[2] or 0)
if total_count > 0:
combined_avg = ((existing["avg"] * existing["count"]) + ((row[1] or 0) * (row[2] or 0))) / total_count
rating_map[perf_id] = {"avg": combined_avg, "count": total_count}
else:
rating_map[perf_id] = {"avg": row[1] or 0.0, "count": row[2] or 0}
# Build other_performances list
other_performances = []
for p, s, v in all_perfs_data:
if p.id == performance_id:
continue
stats = rating_map.get(p.id, {"avg": 0.0, "count": 0})
perf_read = PerformanceReadWithShow(
**p.model_dump(),
song=performance.song, # Reuse loaded song object
show_date=s.date,
show_slug=s.slug,
venue_name=v.name if v else "Unknown Venue",
venue_city=v.city if v else "Unknown City",
venue_state=v.state if v else None,
avg_rating=stats["avg"],
total_reviews=stats["count"],
nicknames=p.nicknames
)
other_performances.append(perf_read)
# Sort by rating desc, then date desc
other_performances.sort(key=lambda x: (x.avg_rating or 0, x.show_date), reverse=True)
# --- Calculate Ranking for Current Performance ---
current_perf_stats = rating_map.get(performance_id, {"avg": 0.0, "count": 0})
current_avg = current_perf_stats["avg"] or 0.0
current_rating_count = current_perf_stats["count"]
# Build list of all performances with their ratings for ranking
all_rated_perfs = []
for p, s, v in all_perfs_data:
stats = rating_map.get(p.id, {"avg": 0.0, "count": 0})
all_rated_perfs.append({"id": p.id, "avg": stats["avg"] or 0.0, "count": stats["count"]})
# Sort by avg rating desc
all_rated_perfs.sort(key=lambda x: x["avg"], reverse=True)
# Find rank (1-indexed)
rank = None
for i, item in enumerate(all_rated_perfs):
if item["id"] == performance_id:
rank = i + 1
break
total_versions = len(all_perfs_data)
# Heady = #1 ranked with avg >= 8.0 OR top 20% with at least 1 rating and avg >= 8.0
is_heady = False
if current_rating_count > 0 and current_avg >= 8.0:
if rank == 1: # Top ranked version always heady if avg >= 8.0
is_heady = True
elif rank and total_versions > 0:
percentile = rank / total_versions
is_heady = percentile <= 0.2 # Top 20%
# Construct response manually to include extra fields
# We need to ensure nested models (show, song) are validated correctly
perf_dict = performance.model_dump()
perf_dict['show'] = performance.show
perf_dict['song'] = performance.song
perf_dict['nicknames'] = performance.nicknames
perf_dict['previous_performance_id'] = prev_id
perf_dict['previous_performance_slug'] = prev_slug
perf_dict['next_performance_id'] = next_id
perf_dict['next_performance_slug'] = next_slug
perf_dict['gap'] = gap
perf_dict['times_played'] = times_played
perf_dict['other_performances'] = other_performances
perf_dict['rank'] = rank
perf_dict['total_versions'] = total_versions
perf_dict['avg_rating'] = current_avg
perf_dict['rating_count'] = current_rating_count
perf_dict['is_heady'] = is_heady
return perf_dict
@router.post("/{performance_id}/nicknames", response_model=PerformanceNicknameRead)
def suggest_nickname(
performance_id: int,
nickname: PerformanceNicknameCreate,
session: Session = Depends(get_session),
current_user = Depends(get_current_user)
):
# Check if performance exists
perf = session.get(Performance, performance_id)
if not perf:
raise HTTPException(status_code=404, detail="Performance not found")
db_nickname = PerformanceNickname.model_validate(nickname)
db_nickname.performance_id = performance_id
db_nickname.suggested_by = current_user.id
db_nickname.status = "pending" # Default to pending
session.add(db_nickname)
session.commit()
session.refresh(db_nickname)
return db_nickname