elmeg-demo/backend/import_youtube.py

197 lines
7.3 KiB
Python

"""
YouTube Video Import Script
Parses youtube.md and links videos to Performance and Show entities.
"""
import json
import re
from datetime import datetime
from sqlmodel import Session, select
from database import engine
from models import Performance, Show, Song
def make_youtube_url(video_id: str) -> str:
    """Return the YouTube watch-page URL for *video_id*.

    The id is interpolated verbatim into the ``watch?v=`` query string; no
    validation or URL-encoding is applied.
    """
    watch_url = f"https://www.youtube.com/watch?v={video_id}"
    return watch_url
def parse_youtube_md(filepath: str) -> list:
    """Extract the JSON array of video entries from a youtube.md markdown file.

    The file must contain a fenced ```json block whose payload starts with a
    JSON array. Markdown backslash-escapes for backtick, hyphen and underscore
    are removed before parsing, since they are not valid JSON escapes.

    Args:
        filepath: Path to the markdown file. It is read as UTF-8.

    Returns:
        The decoded JSON array when the fenced payload is valid JSON.
        Otherwise a best-effort list of every single-line ``{...}`` object
        inside the fenced payload that parses on its own. An empty list when
        no JSON block is found.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    # Explicit encoding: the platform default is not guaranteed to be UTF-8
    # and video titles may contain non-ASCII characters.
    with open(filepath, 'r', encoding='utf-8') as f:
        content = f.read()

    # Clean up escaped characters from markdown first
    content = content.replace('\\`', '`').replace('\\-', '-').replace('\\_', '_')

    # Locate the opening fence and the array's opening bracket. The capture
    # group runs to the end of the file because the fence may be unclosed.
    match = re.search(r'```json\s*\n?\s*(\[.*)', content, re.DOTALL)
    if not match:
        print("No JSON block found in file.")
        return []

    # Keep only what precedes the closing fence (or everything, if the fence
    # was never closed) so text after the block is never mistaken for data.
    json_block = match.group(1).split('```')[0]

    try:
        return json.loads(json_block.strip())
    except json.JSONDecodeError:
        # Fall through to the tolerant line-by-line salvage below.
        pass

    videos = []
    for line in json_block.split('\n'):
        line = line.strip().rstrip(',')
        # Only objects that sit entirely on one line can be salvaged.
        if line.startswith('{') and line.endswith('}'):
            try:
                videos.append(json.loads(line))
            except json.JSONDecodeError:
                continue
    return videos
def normalize_title(title: str) -> str:
    """Return *title* with surrounding whitespace removed, in lower case."""
    trimmed = title.strip()
    return trimmed.lower()
# Segue markers that join several song titles inside one video title.
# NOTE(review): the separator literal in the previous revision was an empty
# string (apparently a character lost in transit), which made the sequence
# check always true and `str.split` raise ValueError. This marker set is an
# assumption -- TODO confirm against the titles actually used in youtube.md.
_SEQUENCE_SEPARATOR = re.compile(r'\s*(?:→|->|>)\s*')

# Video types that are never linked, mapped to the label used in the log line.
_SKIPPED_TYPES = {
    'documentary': 'Documentary',
    'visualizer': 'Visualizer',
    'session': 'Session',
}


def _parse_show_date(date_str):
    """Return *date_str* parsed as YYYY-MM-DD, or None when it is malformed."""
    try:
        return datetime.strptime(date_str, '%Y-%m-%d')
    except (TypeError, ValueError):
        return None


def _find_show(session, show_date):
    """Return the first Show whose date equals *show_date*, or None."""
    return session.exec(select(Show).where(Show.date == show_date)).first()


def _find_performances(session, show, song_title):
    """Yield performances at *show* of songs whose title contains *song_title*.

    Matching is a case-insensitive substring match; at most one performance
    is yielded per matching song.
    """
    # NOTE: '%' and '_' inside song_title act as LIKE wildcards here.
    songs = session.exec(
        select(Song).where(Song.title.ilike(f"%{song_title}%"))
    ).all()
    for song in songs:
        perf = session.exec(
            select(Performance).where(
                Performance.show_id == show.id,
                Performance.song_id == song.id,
            )
        ).first()
        if perf:
            yield perf


def import_videos(videos: list):
    """Link parsed YouTube videos to Show and Performance rows.

    Each entry is a dict read through the keys ``videoId``, ``title``,
    ``type`` (defaults to ``'song'``) and ``date`` (``YYYY-MM-DD``).

    * ``documentary`` / ``visualizer`` / ``session`` entries are skipped.
    * ``full_show`` entries set ``youtube_link`` on the show with that date.
    * ``sequence`` entries, or titles containing a segue marker, set
      ``youtube_link`` on every matching performance of each listed song.
    * Anything else is treated as a single song and linked to the first
      matching performance at that show.

    Entries without a ``videoId``, without a usable date, or (for songs)
    without a title are counted as skipped instead of aborting the import.
    All changes are committed once after the whole list has been processed,
    and a summary of the counters is printed.

    Args:
        videos: Video dicts, e.g. as returned by ``parse_youtube_md``.
    """
    stats = {
        'songs_matched': 0,
        'songs_not_found': 0,
        'sequences_processed': 0,
        'full_shows_matched': 0,
        'full_shows_not_found': 0,
        'skipped': 0
    }

    with Session(engine) as session:
        for video in videos:
            if not isinstance(video, dict):
                print(f"[SKIP] Unexpected entry: {video!r}")
                stats['skipped'] += 1
                continue

            video_id = video.get('videoId')
            # `or ''` also covers an explicit JSON null title.
            title = video.get('title') or ''
            video_type = video.get('type', 'song')
            date_str = video.get('date')
            is_full_show = video_type == 'full_show'

            if video_type in _SKIPPED_TYPES:
                print(f"[SKIP] {_SKIPPED_TYPES[video_type]}: {title}")
                stats['skipped'] += 1
                continue

            # Without an id the stored link would be ".../watch?v=None".
            if not video_id:
                print(f"[SKIP] No videoId: {title}")
                stats['skipped'] += 1
                continue
            youtube_url = make_youtube_url(video_id)

            if not date_str:
                if is_full_show:
                    print(f"[FULL SHOW SKIP] No date for: {title}")
                else:
                    print(f"[SKIP] No date: {title}")
                stats['skipped'] += 1
                continue

            show_date = _parse_show_date(date_str)
            if show_date is None:
                print(f"[SKIP] Invalid date format: {date_str}")
                stats['skipped'] += 1
                continue

            show = _find_show(session, show_date)

            if is_full_show:
                # Full shows are matched by date only.
                if show:
                    show.youtube_link = youtube_url
                    session.add(show)
                    print(f"[FULL SHOW] Linked: {title} -> Show ID {show.id}")
                    stats['full_shows_matched'] += 1
                else:
                    print(f"[FULL SHOW NOT FOUND] {title} (date: {date_str})")
                    stats['full_shows_not_found'] += 1
                continue

            if not show:
                print(f"[SHOW NOT FOUND] Date: {date_str} for video: {title}")
                # A missing show is counted under songs_not_found.
                stats['songs_not_found'] += 1
                continue

            # An empty title would become the pattern '%%' and match every song.
            if not title.strip():
                print(f"[SKIP] No title for video: {video_id}")
                stats['skipped'] += 1
                continue

            # Handle sequences (multiple songs)
            if video_type == 'sequence' or _SEQUENCE_SEPARATOR.search(title):
                parts = (p.strip() for p in _SEQUENCE_SEPARATOR.split(title))
                song_titles = [p for p in parts if p]
                matched_any = False
                for song_title in song_titles:
                    for perf in _find_performances(session, show, song_title):
                        perf.youtube_link = youtube_url
                        session.add(perf)
                        print(f"[SEQUENCE] Linked: {song_title} -> Performance ID {perf.id}")
                        matched_any = True
                if matched_any:
                    stats['sequences_processed'] += 1
                else:
                    print(f"[SEQUENCE NOT FOUND] {title} on {date_str}")
                    stats['songs_not_found'] += 1
                continue

            # Single song: link only the first matching performance.
            perf = next(_find_performances(session, show, title), None)
            if perf:
                perf.youtube_link = youtube_url
                session.add(perf)
                print(f"[SONG] Linked: {title} -> Performance ID {perf.id}")
                stats['songs_matched'] += 1
            else:
                print(f"[SONG NOT FOUND] {title} on {date_str}")
                stats['songs_not_found'] += 1

        session.commit()

    print("\n=== Import Summary ===")
    for key, value in stats.items():
        print(f" {key}: {value}")
if __name__ == "__main__":
    import sys

    # The first command-line argument, when given, overrides the default
    # location of the markdown file.
    filepath = (sys.argv[1:2] or ["../youtube.md"])[0]

    print(f"Parsing YouTube data from: {filepath}")
    videos = parse_youtube_md(filepath)
    print(f"Found {len(videos)} videos")

    # Nothing to import when parsing produced no entries.
    if videos:
        import_videos(videos)