"""
Comprehensive El Goose Data Importer

Fetches ALL Goose data from El Goose API and populates demo database.

Import order matters: venues and songs are imported first so that shows and
setlist entries can be linked to local rows through the API-id -> local-id
maps each step returns.
"""
import time
from datetime import datetime

import requests
from passlib.context import CryptContext
from sqlmodel import Session, select

from database import engine
from models import (
    Vertical, Venue, Tour, Show, Song, Performance, Artist,
    User, UserPreferences
)
from slugify import generate_slug, generate_show_slug

BASE_URL = "http://elmeg-legacy-api:8000/api/v2"
ARTIST_ID = 1  # Goose

# Seconds to wait for the API before giving up on a request. Without a
# timeout a stalled connection would block the importer indefinitely.
REQUEST_TIMEOUT = 30

pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")

# User personas for demo
DEMO_USERS = [
    {"email": "archivist@demo.com", "username": "TheArchivist", "role": "user", "wiki_mode": True},
    {"email": "statnerd@demo.com", "username": "StatNerd420", "role": "user", "wiki_mode": False},
    {"email": "reviewer@demo.com", "username": "CriticalListener", "role": "user", "wiki_mode": False},
    {"email": "casual@demo.com", "username": "CasualFan", "role": "user", "wiki_mode": False},
    {"email": "groupleader@demo.com", "username": "NortheastHonkers", "role": "user", "wiki_mode": False},
    {"email": "mod@demo.com", "username": "ModGoose", "role": "moderator", "wiki_mode": False},
    {"email": "admin@demo.com", "username": "AdminBird", "role": "admin", "wiki_mode": False},
    {"email": "newbie@demo.com", "username": "NewToGoose", "role": "user", "wiki_mode": False},
    {"email": "taper@demo.com", "username": "TaperTom", "role": "user", "wiki_mode": False},
    {"email": "tourfollower@demo.com", "username": "RoadWarrior", "role": "user", "wiki_mode": False},
    {"email": "lurker@demo.com", "username": "SilentHonker", "role": "user", "wiki_mode": True},
    {"email": "hype@demo.com", "username": "HypeGoose", "role": "user", "wiki_mode": False},
]

# Non-numeric set codes and the set names they map to (matched
# case-insensitively). Any other code becomes "Set <code>".
_SPECIAL_SET_NAMES = {
    "e": "Encore",
    "e2": "Encore 2",
    "s": "Soundcheck",
}


def fetch_json(endpoint, params=None):
    """Fetch JSON from El Goose API (Single Page).

    Args:
        endpoint: API endpoint name without the ``.json`` suffix.
        params: Optional query parameters passed to the request.

    Returns:
        The value of the ``data`` key of the response (``[]`` when the key
        is missing), or ``None`` when the request fails, times out, or the
        API reports ``error == 1``.
    """
    url = f"{BASE_URL}/{endpoint}.json"
    try:
        response = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        data = response.json()

        if data.get('error') == 1:
            print(f"❌ API Error: {data.get('error_message')}")
            return None

        return data.get('data', [])
    except Exception as e:
        # Deliberately best-effort: any failure is reported and turned into
        # None so callers can treat it as "no data".
        print(f"❌ Failed to fetch {endpoint}: {e}")
        return None


def fetch_all_json(endpoint, params=None):
    """Fetch ALL data from El Goose API using pagination.

    Pages are requested starting at 1 until ``fetch_json`` returns a falsy
    result (empty list or ``None``), or until the safety limit is hit.

    Args:
        endpoint: API endpoint name without the ``.json`` suffix.
        params: Optional base query parameters; the dict is copied, so the
            caller's dict is not modified.

    Returns:
        A list with the items of every fetched page, in page order.
    """
    all_data = []
    page = 1
    params = params.copy() if params else {}

    print(f" Fetching {endpoint} pages...", end="", flush=True)

    while True:
        params['page'] = page
        data = fetch_json(endpoint, params)

        # An empty or failed page ends the pagination.
        if not data:
            break

        all_data.extend(data)
        print(f" {page}", end="", flush=True)

        # Safety break if too many pages
        if page > 100:
            print(" (Safety limit reached)", end="")
            break

        page += 1

    print(" Done.")
    return all_data


