fediversion/backend/import_elgoose.py

423 lines
14 KiB
Python

"""
Comprehensive El Goose Data Importer
Fetches ALL Goose data from El Goose API and populates demo database
"""
import requests
import time
from datetime import datetime
from sqlmodel import Session, select, or_
from database import engine
from models import (
Vertical, Venue, Tour, Show, Song, Performance, Artist,
User, UserPreferences
)
from passlib.context import CryptContext
from slugify import generate_slug, generate_show_slug
BASE_URL = "https://elgoose.net/api/v2"
ARTIST_ID = 1 # Goose
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
# User personas for demo
DEMO_USERS = [
{"email": "archivist@demo.com", "username": "TheArchivist", "role": "user", "wiki_mode": True},
{"email": "statnerd@demo.com", "username": "StatNerd420", "role": "user", "wiki_mode": False},
{"email": "reviewer@demo.com", "username": "CriticalListener", "role": "user", "wiki_mode": False},
{"email": "casual@demo.com", "username": "CasualFan", "role": "user", "wiki_mode": False},
{"email": "groupleader@demo.com", "username": "NortheastHonkers", "role": "user", "wiki_mode": False},
{"email": "mod@demo.com", "username": "ModGoose", "role": "moderator", "wiki_mode": False},
{"email": "admin@demo.com", "username": "AdminBird", "role": "admin", "wiki_mode": False},
{"email": "newbie@demo.com", "username": "NewToGoose", "role": "user", "wiki_mode": False},
{"email": "taper@demo.com", "username": "TaperTom", "role": "user", "wiki_mode": False},
{"email": "tourfollower@demo.com", "username": "RoadWarrior", "role": "user", "wiki_mode": False},
{"email": "lurker@demo.com", "username": "SilentHonker", "role": "user", "wiki_mode": True},
{"email": "hype@demo.com", "username": "HypeGoose", "role": "user", "wiki_mode": False},
]
def fetch_json(endpoint, params=None):
"""Fetch JSON from El Goose API (Single Page)"""
url = f"{BASE_URL}/{endpoint}.json"
try:
response = requests.get(url, params=params)
response.raise_for_status()
data = response.json()
if data.get('error') == 1:
print(f"❌ API Error: {data.get('error_message')}")
return None
return data.get('data', [])
except Exception as e:
print(f"❌ Failed to fetch {endpoint}: {e}")
return None
def fetch_all_json(endpoint, params=None):
"""Fetch ALL data from El Goose API using pagination"""
all_data = []
page = 1
params = params.copy() if params else {}
print(f" Fetching {endpoint} pages...", end="", flush=True)
while True:
params['page'] = page
data = fetch_json(endpoint, params)
if not data:
break
all_data.extend(data)
print(f" {page}", end="", flush=True)
# If less than 50 results (typical page size might be larger, but if small, likely last page)
# Actually API returns empty list [] if page out of range usually.
# So 'if not data' handles it.
# Safety break if too many pages
if page > 100:
print(" (Safety limit reached)", end="")
break
page += 1
print(" Done.")
return all_data
def create_users(session):
"""Create demo user personas"""
print("\n📝 Creating user personas...")
users = []
for user_data in DEMO_USERS:
# Check existing
existing = session.exec(
select(User).where(User.email == user_data["email"])
).first()
if existing:
users.append(existing)
continue
user = User(
email=user_data["email"],
hashed_password=pwd_context.hash("demo123"),
is_active=True,
is_superuser=(user_data["role"] == "admin"),
role=user_data["role"]
)
session.add(user)
session.commit()
session.refresh(user)
prefs = UserPreferences(
user_id=user.id,
wiki_mode=user_data["wiki_mode"],
show_ratings=not user_data["wiki_mode"],
show_comments=not user_data["wiki_mode"]
)
session.add(prefs)
users.append(user)
session.commit()
print(f"✓ Created/Found {len(users)} users")
return users
from sqlmodel import Session, select, or_
# ... (imports)
def import_venues(session):
"""Import all venues"""
print("\n🏛️ Importing venues...")
venues_data = fetch_all_json("venues")
if not venues_data:
return {}
venue_map = {}
for v in venues_data:
slug = generate_slug(v['venuename'])
existing = session.exec(
select(Venue).where(
or_(
Venue.name == v['venuename'],
Venue.slug == slug
)
)
).first()
if existing:
venue_map[v['venue_id']] = existing.id
else:
venue = Venue(
name=v['venuename'],
slug=slug,
city=v.get('city'),
state=v.get('state'),
country=v.get('country'),
capacity=v.get('capacity')
)
session.add(venue)
session.commit()
session.refresh(venue)
venue_map[v['venue_id']] = venue.id
print(f"✓ Imported {len(venue_map)} venues")
return venue_map
def import_songs(session, vertical_id):
"""Import all songs"""
print("\n🎵 Importing songs...")
songs_data = fetch_all_json("songs")
if not songs_data:
return {}
song_map = {}
for s in songs_data:
# Check if song exists
existing = session.exec(
select(Song).where(
Song.title == s['name'],
Song.vertical_id == vertical_id
)
).first()
if existing:
song_map[s['id']] = existing.id # API uses 'id' not 'song_id'
else:
song = Song(
title=s['name'],
slug=generate_slug(s['name']),
original_artist=s.get('original_artist'),
vertical_id=vertical_id
# API doesn't include debut_date or times_played in base response
)
session.add(song)
session.commit()
session.refresh(song)
song_map[s['id']] = song.id # API uses 'id' not 'song_id'
print(f"✓ Imported {len(song_map)} songs")
return song_map
def import_shows(session, vertical_id, venue_map):
"""Import all Goose shows"""
print("\n🎤 Importing shows...")
params = {"artist": ARTIST_ID}
shows_data = fetch_all_json("shows", params)
if not shows_data:
# Fallback: fetch all shows and filter
# Fallback: fetch all shows and filter
print(" Fetching all shows and filtering for Goose...")
shows_data = fetch_all_json("shows")
shows_data = [s for s in (shows_data or []) if s.get('artist_id') == ARTIST_ID]
if not shows_data:
print("❌ No shows found")
return {}, {}
show_map = {}
tour_map = {}
for s in shows_data:
# Handle tours
tour_id = None
if s.get('tour_id') and s['tour_id'] != 1: # 1 = "Not Part of a Tour"
if s['tour_id'] not in tour_map:
# Check if tour exists
existing_tour = session.exec(
select(Tour).where(Tour.name == s['tourname'])
).first()
if existing_tour:
tour_map[s['tour_id']] = existing_tour.id
else:
tour = Tour(
name=s['tourname'],
slug=generate_slug(s['tourname'])
)
session.add(tour)
session.commit()
session.refresh(tour)
tour_map[s['tour_id']] = tour.id
tour_id = tour_map[s['tour_id']]
# Create show
show_date = datetime.strptime(s['showdate'], '%Y-%m-%d')
# Check existing show
existing_show = session.exec(
select(Show).where(
Show.date == show_date,
Show.venue_id == venue_map.get(s['venue_id'])
)
).first()
if existing_show:
show_map[s['show_id']] = existing_show.id
else:
show = Show(
date=show_date,
slug=generate_show_slug(s['showdate'], s.get('venuename', 'unknown')),
vertical_id=vertical_id,
venue_id=venue_map.get(s['venue_id']),
tour_id=tour_id,
notes=s.get('showtitle')
)
session.add(show)
session.commit()
session.refresh(show)
show_map[s['show_id']] = show.id
if len(show_map) % 50 == 0:
print(f" Progress: {len(show_map)} shows...")
print(f"✓ Imported {len(show_map)} shows and {len(tour_map)} tours")
return show_map, tour_map
def import_setlists(session, show_map, song_map):
"""Import setlists for all shows (Paginated)"""
print("\n📋 Importing setlists...")
page = 1
total_processed = 0
performance_count = 0
new_performance_ids = [] # Track new performances for chase notifications
params = {}
while True:
params['page'] = page
print(f" Fetching setlists page {page}...", end="", flush=True)
data = fetch_json("setlists", params)
if not data:
print(" Done (No Data/End).")
break
print(f" Processing {len(data)} items...", end="", flush=True)
count_in_page = 0
for perf_data in data:
# Map to our show and song IDs
our_show_id = show_map.get(perf_data.get('show_id'))
our_song_id = song_map.get(perf_data.get('song_id'))
if not our_show_id or not our_song_id:
continue
# Check existing performance validation is expensive.
# We trust idempotency or assume clean run?
# User idempotency request requires checking.
existing_perf = session.exec(
select(Performance).where(
Performance.show_id == our_show_id,
Performance.song_id == our_song_id,
Performance.position == perf_data.get('position', 0)
)
).first()
if not existing_perf:
# Map setnumber to set_name
set_val = str(perf_data.get('setnumber', '1'))
if set_val.isdigit():
set_name = f"Set {set_val}"
elif set_val.lower() == 'e':
set_name = "Encore"
elif set_val.lower() == 'e2':
set_name = "Encore 2"
elif set_val.lower() == 's':
set_name = "Soundcheck"
else:
set_name = f"Set {set_val}"
perf = Performance(
show_id=our_show_id,
song_id=our_song_id,
position=perf_data.get('position', 0),
set_name=set_name,
segue=bool(perf_data.get('segue', 0)),
notes=perf_data.get('notes')
)
session.add(perf)
session.flush() # Get the ID
new_performance_ids.append(perf.id)
count_in_page += 1
performance_count += 1
session.commit()
# Send chase notifications for new performances
if new_performance_ids:
try:
from services.chase_notifications import check_new_performances_for_chases
check_new_performances_for_chases(session, new_performance_ids)
except Exception as e:
print(f" Warning: Chase notifications failed: {e}")
print(f" Validated/Added {count_in_page} items.")
page += 1
new_performance_ids = [] # Reset for next page
print(f"✓ Imported {performance_count} new performances")
def run_import(session: Session, with_users: bool = False):
"""Run the import process programmatically"""
# 1. Get or create vertical
print("\n🦆 Creating Goose vertical...")
vertical = session.exec(
select(Vertical).where(Vertical.slug == "goose")
).first()
if not vertical:
vertical = Vertical(
name="Goose",
slug="goose",
description="Goose is a jam band from Connecticut"
)
session.add(vertical)
session.commit()
session.refresh(vertical)
print(f"✓ Created vertical (ID: {vertical.id})")
else:
print(f"✓ Using existing vertical (ID: {vertical.id})")
users = []
if with_users:
# 2. Create users
users = create_users(session)
# 3. Import base data
venue_map = import_venues(session)
song_map = import_songs(session, vertical.id)
# 4. Import shows
show_map, tour_map = import_shows(session, vertical.id, venue_map)
# 5. Import setlists
import_setlists(session, show_map, song_map)
return {
"venues": len(venue_map),
"tours": len(tour_map),
"songs": len(song_map),
"shows": len(show_map),
"users": len(users)
}
def main():
print("="*60)
print("EL GOOSE DATA IMPORTER")
print("="*60)
with Session(engine) as session:
stats = run_import(session, with_users=True)
print("\n" + "="*60)
print("✓ IMPORT COMPLETE!")
print("="*60)
print(f"\nImported:")
print(f"{stats['venues']} venues")
print(f"{stats['tours']} tours")
print(f"{stats['songs']} songs")
print(f"{stats['shows']} shows")
print(f"{stats['users']} demo users")
print(f"\nAll passwords: demo123")
print(f"\nStart demo servers:")
print(f" Backend: DATABASE_URL='sqlite:///./elmeg-demo.db' uvicorn main:app --reload --port 8001")
print(f" Frontend: NEXT_PUBLIC_API_URL=http://localhost:8001 npm run dev -- -p 3001")
if __name__ == "__main__":
main()