"""
YouTube Video Import Script

Parses youtube.md and links videos to Performance and Show entities.
"""
|
|
import json
|
|
import re
|
|
from datetime import datetime
|
|
from sqlmodel import Session, select
|
|
from database import engine
|
|
from models import Performance, Show, Song
|
|
|
|
def make_youtube_url(video_id: str) -> str:
    """Build the canonical YouTube watch URL for *video_id*."""
    base = "https://www.youtube.com/watch?v="
    return base + video_id
|
|
|
|
def parse_youtube_md(filepath: str) -> list:
    """Extract a list of video dicts from the JSON block in youtube.md.

    The markdown file stores its data as a (possibly escaped) fenced JSON
    array with one object per line.  Each line that looks like a complete
    JSON object is stripped of markdown escapes and parsed individually,
    so a single malformed line does not abort the whole import.

    Args:
        filepath: Path to the youtube.md markdown file.

    Returns:
        A list of dicts, one per successfully parsed JSON object.
        Unparseable lines are reported on stdout and skipped.
    """
    # Explicit encoding: the file contains non-ASCII characters (e.g. '→')
    # and must not depend on the locale's default codec.
    with open(filepath, 'r', encoding='utf-8') as f:
        content = f.read()

    # Parse line-by-line looking for JSON objects.  This handles the
    # escaped markdown format in the source file.
    videos = []
    in_json = False  # True once the fenced JSON block has started

    for line in content.split('\n'):
        line = line.strip()

        # Detect start of JSON block (fence may be escaped, e.g. \```json)
        if 'json' in line.lower() and ('`' in line or '\\' in line):
            in_json = True
            continue

        # Skip bare array delimiters (escaped or not)
        if line in ['[', '\\[', ']']:
            continue

        # Process JSON objects; the source keeps one object per line.
        if in_json and line.startswith('{'):
            # Drop the trailing comma that separates array elements.
            clean_line = line.rstrip(',').rstrip()
            # Undo markdown escaping of underscores and hyphens.
            clean_line = clean_line.replace('\\_', '_').replace('\\-', '-')

            if clean_line.endswith('}'):
                try:
                    videos.append(json.loads(clean_line))
                except json.JSONDecodeError as e:
                    print(f"Parse error on line: {clean_line[:50]}... - {e}")

    return videos
|
|
|
|
def normalize_title(title: str) -> str:
    """Normalize a song title for case-insensitive matching."""
    lowered = title.lower()
    return lowered.strip()
