fediversion/backend/importers/base.py
fullsizemalt d11878fdcd
Some checks failed
Deploy Fediversion / deploy (push) Failing after 1s
fix: DynamicImporter vertical bug + increase rate limit to 2s
2025-12-28 17:33:14 -08:00

321 lines
11 KiB
Python

"""
Fediversion Data Importer Base Class
Abstract base class providing common functionality for all band-specific importers:
- Rate limiting
- Caching
- Idempotent upsert logic
- Progress tracking
"""
import time
import requests
import hashlib
import json
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, List, Any
from sqlmodel import Session, select
from database import engine
from models import Vertical, Venue, Tour, Show, Song, Performance
from slugify import generate_slug, generate_show_slug
class ImporterBase(ABC):
    """Base class for all band data importers.

    Subclasses set the ``VERTICAL_*`` class attributes and implement the
    abstract ``import_*`` methods. This base provides rate-limited, on-disk
    cached JSON fetching plus get-or-create ("upsert") helpers that map
    external IDs to internal database row IDs.
    """

    # Override in subclasses
    VERTICAL_NAME: str = ""
    VERTICAL_SLUG: str = ""
    VERTICAL_DESCRIPTION: str = ""

    # Rate limiting
    REQUEST_DELAY: float = 2.0  # seconds between requests (setlist.fm is strict)

    # Cache settings
    CACHE_DIR: Path = Path(__file__).parent / ".cache"
    CACHE_TTL: int = 3600  # 1 hour

    def __init__(self, session: Session):
        """Bind the importer to a database session and prepare the cache dir.

        Args:
            session: sqlmodel ``Session`` used for every lookup and insert.
        """
        self.session = session
        self.vertical: Optional[Vertical] = None
        # Per-instance memo of external_id -> internal row ID, so repeated
        # references to the same external entity skip the database lookup.
        self.venue_map: Dict[str, int] = {}
        self.song_map: Dict[str, int] = {}
        self.show_map: Dict[str, int] = {}
        self.tour_map: Dict[str, int] = {}
        # time.monotonic() value of the last outbound request; -inf means
        # "no request yet", so the first request is never delayed.
        self.last_request_time: float = float("-inf")
        # Ensure cache directory exists (including parents, in case a
        # subclass points CACHE_DIR at a deeper path).
        self.CACHE_DIR.mkdir(parents=True, exist_ok=True)

    def _rate_limit(self):
        """Sleep as needed so outbound requests are REQUEST_DELAY seconds apart.

        A monotonic clock is used so that wall-clock adjustments (e.g. an NTP
        step backwards) cannot yield a negative elapsed time and an overly
        long sleep.
        """
        elapsed = time.monotonic() - self.last_request_time
        if elapsed < self.REQUEST_DELAY:
            time.sleep(self.REQUEST_DELAY - elapsed)
        self.last_request_time = time.monotonic()

    def _cache_key(self, url: str, params: Optional[Dict] = None) -> str:
        """Return a stable cache key for a GET of ``url`` with ``params``.

        Params are serialized with sorted keys so dict ordering does not
        affect the key. Request headers are not part of the key. MD5 is used
        only to derive a filename, not for any security purpose.
        """
        key_data = url + (json.dumps(params, sort_keys=True) if params else "")
        return hashlib.md5(key_data.encode()).hexdigest()

    def _get_cache(self, cache_key: str) -> Optional[Any]:
        """Return the cached response for ``cache_key`` if present and fresh.

        Missing, unreadable, malformed, or expired cache files are all
        treated as a cache miss and yield ``None``.
        """
        cache_file = self.CACHE_DIR / f"{cache_key}.json"
        try:
            data = json.loads(cache_file.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # Missing file (FileNotFoundError), I/O error, or corrupt JSON.
            return None
        if not isinstance(data, dict):
            return None
        cached_at = data.get("cached_at")
        if not isinstance(cached_at, (int, float)):
            return None
        # Wall-clock time is correct here: the timestamp is persisted and
        # compared across process runs, where a monotonic clock is meaningless.
        if time.time() - cached_at >= self.CACHE_TTL:
            return None
        return data.get("response")

    def _set_cache(self, cache_key: str, response: Any):
        """Store ``response`` in the on-disk cache (best effort).

        The entry is written to a temporary file and atomically renamed into
        place, so an interrupted write never leaves a truncated cache file.
        Write failures are reported but never abort the import.
        """
        cache_file = self.CACHE_DIR / f"{cache_key}.json"
        tmp_file = self.CACHE_DIR / f"{cache_key}.json.tmp"
        payload = json.dumps({
            "cached_at": time.time(),
            "response": response
        })
        try:
            tmp_file.write_text(payload, encoding="utf-8")
            tmp_file.replace(cache_file)
        except OSError as e:
            print(f"⚠️ Could not write cache entry {cache_key}: {e}")

    def fetch_json(self, url: str, params: Optional[Dict] = None,
                   headers: Optional[Dict] = None, use_cache: bool = True) -> Any:
        """Fetch JSON with rate limiting and caching.

        Args:
            url: Endpoint to GET.
            params: Query parameters (also part of the cache key).
            headers: Request headers (NOT part of the cache key).
            use_cache: When False, neither read nor write the cache.

        Returns:
            The decoded JSON value, or ``None`` if the request failed, returned
            an HTTP error status, or the body was not valid JSON.
        """
        cache_key = self._cache_key(url, params)
        if use_cache:
            cached = self._get_cache(cache_key)
            if cached is not None:
                return cached
        self._rate_limit()
        try:
            response = requests.get(url, params=params, headers=headers, timeout=30)
            response.raise_for_status()
            data = response.json()
        # ValueError covers JSON decode errors on older `requests` versions,
        # which do not wrap them in a RequestException subclass.
        except (requests.RequestException, ValueError) as e:
            print(f"❌ Request failed: {url} - {e}")
            return None
        # Cache outside the try: a cache write problem must not discard data
        # that was fetched successfully (_set_cache is best effort anyway).
        if use_cache:
            self._set_cache(cache_key, data)
        return data

    def get_or_create_vertical(self) -> Vertical:
        """Return this importer's Vertical, creating and committing it if absent.

        The result is memoized on ``self.vertical``; lookup is by
        ``VERTICAL_SLUG``.
        """
        if self.vertical:
            return self.vertical
        self.vertical = self.session.exec(
            select(Vertical).where(Vertical.slug == self.VERTICAL_SLUG)
        ).first()
        if not self.vertical:
            self.vertical = Vertical(
                name=self.VERTICAL_NAME,
                slug=self.VERTICAL_SLUG,
                description=self.VERTICAL_DESCRIPTION
            )
            self.session.add(self.vertical)
            self.session.commit()
            self.session.refresh(self.vertical)
            print(f"✓ Created vertical: {self.VERTICAL_NAME}")
        else:
            print(f"✓ Using existing vertical: {self.VERTICAL_NAME}")
        return self.vertical

    def upsert_venue(self, name: str, city: str, state: Optional[str],
                     country: str, external_id: Optional[str] = None,
                     capacity: Optional[int] = None) -> int:
        """Return the ID of the venue matching ``name`` + ``city``, creating it if absent.

        Existing rows are NOT modified (e.g. ``capacity`` is only used on
        insert). When ``external_id`` is given, the result is memoized in
        ``venue_map``.
        """
        # Check if we've already processed this
        if external_id and external_id in self.venue_map:
            return self.venue_map[external_id]
        # Check database. NOTE(review): matching ignores state/country, so two
        # same-named venues in same-named cities would be merged — confirm.
        existing = self.session.exec(
            select(Venue).where(Venue.name == name, Venue.city == city)
        ).first()
        if existing:
            venue_id = existing.id
        else:
            venue = Venue(
                name=name,
                slug=generate_slug(name),
                city=city,
                state=state,
                country=country,
                capacity=capacity
            )
            self.session.add(venue)
            # NOTE(review): committing per insert is slow for bulk imports;
            # consider flush() + one commit per batch if atomicity allows.
            self.session.commit()
            self.session.refresh(venue)
            venue_id = venue.id
        if external_id:
            self.venue_map[external_id] = venue_id
        return venue_id

    def upsert_song(self, title: str, original_artist: Optional[str] = None,
                    external_id: Optional[str] = None) -> int:
        """Return the ID of this vertical's song titled ``title``, creating it if absent.

        Existing rows are NOT modified. When ``external_id`` is given, the
        result is memoized in ``song_map``.
        """
        if external_id and external_id in self.song_map:
            return self.song_map[external_id]
        vertical = self.get_or_create_vertical()
        existing = self.session.exec(
            select(Song).where(Song.title == title, Song.vertical_id == vertical.id)
        ).first()
        if existing:
            song_id = existing.id
        else:
            song = Song(
                title=title,
                slug=generate_slug(title),
                original_artist=original_artist,
                vertical_id=vertical.id
            )
            self.session.add(song)
            self.session.commit()
            self.session.refresh(song)
            song_id = song.id
        if external_id:
            self.song_map[external_id] = song_id
        return song_id

    def upsert_tour(self, name: str, start_date: Optional[datetime] = None,
                    end_date: Optional[datetime] = None,
                    external_id: Optional[str] = None) -> int:
        """Return the ID of the tour named ``name``, creating it if absent.

        Existing rows are NOT modified (dates are only used on insert). When
        ``external_id`` is given, the result is memoized in ``tour_map``.
        """
        if external_id and external_id in self.tour_map:
            return self.tour_map[external_id]
        # NOTE(review): unlike songs/shows, this lookup is not scoped to the
        # vertical, so identically named tours of different bands would share
        # a row — confirm whether Tour is meant to be global.
        existing = self.session.exec(
            select(Tour).where(Tour.name == name)
        ).first()
        if existing:
            tour_id = existing.id
        else:
            tour = Tour(
                name=name,
                slug=generate_slug(name),
                start_date=start_date,
                end_date=end_date
            )
            self.session.add(tour)
            self.session.commit()
            self.session.refresh(tour)
            tour_id = tour.id
        if external_id:
            self.tour_map[external_id] = tour_id
        return tour_id

    def upsert_show(self, date: datetime, venue_id: int,
                    tour_id: Optional[int] = None, notes: Optional[str] = None,
                    external_id: Optional[str] = None) -> int:
        """Return the ID of this vertical's show at ``venue_id`` on ``date``, creating it if absent.

        Existing rows are NOT modified (``tour_id``/``notes`` are only used on
        insert). When ``external_id`` is given, the result is memoized in
        ``show_map``.
        """
        if external_id and external_id in self.show_map:
            return self.show_map[external_id]
        vertical = self.get_or_create_vertical()
        # Check by date + venue
        existing = self.session.exec(
            select(Show).where(
                Show.date == date,
                Show.venue_id == venue_id,
                Show.vertical_id == vertical.id
            )
        ).first()
        if existing:
            show_id = existing.id
        else:
            # Get venue name for slug
            venue = self.session.get(Venue, venue_id)
            venue_name = venue.name if venue else "unknown"
            show = Show(
                date=date,
                slug=generate_show_slug(date.strftime("%Y-%m-%d"), venue_name),
                vertical_id=vertical.id,
                venue_id=venue_id,
                tour_id=tour_id,
                notes=notes
            )
            self.session.add(show)
            self.session.commit()
            self.session.refresh(show)
            show_id = show.id
        if external_id:
            self.show_map[external_id] = show_id
        return show_id

    def upsert_performance(self, show_id: int, song_id: int, position: int,
                           set_name: Optional[str] = None, segue: bool = False,
                           notes: Optional[str] = None) -> int:
        """Return the ID of the performance (show, song, position), creating it if absent.

        Existing rows are NOT modified (``set_name``/``segue``/``notes`` are
        only used on insert).
        """
        existing = self.session.exec(
            select(Performance).where(
                Performance.show_id == show_id,
                Performance.song_id == song_id,
                Performance.position == position
            )
        ).first()
        if existing:
            return existing.id
        perf = Performance(
            show_id=show_id,
            song_id=song_id,
            position=position,
            set_name=set_name,
            segue=segue,
            notes=notes
        )
        self.session.add(perf)
        self.session.commit()
        self.session.refresh(perf)
        return perf.id

    @abstractmethod
    def import_all(self):
        """Main entry point - implement in subclasses"""

    @abstractmethod
    def import_songs(self) -> Dict[str, int]:
        """Import all songs, return mapping of external_id -> internal_id"""

    @abstractmethod
    def import_venues(self) -> Dict[str, int]:
        """Import all venues, return mapping of external_id -> internal_id"""

    @abstractmethod
    def import_shows(self) -> Dict[str, int]:
        """Import all shows, return mapping of external_id -> internal_id"""

    @abstractmethod
    def import_setlists(self):
        """Import setlists/performances for all shows"""