"""
Fediversion Data Importer Base Class

Abstract base class providing common functionality for all band-specific importers:
- Rate limiting
- Caching
- Idempotent upsert logic
- Progress tracking
"""

import time
import hashlib
import json
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, List, Any

import requests
from sqlmodel import Session, select

from database import engine
from models import Vertical, Venue, Tour, Show, Song, Performance
from slugify import generate_slug, generate_show_slug


class ImporterBase(ABC):
    """Base class for all band data importers.

    Subclasses set the ``VERTICAL_*`` class attributes and implement the
    abstract ``import_*`` methods. This base supplies a rate-limited,
    disk-cached JSON fetcher plus get-or-create helpers that remember the
    mapping from source-API external IDs to internal database IDs.
    """

    # Override in subclasses
    VERTICAL_NAME: str = ""
    VERTICAL_SLUG: str = ""
    VERTICAL_DESCRIPTION: str = ""

    # Rate limiting
    REQUEST_DELAY: float = 0.5  # seconds between requests

    # Cache settings
    CACHE_DIR: Path = Path(__file__).parent / ".cache"
    CACHE_TTL: int = 3600  # 1 hour

    def __init__(self, session: Session):
        """Bind the importer to a database session and prepare the cache dir.

        Args:
            session: SQLModel session used for every lookup and insert.
        """
        self.session = session
        self.vertical: Optional[Vertical] = None
        # external_id -> internal id, per entity type; skips repeat DB lookups
        self.venue_map: Dict[str, int] = {}
        self.song_map: Dict[str, int] = {}
        self.show_map: Dict[str, int] = {}
        self.tour_map: Dict[str, int] = {}
        # time.monotonic() reading taken after the last outbound request
        self.last_request_time: float = 0

        # Ensure cache directory exists. parents=True lets a subclass point
        # CACHE_DIR at a nested path that does not exist yet.
        self.CACHE_DIR.mkdir(parents=True, exist_ok=True)

    def _rate_limit(self):
        """Sleep as needed so requests are at least REQUEST_DELAY seconds apart.

        Uses the monotonic clock so wall-clock adjustments (NTP sync, manual
        changes) can neither skip the delay nor cause an unbounded sleep.
        """
        elapsed = time.monotonic() - self.last_request_time
        remaining = self.REQUEST_DELAY - elapsed
        if remaining > 0:
            # Clamp: a negative `elapsed` (e.g. a timestamp from another clock)
            # must never turn into a sleep longer than a single delay.
            time.sleep(min(remaining, self.REQUEST_DELAY))
        self.last_request_time = time.monotonic()

    def _cache_key(self, url: str, params: Optional[Dict] = None) -> str:
        """Return a stable cache key for ``url`` plus its query params.

        Params are serialized with sorted keys, so dict ordering does not
        affect the key. Request headers are NOT part of the key. MD5 serves
        only as a filename hash here, not for security.
        """
        key_data = url + (json.dumps(params, sort_keys=True) if params else "")
        return hashlib.md5(key_data.encode()).hexdigest()

    def _get_cache(self, cache_key: str) -> Optional[Any]:
        """Return the cached response for ``cache_key`` if present and fresh.

        Returns None on a miss. A miss covers a missing file, an unreadable
        or corrupt file, an unexpected payload shape, or an entry older
        than CACHE_TTL seconds.
        """
        cache_file = self.CACHE_DIR / f"{cache_key}.json"
        try:
            data = json.loads(cache_file.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # OSError covers a missing or unreadable file. ValueError covers
            # json.JSONDecodeError and UnicodeDecodeError. Any of these
            # counts as a cache miss, not as a fatal error.
            return None

        if not isinstance(data, dict):
            return None
        cached_at = data.get("cached_at", 0)
        if not isinstance(cached_at, (int, float)):
            return None
        if time.time() - cached_at < self.CACHE_TTL:
            return data.get("response")
        return None

    def _set_cache(self, cache_key: str, response: Any):
        """Persist ``response`` (must be JSON-serializable) with a timestamp.

        The payload is written to a sibling temp file and then moved into
        place, so an interrupted write never leaves a truncated cache file.

        Raises:
            TypeError/ValueError: if ``response`` is not JSON-serializable.
            OSError: if the file cannot be written.
        """
        cache_file = self.CACHE_DIR / f"{cache_key}.json"
        # Serialize first so a serialization error touches no files at all.
        payload = json.dumps({
            "cached_at": time.time(),
            "response": response,
        })
        tmp_file = cache_file.with_name(f"{cache_file.name}.tmp")
        tmp_file.write_text(payload, encoding="utf-8")
        tmp_file.replace(cache_file)

    def fetch_json(self, url: str, params: Optional[Dict] = None,
                   headers: Optional[Dict] = None,
                   use_cache: bool = True) -> Optional[Any]:
        """Fetch JSON with rate limiting and caching.

        Args:
            url: Endpoint to GET.
            params: Query parameters; also part of the cache key.
            headers: Request headers; NOT part of the cache key.
            use_cache: When True, serve a fresh cached copy if one exists
                and store successful responses in the cache.

        Returns:
            The decoded JSON body, or None if the request failed.
            A failure is printed, not raised.
        """
        cache_key = self._cache_key(url, params)

        if use_cache:
            cached = self._get_cache(cache_key)
            if cached is not None:
                return cached

        self._rate_limit()

        try:
            response = requests.get(url, params=params, headers=headers, timeout=30)
            response.raise_for_status()
            data = response.json()
        except requests.RequestException as e:
            print(f"❌ Request failed: {url} - {e}")
            return None

        if use_cache:
            # Caching is best-effort: a disk or serialization problem must not
            # throw away a response we already received successfully.
            try:
                self._set_cache(cache_key, data)
            except (OSError, TypeError, ValueError) as e:
                print(f"⚠️ Cache write failed: {url} - {e}")

        return data

    def get_or_create_vertical(self) -> Vertical:
        """Return this importer's Vertical, creating it on first use.

        Looks the vertical up by VERTICAL_SLUG. If no row exists, inserts one
        from the VERTICAL_* class attributes and commits. The result is
        memoized on ``self.vertical``.
        """
        if self.vertical:
            return self.vertical

        self.vertical = self.session.exec(
            select(Vertical).where(Vertical.slug == self.VERTICAL_SLUG)
        ).first()

        if not self.vertical:
            self.vertical = Vertical(
                name=self.VERTICAL_NAME,
                slug=self.VERTICAL_SLUG,
                description=self.VERTICAL_DESCRIPTION
            )
            self.session.add(self.vertical)
            self.session.commit()
            self.session.refresh(self.vertical)
            print(f"✓ Created vertical: {self.VERTICAL_NAME}")
        else:
            print(f"✓ Using existing vertical: {self.VERTICAL_NAME}")

        return self.vertical

    def upsert_venue(self, name: str, city: str, state: Optional[str],
                     country: str, external_id: Optional[str] = None,
                     capacity: Optional[int] = None) -> int:
        """Return the internal ID of the venue ``name`` in ``city``, creating it if missing.

        An existing row is matched on (name, city) and returned unchanged;
        state, country and capacity are only used when inserting. Inserts are
        committed immediately. When ``external_id`` is given, the mapping is
        remembered in ``venue_map``.
        """
        # Check if we've already processed this
        if external_id and external_id in self.venue_map:
            return self.venue_map[external_id]

        # Check database
        existing = self.session.exec(
            select(Venue).where(Venue.name == name, Venue.city == city)
        ).first()

        if existing:
            venue_id = existing.id
        else:
            venue = Venue(
                name=name,
                slug=generate_slug(name),
                city=city,
                state=state,
                country=country,
                capacity=capacity
            )
            self.session.add(venue)
            self.session.commit()
            self.session.refresh(venue)
            venue_id = venue.id

        if external_id:
            self.venue_map[external_id] = venue_id

        return venue_id

    def upsert_song(self, title: str, original_artist: Optional[str] = None,
                    external_id: Optional[str] = None) -> int:
        """Return the internal ID of song ``title`` in this vertical, creating it if missing.

        An existing row is matched on (title, vertical) and returned
        unchanged. Inserts are committed immediately. When ``external_id``
        is given, the mapping is remembered in ``song_map``.
        """
        if external_id and external_id in self.song_map:
            return self.song_map[external_id]

        vertical = self.get_or_create_vertical()

        existing = self.session.exec(
            select(Song).where(Song.title == title, Song.vertical_id == vertical.id)
        ).first()

        if existing:
            song_id = existing.id
        else:
            song = Song(
                title=title,
                slug=generate_slug(title),
                original_artist=original_artist,
                vertical_id=vertical.id
            )
            self.session.add(song)
            self.session.commit()
            self.session.refresh(song)
            song_id = song.id

        if external_id:
            self.song_map[external_id] = song_id

        return song_id

    def upsert_tour(self, name: str, start_date: Optional[datetime] = None,
                    end_date: Optional[datetime] = None,
                    external_id: Optional[str] = None) -> int:
        """Return the internal ID of the tour ``name``, creating it if missing.

        An existing row is matched on name alone and returned unchanged.
        NOTE(review): the match is not scoped to the vertical; confirm that
        Tour names are meant to be global. Inserts are committed immediately.
        When ``external_id`` is given, the mapping is remembered in
        ``tour_map``.
        """
        if external_id and external_id in self.tour_map:
            return self.tour_map[external_id]

        existing = self.session.exec(
            select(Tour).where(Tour.name == name)
        ).first()

        if existing:
            tour_id = existing.id
        else:
            tour = Tour(
                name=name,
                slug=generate_slug(name),
                start_date=start_date,
                end_date=end_date
            )
            self.session.add(tour)
            self.session.commit()
            self.session.refresh(tour)
            tour_id = tour.id

        if external_id:
            self.tour_map[external_id] = tour_id

        return tour_id

    def upsert_show(self, date: datetime, venue_id: int,
                    tour_id: Optional[int] = None, notes: Optional[str] = None,
                    external_id: Optional[str] = None) -> int:
        """Return the internal ID of the show on ``date`` at ``venue_id``, creating it if missing.

        An existing row is matched on (date, venue, vertical) and returned
        unchanged. A new show's slug is built from the date (YYYY-MM-DD) and
        the venue name, or "unknown" if the venue row cannot be loaded.
        Inserts are committed immediately. When ``external_id`` is given,
        the mapping is remembered in ``show_map``.
        """
        if external_id and external_id in self.show_map:
            return self.show_map[external_id]

        vertical = self.get_or_create_vertical()

        # Check by date + venue
        existing = self.session.exec(
            select(Show).where(
                Show.date == date,
                Show.venue_id == venue_id,
                Show.vertical_id == vertical.id
            )
        ).first()

        if existing:
            show_id = existing.id
        else:
            # Get venue name for slug
            venue = self.session.get(Venue, venue_id)
            venue_name = venue.name if venue else "unknown"

            show = Show(
                date=date,
                slug=generate_show_slug(date.strftime("%Y-%m-%d"), venue_name),
                vertical_id=vertical.id,
                venue_id=venue_id,
                tour_id=tour_id,
                notes=notes
            )
            self.session.add(show)
            self.session.commit()
            self.session.refresh(show)
            show_id = show.id

        if external_id:
            self.show_map[external_id] = show_id

        return show_id

    def upsert_performance(self, show_id: int, song_id: int, position: int,
                           set_name: Optional[str] = None, segue: bool = False,
                           notes: Optional[str] = None) -> int:
        """Return the ID of the performance of ``song_id`` at ``position`` in ``show_id``.

        An existing row is matched on (show_id, song_id, position) and
        returned unchanged; set_name, segue and notes are only used when
        inserting. Inserts are committed immediately. No in-memory map is
        kept for performances.
        """
        existing = self.session.exec(
            select(Performance).where(
                Performance.show_id == show_id,
                Performance.song_id == song_id,
                Performance.position == position
            )
        ).first()

        if existing:
            return existing.id

        perf = Performance(
            show_id=show_id,
            song_id=song_id,
            position=position,
            set_name=set_name,
            segue=segue,
            notes=notes
        )
        self.session.add(perf)
        self.session.commit()
        self.session.refresh(perf)
        return perf.id

    @abstractmethod
    def import_all(self):
        """Main entry point - implement in subclasses"""

    @abstractmethod
    def import_songs(self) -> Dict[str, int]:
        """Import all songs, return mapping of external_id -> internal_id"""

    @abstractmethod
    def import_venues(self) -> Dict[str, int]:
        """Import all venues, return mapping of external_id -> internal_id"""

    @abstractmethod
    def import_shows(self) -> Dict[str, int]:
        """Import all shows, return mapping of external_id -> internal_id"""

    @abstractmethod
    def import_setlists(self):
        """Import setlists/performances for all shows"""