From 8620841932710fdbbb27ae27e689772bf54a7f84 Mon Sep 17 00:00:00 2001
From: fullsizemalt <106900403+fullsizemalt@users.noreply.github.com>
Date: Sun, 21 Dec 2025 22:20:10 -0800
Subject: [PATCH] feat: Add YouTube video import script for performances and shows

---
 backend/import_youtube.py | 209 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 209 insertions(+)
 create mode 100644 backend/import_youtube.py

diff --git a/backend/import_youtube.py b/backend/import_youtube.py
new file mode 100644
index 0000000..9a15e37
--- /dev/null
+++ b/backend/import_youtube.py
@@ -0,0 +1,209 @@
+"""
+YouTube Video Import Script
+Parses youtube.md and links videos to Performance and Show entities.
+"""
+import json
+import re
+from datetime import datetime
+from sqlmodel import Session, select
+from database import engine
+from models import Performance, Show, Song
+
+# Video types that are never linked, mapped to the label used in the log.
+SKIPPED_VIDEO_TYPES = {
+    'documentary': 'Documentary',
+    'visualizer': 'Visualizer',
+    'session': 'Session',
+}
+
+# Date format used by the "date" field of each video entry.
+DATE_FORMAT = '%Y-%m-%d'
+
+
+def make_youtube_url(video_id: str) -> str:
+    """Return the YouTube watch-page URL for the given video ID."""
+    return f"https://www.youtube.com/watch?v={video_id}"
+
+
+def parse_youtube_md(filepath: str) -> list:
+    """Extract the JSON array from the ```json block of a markdown file.
+
+    Returns the decoded array, or an empty list when no ```json block
+    starting with an array is found or the array is not valid JSON.
+    """
+    # Titles can contain non-ASCII characters (e.g. the sequence arrow),
+    # so do not rely on the platform's default encoding.
+    with open(filepath, 'r', encoding='utf-8') as f:
+        content = f.read()
+
+    # Find the opening bracket of the array that follows the ```json fence.
+    match = re.search(r'```json\s*\[', content)
+    if not match:
+        print("No JSON block found in file.")
+        return []
+
+    # Clean up escaped characters from markdown
+    json_str = content[match.end() - 1:]
+    json_str = json_str.replace('\\-', '-').replace('\\_', '_')
+
+    try:
+        # raw_decode stops at the end of the first complete JSON value, so
+        # a ']' inside a title or a nested array does not cut the data
+        # short, and the closing fence after the array is ignored.
+        videos, _ = json.JSONDecoder().raw_decode(json_str)
+    except json.JSONDecodeError as e:
+        print(f"JSON parse error: {e}")
+        return []
+    return videos
+
+
+def normalize_title(title: str) -> str:
+    """Normalize song title for matching."""
+    return title.strip().lower()
+
+
+def _matching_performances(session, show, title_fragment):
+    """Yield the performance on `show` of each song whose title contains
+    `title_fragment` (case-insensitive)."""
+    song_statement = select(Song).where(Song.title.ilike(f"%{title_fragment}%"))
+    for song in session.exec(song_statement).all():
+        perf_statement = select(Performance).where(
+            Performance.show_id == show.id,
+            Performance.song_id == song.id
+        )
+        perf = session.exec(perf_statement).first()
+        if perf:
+            yield perf
+
+
+def import_videos(videos: list):
+    """Import video links into the database.
+
+    Full-show videos are linked to the Show on the video's date; song and
+    sequence videos are linked to the matching Performance(s) of that show.
+    Entries that cannot be linked are logged and counted, never fatal.
+    """
+    stats = {
+        'songs_matched': 0,
+        'songs_not_found': 0,
+        'sequences_processed': 0,
+        'full_shows_matched': 0,
+        'full_shows_not_found': 0,
+        'skipped': 0
+    }
+
+    with Session(engine) as session:
+        for video in videos:
+            video_id = video.get('videoId')
+            # A null title would break the '→' check below.
+            title = video.get('title') or ''
+            video_type = video.get('type', 'song')
+            date_str = video.get('date')
+
+            if video_type in SKIPPED_VIDEO_TYPES:
+                print(f"[SKIP] {SKIPPED_VIDEO_TYPES[video_type]}: {title}")
+                stats['skipped'] += 1
+                continue
+
+            # Without an ID the stored link would end in "watch?v=None".
+            if not video_id:
+                print(f"[SKIP] No videoId: {title}")
+                stats['skipped'] += 1
+                continue
+            youtube_url = make_youtube_url(video_id)
+
+            is_full_show = video_type == 'full_show'
+
+            if not date_str:
+                if is_full_show:
+                    print(f"[FULL SHOW SKIP] No date for: {title}")
+                else:
+                    print(f"[SKIP] No date: {title}")
+                stats['skipped'] += 1
+                continue
+
+            # A malformed date skips the entry for every video type instead
+            # of aborting the whole import.
+            try:
+                show_date = datetime.strptime(date_str, DATE_FORMAT)
+            except (TypeError, ValueError):
+                print(f"[SKIP] Invalid date format: {date_str}")
+                stats['skipped'] += 1
+                continue
+
+            # Find show by date
+            show_statement = select(Show).where(Show.date == show_date)
+            show = session.exec(show_statement).first()
+
+            if is_full_show:
+                if show:
+                    show.youtube_link = youtube_url
+                    session.add(show)
+                    print(f"[FULL SHOW] Linked: {title} -> Show ID {show.id}")
+                    stats['full_shows_matched'] += 1
+                else:
+                    print(f"[FULL SHOW NOT FOUND] {title} (date: {date_str})")
+                    stats['full_shows_not_found'] += 1
+                continue
+
+            if not show:
+                print(f"[SHOW NOT FOUND] Date: {date_str} for video: {title}")
+                stats['songs_not_found'] += 1
+                continue
+
+            # Handle sequences (multiple songs)
+            if video_type == 'sequence' or '→' in title:
+                # Drop empty fragments (e.g. from a trailing arrow): an
+                # empty fragment becomes the pattern "%%" and matches every
+                # song.
+                song_titles = [s.strip() for s in title.split('→') if s.strip()]
+                matched_any = False
+
+                for song_title in song_titles:
+                    for perf in _matching_performances(session, show, song_title):
+                        perf.youtube_link = youtube_url
+                        session.add(perf)
+                        print(f"[SEQUENCE] Linked: {song_title} -> Performance ID {perf.id}")
+                        matched_any = True
+
+                if matched_any:
+                    stats['sequences_processed'] += 1
+                else:
+                    print(f"[SEQUENCE NOT FOUND] {title} on {date_str}")
+                    stats['songs_not_found'] += 1
+                continue
+
+            # Single song: link the first matching performance only. An
+            # empty title is treated as not found rather than matching
+            # every song.
+            perf = None
+            if title.strip():
+                perf = next(_matching_performances(session, show, title), None)
+
+            if perf:
+                perf.youtube_link = youtube_url
+                session.add(perf)
+                print(f"[SONG] Linked: {title} -> Performance ID {perf.id}")
+                stats['songs_matched'] += 1
+            else:
+                print(f"[SONG NOT FOUND] {title} on {date_str}")
+                stats['songs_not_found'] += 1
+
+        session.commit()
+
+    print("\n=== Import Summary ===")
+    for key, value in stats.items():
+        print(f" {key}: {value}")
+
+
+if __name__ == "__main__":
+    import sys
+
+    filepath = sys.argv[1] if len(sys.argv) > 1 else "../youtube.md"
+    print(f"Parsing YouTube data from: {filepath}")
+
+    videos = parse_youtube_md(filepath)
+    print(f"Found {len(videos)} videos")
+
+    if videos:
+        import_videos(videos)