def create_users(session):
    """Create demo user personas.

    For each entry in ``DEMO_USERS`` an existing user with the same email is
    reused; otherwise a ``User`` and its ``UserPreferences`` are created.

    Args:
        session: Open database session used for queries and commits.

    Returns:
        List of ``User`` rows (existing and newly created), in
        ``DEMO_USERS`` order.
    """
    print("\n📝 Creating user personas...")
    users = []

    for user_data in DEMO_USERS:
        # Check existing
        existing = session.exec(
            select(User).where(User.email == user_data["email"])
        ).first()
        if existing:
            users.append(existing)
            continue

        user = User(
            email=user_data["email"],
            hashed_password=pwd_context.hash("demo123"),
            is_active=True,
            is_superuser=(user_data["role"] == "admin"),
            role=user_data["role"]
        )
        session.add(user)
        # Commit + refresh so user.id is populated before it is referenced
        # by the preferences row.
        session.commit()
        session.refresh(user)

        prefs = UserPreferences(
            user_id=user.id,
            wiki_mode=user_data["wiki_mode"],
            show_ratings=not user_data["wiki_mode"],
            show_comments=not user_data["wiki_mode"]
        )
        session.add(prefs)
        users.append(user)

    session.commit()
    print(f"✓ Created/Found {len(users)} users")
    return users


def import_venues(session):
    """Import all venues.

    Venues are matched to existing rows by name; unmatched venues are
    inserted.

    Args:
        session: Open database session.

    Returns:
        Dict mapping the API ``venue_id`` to the local ``Venue.id``; empty
        when the API returned nothing.
    """
    print("\n🏛️ Importing venues...")

    venues_data = fetch_all_json("venues")
    if not venues_data:
        return {}

    venue_map = {}

    for v in venues_data:
        existing = session.exec(
            select(Venue).where(Venue.name == v['venuename'])
        ).first()

        if existing:
            venue_map[v['venue_id']] = existing.id
        else:
            venue = Venue(
                name=v['venuename'],
                slug=generate_slug(v['venuename']),
                city=v.get('city'),
                state=v.get('state'),
                country=v.get('country'),
                capacity=v.get('capacity')
            )
            session.add(venue)
            session.commit()
            session.refresh(venue)
            venue_map[v['venue_id']] = venue.id

    print(f"✓ Imported {len(venue_map)} venues")
    return venue_map


def import_songs(session, vertical_id):
    """Import all songs.

    Songs are matched to existing rows by title within the given vertical;
    unmatched songs are inserted.

    Args:
        session: Open database session.
        vertical_id: Local id of the vertical the songs belong to.

    Returns:
        Dict mapping the API song ``id`` to the local ``Song.id``; empty
        when the API returned nothing.
    """
    print("\n🎵 Importing songs...")

    songs_data = fetch_all_json("songs")
    if not songs_data:
        return {}

    song_map = {}

    for s in songs_data:
        # Check if song exists
        existing = session.exec(
            select(Song).where(
                Song.title == s['name'],
                Song.vertical_id == vertical_id
            )
        ).first()

        if existing:
            song_map[s['id']] = existing.id  # API uses 'id' not 'song_id'
        else:
            song = Song(
                title=s['name'],
                slug=generate_slug(s['name']),
                original_artist=s.get('original_artist'),
                vertical_id=vertical_id
                # API doesn't include debut_date or times_played in base response
            )
            session.add(song)
            session.commit()
            session.refresh(song)
            song_map[s['id']] = song.id  # API uses 'id' not 'song_id'

    print(f"✓ Imported {len(song_map)} songs")
    return song_map