|
|
|
|
def import_videos(videos: list):
    """Import video links into the database.

    Matches each parsed video dict against existing rows and stores the
    watch URL on the matched entity:

    * ``full_show`` videos  -> ``Show.youtube_link`` (matched by date)
    * ``sequence`` videos   -> every matched ``Performance.youtube_link``
    * plain song videos     -> first matched ``Performance.youtube_link``

    Documentary / visualizer / session videos are skipped.  All changes
    are committed in a single transaction and a summary is printed.

    Args:
        videos: List of dicts as produced by ``parse_youtube_md``; each
            dict may carry ``videoId``, ``title``, ``type`` and ``date``
            (``YYYY-MM-DD``) keys.
    """
    # Video types that never map to a Performance or Show row.
    skip_types = {'documentary', 'visualizer', 'session'}

    stats = {
        'songs_matched': 0,
        'songs_not_found': 0,
        'sequences_processed': 0,
        'full_shows_matched': 0,
        'full_shows_not_found': 0,
        'skipped': 0,
    }

    with Session(engine) as session:
        for video in videos:
            video_id = video.get('videoId')
            title = video.get('title', '')
            video_type = video.get('type', 'song')
            date_str = video.get('date')

            # Without a videoId there is no usable URL to store; previously
            # this produced and saved a bogus "watch?v=None" link.
            if not video_id:
                print(f"[SKIP] No videoId: {title}")
                stats['skipped'] += 1
                continue

            youtube_url = make_youtube_url(video_id)

            # One branch replaces three copy-pasted skip blocks.
            if video_type in skip_types:
                print(f"[SKIP] {video_type.capitalize()}: {title}")
                stats['skipped'] += 1
                continue

            if video_type == 'full_show':
                # Full shows are matched to a Show row by exact date.
                if not date_str:
                    print(f"[FULL SHOW SKIP] No date for: {title}")
                    stats['skipped'] += 1
                    continue
                try:
                    show_date = datetime.strptime(date_str, '%Y-%m-%d')
                except ValueError:
                    # Previously an invalid date here crashed the whole
                    # import; now it is skipped like in the song path.
                    print(f"[SKIP] Invalid date format: {date_str}")
                    stats['skipped'] += 1
                    continue

                show = session.exec(
                    select(Show).where(Show.date == show_date)
                ).first()
                if show:
                    show.youtube_link = youtube_url
                    session.add(show)
                    print(f"[FULL SHOW] Linked: {title} -> Show ID {show.id}")
                    stats['full_shows_matched'] += 1
                else:
                    print(f"[FULL SHOW NOT FOUND] {title} (date: {date_str})")
                    stats['full_shows_not_found'] += 1
                continue

            # All remaining types need a valid performance date.
            if not date_str:
                print(f"[SKIP] No date: {title}")
                stats['skipped'] += 1
                continue

            try:
                show_date = datetime.strptime(date_str, '%Y-%m-%d')
            except ValueError:
                print(f"[SKIP] Invalid date format: {date_str}")
                stats['skipped'] += 1
                continue

            # Find the show this performance belongs to.
            show = session.exec(
                select(Show).where(Show.date == show_date)
            ).first()
            if not show:
                print(f"[SHOW NOT FOUND] Date: {date_str} for video: {title}")
                stats['songs_not_found'] += 1
                continue

            # Sequence videos cover several songs joined by '→' arrows;
            # every matched performance gets the same video link.
            if video_type == 'sequence' or '→' in title:
                song_titles = [s.strip() for s in title.split('→')]
                matched_any = False

                for song_title in song_titles:
                    songs = session.exec(
                        select(Song).where(Song.title.ilike(f"%{song_title}%"))
                    ).all()

                    for song in songs:
                        perf = session.exec(
                            select(Performance).where(
                                Performance.show_id == show.id,
                                Performance.song_id == song.id,
                            )
                        ).first()
                        if perf:
                            perf.youtube_link = youtube_url
                            session.add(perf)
                            print(f"[SEQUENCE] Linked: {song_title} -> Performance ID {perf.id}")
                            matched_any = True

                if matched_any:
                    stats['sequences_processed'] += 1
                else:
                    print(f"[SEQUENCE NOT FOUND] {title} on {date_str}")
                    stats['songs_not_found'] += 1
                continue

            # Single song: link the first matching performance only.
            songs = session.exec(
                select(Song).where(Song.title.ilike(f"%{title}%"))
            ).all()

            matched = False
            for song in songs:
                perf = session.exec(
                    select(Performance).where(
                        Performance.show_id == show.id,
                        Performance.song_id == song.id,
                    )
                ).first()
                if perf:
                    perf.youtube_link = youtube_url
                    session.add(perf)
                    print(f"[SONG] Linked: {title} -> Performance ID {perf.id}")
                    stats['songs_matched'] += 1
                    matched = True
                    break

            if not matched:
                print(f"[SONG NOT FOUND] {title} on {date_str}")
                stats['songs_not_found'] += 1

        session.commit()

    print("\n=== Import Summary ===")
    for key, value in stats.items():
        print(f"  {key}: {value}")
|
|
|
|
if __name__ == "__main__":
    import sys

    # Default to the repo-root youtube.md when no path is given.
    target = sys.argv[1] if len(sys.argv) > 1 else "../youtube.md"
    print(f"Parsing YouTube data from: {target}")

    parsed = parse_youtube_md(target)
    print(f"Found {len(parsed)} videos")

    if parsed:
        import_videos(parsed)