feat: Add YouTube API fetch and import scripts with 620 videos

This commit is contained in:
fullsizemalt 2025-12-22 23:09:43 -08:00
parent bd6832705f
commit dc584af2f2
3 changed files with 5263 additions and 124 deletions

207
backend/fetch_youtube.py Normal file
View file

@ -0,0 +1,207 @@
"""
Fetch all videos from Goose YouTube channel using YouTube Data API v3
"""
import requests
import json
import re
from datetime import datetime
API_KEY = "AIzaSyCxDpv6HM-sPD8vPJIBffwa2-skOpEJkOU"
CHANNEL_HANDLE = "@GooseTheBand"
def get_channel_id(handle: str) -> str:
"""Get channel ID from handle."""
url = "https://www.googleapis.com/youtube/v3/search"
params = {
"key": API_KEY,
"q": handle,
"type": "channel",
"part": "snippet",
"maxResults": 1
}
resp = requests.get(url, params=params)
data = resp.json()
if "items" in data and len(data["items"]) > 0:
return data["items"][0]["snippet"]["channelId"]
return None
def get_uploads_playlist_id(channel_id: str) -> str:
"""Get the uploads playlist ID for a channel."""
url = "https://www.googleapis.com/youtube/v3/channels"
params = {
"key": API_KEY,
"id": channel_id,
"part": "contentDetails"
}
resp = requests.get(url, params=params)
data = resp.json()
if "items" in data and len(data["items"]) > 0:
return data["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]
return None
def get_all_videos(playlist_id: str) -> list:
"""Fetch all videos from a playlist (handles pagination)."""
videos = []
url = "https://www.googleapis.com/youtube/v3/playlistItems"
next_page_token = None
while True:
params = {
"key": API_KEY,
"playlistId": playlist_id,
"part": "snippet,contentDetails",
"maxResults": 50
}
if next_page_token:
params["pageToken"] = next_page_token
resp = requests.get(url, params=params)
data = resp.json()
if "error" in data:
print(f"API Error: {data['error']}")
break
for item in data.get("items", []):
snippet = item["snippet"]
video = {
"videoId": snippet["resourceId"]["videoId"],
"title": snippet["title"],
"description": snippet.get("description", ""),
"publishedAt": snippet["publishedAt"],
"thumbnails": snippet.get("thumbnails", {})
}
videos.append(video)
next_page_token = data.get("nextPageToken")
print(f"Fetched {len(videos)} videos so far...")
if not next_page_token:
break
return videos
def parse_video_metadata(videos: list) -> list:
"""Parse video titles to extract show date and type."""
parsed = []
# Date patterns to look for in titles/descriptions
date_patterns = [
r'(\d{1,2})[./](\d{1,2})[./](\d{2,4})', # M/D/YY or M.D.YYYY
r'(\d{4})-(\d{2})-(\d{2})', # YYYY-MM-DD
]
for video in videos:
title = video["title"]
desc = video.get("description", "")
# Determine video type
video_type = "song" # default
title_lower = title.lower()
if "full show" in title_lower or "live at" in title_lower or "night 1" in title_lower or "night 2" in title_lower or "night 3" in title_lower:
video_type = "full_show"
elif "" in title or "->" in title:
video_type = "sequence"
elif "documentary" in title_lower or "behind" in title_lower:
video_type = "documentary"
elif "visualizer" in title_lower:
video_type = "visualizer"
elif "session" in title_lower or "studio" in title_lower:
video_type = "session"
# Try to extract date
show_date = None
# Check description first (often has date info)
combined_text = f"{title} {desc}"
for pattern in date_patterns:
match = re.search(pattern, combined_text)
if match:
groups = match.groups()
try:
if len(groups[0]) == 4: # YYYY-MM-DD
show_date = f"{groups[0]}-{groups[1]}-{groups[2]}"
else: # M/D/YY
year = groups[2]
if len(year) == 2:
year = "20" + year if int(year) < 50 else "19" + year
month = groups[0].zfill(2)
day = groups[1].zfill(2)
show_date = f"{year}-{month}-{day}"
break
except:
pass
# Extract venue from title if possible
venue = None
venue_patterns = [
r'@ (.+)$',
r'at (.+?) -',
r'Live at (.+)',
r'- (.+?, [A-Z]{2})$',
]
for pattern in venue_patterns:
match = re.search(pattern, title, re.IGNORECASE)
if match:
venue = match.group(1).strip()
break
parsed.append({
"videoId": video["videoId"],
"title": title,
"date": show_date,
"venue": venue,
"type": video_type,
"publishedAt": video["publishedAt"]
})
return parsed
def main():
print("Fetching Goose YouTube channel videos...")
# Get channel ID
print(f"Looking up channel: {CHANNEL_HANDLE}")
channel_id = get_channel_id(CHANNEL_HANDLE)
if not channel_id:
print("Could not find channel!")
return
print(f"Channel ID: {channel_id}")
# Get uploads playlist
uploads_playlist = get_uploads_playlist_id(channel_id)
if not uploads_playlist:
print("Could not find uploads playlist!")
return
print(f"Uploads playlist: {uploads_playlist}")
# Fetch all videos
videos = get_all_videos(uploads_playlist)
print(f"\nTotal videos found: {len(videos)}")
# Parse metadata
parsed = parse_video_metadata(videos)
# Save to JSON
output_file = "youtube_videos.json"
with open(output_file, 'w') as f:
json.dump(parsed, f, indent=2)
print(f"\nSaved to {output_file}")
# Show stats
types = {}
dated = 0
for v in parsed:
types[v["type"]] = types.get(v["type"], 0) + 1
if v["date"]:
dated += 1
print("\n=== Stats ===")
print(f"Total: {len(parsed)}")
print(f"With dates: {dated}")
for vtype, count in sorted(types.items()):
print(f" {vtype}: {count}")
if __name__ == "__main__":
main()

View file

@ -1,6 +1,6 @@
""" """
YouTube Video Import Script YouTube Video Import Script v2
Parses youtube.md and links videos to Performance and Show entities. Imports videos from youtube_videos.json into the database.
""" """
import json import json
import re import re
@ -9,194 +9,164 @@ from sqlmodel import Session, select
from database import engine from database import engine
from models import Performance, Show, Song from models import Performance, Show, Song
# Construct YouTube embed URL from videoId
def make_youtube_url(video_id: str) -> str: def make_youtube_url(video_id: str) -> str:
return f"https://www.youtube.com/watch?v={video_id}" return f"https://www.youtube.com/watch?v={video_id}"
def parse_youtube_md(filepath: str) -> list:
"""Extract JSON array from youtube.md markdown file."""
with open(filepath, 'r') as f:
content = f.read()
# Parse line-by-line looking for JSON objects
# This handles the escaped markdown format in the source file
videos = []
in_json = False
for line in content.split('\n'):
line = line.strip()
# Detect start of JSON block (with or without escapes)
if 'json' in line.lower() and ('`' in line or '\\' in line):
in_json = True
continue
# Skip array markers
if line in ['[', '\\[', ']']:
continue
# Process JSON objects
if in_json and line.startswith('{'):
# Clean the line
clean_line = line.rstrip(',').rstrip()
# Remove trailing markdown escapes
clean_line = clean_line.replace('\\_', '_').replace('\\-', '-')
if clean_line.endswith('}'):
try:
obj = json.loads(clean_line)
videos.append(obj)
except json.JSONDecodeError as e:
print(f"Parse error on line: {clean_line[:50]}... - {e}")
return videos
def normalize_title(title: str) -> str: def extract_song_title(title: str) -> str:
"""Normalize song title for matching.""" """Extract the actual song title from YouTube video title."""
return title.strip().lower() # Remove common prefixes
title = re.sub(r'^Goose\s*[-–—]\s*', '', title, flags=re.IGNORECASE)
# Remove date patterns at end (e.g., "- 12/13/25 Providence, RI")
title = re.sub(r'\s*[-–—]\s*\d{1,2}/\d{1,2}/\d{2,4}.*$', '', title)
# Remove "Live at..." suffix
title = re.sub(r'\s*[-–—]\s*Live at.*$', '', title, flags=re.IGNORECASE)
# Remove "(Official Audio)" etc
title = re.sub(r'\s*\(Official\s*(Audio|Video|Visualizer)\)', '', title, flags=re.IGNORECASE)
# Remove "(4K HDR)" etc
title = re.sub(r'\s*\(4K\s*HDR\)', '', title, flags=re.IGNORECASE)
# Remove "Set I Opener" etc
title = re.sub(r'\s*Set\s*(I|II|1|2)?\s*Opener.*$', '', title, flags=re.IGNORECASE)
return title.strip()
def import_videos(videos: list):
def import_videos():
"""Import video links into the database.""" """Import video links into the database."""
with open("youtube_videos.json", 'r') as f:
videos = json.load(f)
stats = { stats = {
'songs_matched': 0, 'songs_matched': 0,
'songs_not_found': 0, 'songs_not_found': 0,
'sequences_processed': 0, 'sequences_processed': 0,
'full_shows_matched': 0, 'full_shows_matched': 0,
'full_shows_not_found': 0, 'full_shows_not_found': 0,
'skipped': 0 'no_date': 0,
'skipped': 0,
'show_not_found': 0
} }
with Session(engine) as session: with Session(engine) as session:
for video in videos: for video in videos:
video_id = video.get('videoId') video_id = video.get('videoId')
title = video.get('title', '') raw_title = video.get('title', '')
video_type = video.get('type', 'song') video_type = video.get('type', 'song')
date_str = video.get('date') date_str = video.get('date')
youtube_url = make_youtube_url(video_id) youtube_url = make_youtube_url(video_id)
if video_type == 'documentary': # Skip non-performance content
print(f"[SKIP] Documentary: {title}") if video_type in ('documentary', 'visualizer', 'session'):
stats['skipped'] += 1 stats['skipped'] += 1
continue continue
if video_type == 'visualizer': # Skip videos without dates (can't match to show)
print(f"[SKIP] Visualizer: {title}") if not date_str:
stats['skipped'] += 1 stats['no_date'] += 1
continue
if video_type == 'session':
print(f"[SKIP] Session: {title}")
stats['skipped'] += 1
continue
if video_type == 'full_show':
# Match by date or event name
event_name = video.get('event')
if date_str:
show_date = datetime.strptime(date_str, '%Y-%m-%d')
statement = select(Show).where(Show.date == show_date)
show = session.exec(statement).first()
if show:
show.youtube_link = youtube_url
session.add(show)
print(f"[FULL SHOW] Linked: {title} -> Show ID {show.id}")
stats['full_shows_matched'] += 1
else:
print(f"[FULL SHOW NOT FOUND] {title} (date: {date_str})")
stats['full_shows_not_found'] += 1
else:
print(f"[FULL SHOW SKIP] No date for: {title}")
stats['skipped'] += 1
continue continue
# Parse date # Parse date
if not date_str:
print(f"[SKIP] No date: {title}")
stats['skipped'] += 1
continue
try: try:
show_date = datetime.strptime(date_str, '%Y-%m-%d') show_date = datetime.strptime(date_str, '%Y-%m-%d')
except ValueError: except ValueError:
print(f"[SKIP] Invalid date format: {date_str}") stats['no_date'] += 1
stats['skipped'] += 1
continue continue
# Find show by date # Find show by date
show_statement = select(Show).where(Show.date == show_date) show = session.exec(
show = session.exec(show_statement).first() select(Show).where(Show.date == show_date)
).first()
if not show: if not show:
print(f"[SHOW NOT FOUND] Date: {date_str} for video: {title}") stats['show_not_found'] += 1
stats['songs_not_found'] += 1
continue continue
# Handle sequences (multiple songs) # Handle full shows - link to Show entity
if video_type == 'sequence' or '' in title: if video_type == 'full_show':
song_titles = [s.strip() for s in title.split('')] show.youtube_link = youtube_url
session.add(show)
stats['full_shows_matched'] += 1
print(f"[FULL SHOW] {date_str}: {raw_title[:50]}")
continue
# Extract song title
song_title = extract_song_title(raw_title)
# Handle sequences (multiple songs with →)
if video_type == 'sequence' or '' in song_title:
song_titles = [s.strip() for s in re.split(r'[→>]', song_title)]
matched_any = False matched_any = False
for song_title in song_titles: for title in song_titles:
# Find song by title if not title:
song_statement = select(Song).where(Song.title.ilike(f"%{song_title}%")) continue
songs = session.exec(song_statement).all() # Find song by title (case insensitive partial match)
songs = session.exec(
select(Song).where(Song.title.ilike(f"%{title}%"))
).all()
for song in songs: for song in songs:
# Find performance for this song on this show perf = session.exec(
perf_statement = select(Performance).where( select(Performance).where(
Performance.show_id == show.id, Performance.show_id == show.id,
Performance.song_id == song.id Performance.song_id == song.id
) )
perf = session.exec(perf_statement).first() ).first()
if perf: if perf:
perf.youtube_link = youtube_url perf.youtube_link = youtube_url
session.add(perf) session.add(perf)
print(f"[SEQUENCE] Linked: {song_title} -> Performance ID {perf.id}")
matched_any = True matched_any = True
print(f"[SEQ] {date_str}: {title} -> Perf {perf.id}")
if matched_any: if matched_any:
stats['sequences_processed'] += 1 stats['sequences_processed'] += 1
else: else:
print(f"[SEQUENCE NOT FOUND] {title} on {date_str}")
stats['songs_not_found'] += 1 stats['songs_not_found'] += 1
continue continue
# Single song # Single song - find and link
song_statement = select(Song).where(Song.title.ilike(f"%{title}%")) songs = session.exec(
songs = session.exec(song_statement).all() select(Song).where(Song.title.ilike(f"%{song_title}%"))
).all()
matched = False matched = False
for song in songs: for song in songs:
perf_statement = select(Performance).where( perf = session.exec(
Performance.show_id == show.id, select(Performance).where(
Performance.song_id == song.id Performance.show_id == show.id,
) Performance.song_id == song.id
perf = session.exec(perf_statement).first() )
).first()
if perf: if perf:
perf.youtube_link = youtube_url perf.youtube_link = youtube_url
session.add(perf) session.add(perf)
print(f"[SONG] Linked: {title} -> Performance ID {perf.id}")
stats['songs_matched'] += 1
matched = True matched = True
stats['songs_matched'] += 1
print(f"[SONG] {date_str}: {song_title} -> Perf {perf.id}")
break break
if not matched: if not matched:
print(f"[SONG NOT FOUND] {title} on {date_str}")
stats['songs_not_found'] += 1 stats['songs_not_found'] += 1
session.commit() session.commit()
print("\n=== Import Summary ===") print("\n" + "="*50)
print("IMPORT SUMMARY")
print("="*50)
for key, value in stats.items(): for key, value in stats.items():
print(f" {key}: {value}") print(f" {key}: {value}")
total_linked = stats['songs_matched'] + stats['sequences_processed'] + stats['full_shows_matched']
print(f"\n TOTAL LINKED: {total_linked}")
if __name__ == "__main__": if __name__ == "__main__":
import sys import_videos()
filepath = sys.argv[1] if len(sys.argv) > 1 else "../youtube.md"
print(f"Parsing YouTube data from: {filepath}")
videos = parse_youtube_md(filepath)
print(f"Found {len(videos)} videos")
if videos:
import_videos(videos)

4962
backend/youtube_videos.json Normal file

File diff suppressed because it is too large Load diff