def import_shows(session, vertical_id, venue_map):
    """Import all Goose shows.

    Tours referenced by the shows are created on first sight (matched by
    name). Shows are matched to existing rows by date and venue.

    Args:
        session: Open database session.
        vertical_id: Local id of the vertical the shows belong to.
        venue_map: Dict of API ``venue_id`` -> local venue id, as returned
            by ``import_venues``.

    Returns:
        Tuple ``(show_map, tour_map)`` mapping API ``show_id`` / ``tour_id``
        to local ids; ``({}, {})`` when no shows were found.
    """
    print("\n🎤 Importing shows...")

    params = {"artist": ARTIST_ID}
    shows_data = fetch_all_json("shows", params)

    if not shows_data:
        # Fallback: fetch all shows and filter
        print(" Fetching all shows and filtering for Goose...")
        shows_data = fetch_all_json("shows")
        shows_data = [s for s in (shows_data or []) if s.get('artist_id') == ARTIST_ID]

    if not shows_data:
        print("❌ No shows found")
        return {}, {}

    show_map = {}
    tour_map = {}

    for s in shows_data:
        # Handle tours
        tour_id = None
        if s.get('tour_id') and s['tour_id'] != 1:  # 1 = "Not Part of a Tour"
            if s['tour_id'] not in tour_map:
                # Check if tour exists
                existing_tour = session.exec(
                    select(Tour).where(Tour.name == s['tourname'])
                ).first()

                if existing_tour:
                    tour_map[s['tour_id']] = existing_tour.id
                else:
                    tour = Tour(
                        name=s['tourname'],
                        slug=generate_slug(s['tourname'])
                    )
                    session.add(tour)
                    session.commit()
                    session.refresh(tour)
                    tour_map[s['tour_id']] = tour.id

            tour_id = tour_map[s['tour_id']]

        # Create show
        show_date = datetime.strptime(s['showdate'], '%Y-%m-%d')
        # None when the show's venue was not imported.
        local_venue_id = venue_map.get(s['venue_id'])

        # Check existing show
        existing_show = session.exec(
            select(Show).where(
                Show.date == show_date,
                Show.venue_id == local_venue_id
            )
        ).first()

        if existing_show:
            show_map[s['show_id']] = existing_show.id
        else:
            show = Show(
                date=show_date,
                slug=generate_show_slug(s['showdate'], s.get('venuename', 'unknown')),
                vertical_id=vertical_id,
                venue_id=local_venue_id,
                tour_id=tour_id,
                notes=s.get('showtitle')
            )
            session.add(show)
            session.commit()
            session.refresh(show)
            show_map[s['show_id']] = show.id

        if len(show_map) % 50 == 0:
            print(f" Progress: {len(show_map)} shows...")

    print(f"✓ Imported {len(show_map)} shows and {len(tour_map)} tours")
    return show_map, tour_map


def _set_name(setnumber):
    """Translate an API ``setnumber`` value into a display set name.

    Numeric values become ``"Set <n>"``; ``e``, ``e2`` and ``s``
    (case-insensitive) become Encore, Encore 2 and Soundcheck; anything
    else falls back to ``"Set <value>"``.
    """
    set_val = str(setnumber)
    return _SPECIAL_SET_NAMES.get(set_val.lower(), f"Set {set_val}")


def import_setlists(session, show_map, song_map):
    """Import setlists for all shows (Paginated).

    Setlist pages are fetched until an empty/failed page is returned.
    Entries whose show or song is not in the given maps are skipped, and an
    entry is only inserted when no performance with the same show, song and
    position exists yet. After each page is committed, chase notifications
    are attempted for the performances added on that page; failures there
    are reported and do not stop the import.

    Args:
        session: Open database session.
        show_map: Dict of API ``show_id`` -> local show id.
        song_map: Dict of API ``song_id`` -> local song id.
    """
    print("\n📋 Importing setlists...")

    page = 1
    performance_count = 0
    new_performance_ids = []  # Track new performances for chase notifications
    params = {}

    while True:
        params['page'] = page
        print(f" Fetching setlists page {page}...", end="", flush=True)

        data = fetch_json("setlists", params)
        if not data:
            print(" Done (No Data/End).")
            break

        print(f" Processing {len(data)} items...", end="", flush=True)

        count_in_page = 0
        for perf_data in data:
            # Map to our show and song IDs
            our_show_id = show_map.get(perf_data.get('show_id'))
            our_song_id = song_map.get(perf_data.get('song_id'))

            if not our_show_id or not our_song_id:
                continue

            position = perf_data.get('position', 0)

            # Checking for an existing performance is expensive, but it is
            # what makes re-running the importer idempotent.
            existing_perf = session.exec(
                select(Performance).where(
                    Performance.show_id == our_show_id,
                    Performance.song_id == our_song_id,
                    Performance.position == position
                )
            ).first()

            if not existing_perf:
                perf = Performance(
                    show_id=our_show_id,
                    song_id=our_song_id,
                    position=position,
                    # Map setnumber to set_name
                    set_name=_set_name(perf_data.get('setnumber', '1')),
                    segue=bool(perf_data.get('segue', 0)),
                    notes=perf_data.get('notes')
                )
                session.add(perf)
                session.flush()  # Get the ID
                new_performance_ids.append(perf.id)
                count_in_page += 1
                performance_count += 1

        session.commit()

        # Send chase notifications for new performances
        if new_performance_ids:
            try:
                from services.chase_notifications import check_new_performances_for_chases
                check_new_performances_for_chases(session, new_performance_ids)
            except Exception as e:
                print(f" Warning: Chase notifications failed: {e}")

        print(f" Validated/Added {count_in_page} items.")
        page += 1
        new_performance_ids = []  # Reset for next page

    print(f"✓ Imported {performance_count} new performances")


def main():
    """Run the full import: vertical, demo users, venues, songs, shows, setlists."""
    print("=" * 60)
    print("EL GOOSE DATA IMPORTER")
    print("=" * 60)

    with Session(engine) as session:
        # 1. Get or create vertical
        print("\n🦆 Creating Goose vertical...")
        vertical = session.exec(
            select(Vertical).where(Vertical.slug == "goose")
        ).first()

        if not vertical:
            vertical = Vertical(
                name="Goose",
                slug="goose",
                description="Goose is a jam band from Connecticut"
            )
            session.add(vertical)
            session.commit()
            session.refresh(vertical)
            print(f"✓ Created vertical (ID: {vertical.id})")
        else:
            print(f"✓ Using existing vertical (ID: {vertical.id})")

        # 2. Create users
        users = create_users(session)

        # 3. Import base data
        venue_map = import_venues(session)
        song_map = import_songs(session, vertical.id)

        # 4. Import shows
        show_map, tour_map = import_shows(session, vertical.id, venue_map)

        # 5. Import setlists
        import_setlists(session, show_map, song_map)

        print("\n" + "=" * 60)
        print("✓ IMPORT COMPLETE!")
        print("=" * 60)
        print("\nImported:")
        print(f" • {len(venue_map)} venues")
        print(f" • {len(tour_map)} tours")
        print(f" • {len(song_map)} songs")
        print(f" • {len(show_map)} shows")
        print(f" • {len(users)} demo users")
        print("\nAll passwords: demo123")
        print("\nStart demo servers:")
        print(" Backend: DATABASE_URL='sqlite:///./elmeg-demo.db' uvicorn main:app --reload --port 8001")
        print(" Frontend: NEXT_PUBLIC_API_URL=http://localhost:8001 npm run dev -- -p 3001")


if __name__ == "__main__":
    main()