- Fork elmeg-demo codebase for multi-band support - Add data importer infrastructure with base class - Create band-specific importers: - phish.py: Phish.net API v5 - grateful_dead.py: Grateful Stats API - setlistfm.py: Dead & Company, Billy Strings (Setlist.fm) - Add spec-kit configuration for Gemini - Update README with supported bands and architecture
146 lines
4.7 KiB
Python
146 lines
4.7 KiB
Python
"""
|
|
Email Service - AWS SES v2 integration using stored templates.
|
|
|
|
Uses SES stored templates for consistent, branded transactional emails:
|
|
- ELMEG_EMAIL_VERIFICATION
|
|
- ELMEG_PASSWORD_RESET
|
|
- ELMEG_SECURITY_ALERT
|
|
"""
|
|
import os
|
|
import json
|
|
import secrets
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
import boto3
|
|
from botocore.exceptions import ClientError
|
|
|
|
# Configuration
|
|
AWS_REGION = os.getenv("AWS_SES_REGION", "us-east-1")
|
|
EMAIL_FROM = os.getenv("EMAIL_FROM", "noreply@elmeg.xyz")
|
|
FRONTEND_URL = os.getenv("FRONTEND_URL", "https://elmeg.xyz")
|
|
SUPPORT_EMAIL = os.getenv("SUPPORT_EMAIL", "support@elmeg.xyz")
|
|
APP_NAME = "Elmeg"
|
|
|
|
# SES Template Names
|
|
TEMPLATE_VERIFICATION = "ELMEG_EMAIL_VERIFICATION"
|
|
TEMPLATE_PASSWORD_RESET = "ELMEG_PASSWORD_RESET"
|
|
TEMPLATE_SECURITY_ALERT = "ELMEG_SECURITY_ALERT"
|
|
|
|
|
|
def get_ses_client():
|
|
"""Get boto3 SES v2 client"""
|
|
return boto3.client('sesv2', region_name=AWS_REGION)
|
|
|
|
|
|
def is_email_configured() -> bool:
|
|
"""Check if email is properly configured"""
|
|
return bool(os.getenv("AWS_ACCESS_KEY_ID") and os.getenv("AWS_SECRET_ACCESS_KEY"))
|
|
|
|
|
|
def send_templated_email(
|
|
to: str,
|
|
template_name: str,
|
|
template_data: dict
|
|
) -> dict:
|
|
"""
|
|
Send email using SES stored template.
|
|
|
|
Returns:
|
|
dict with 'success', 'message_id' (on success), 'error' (on failure)
|
|
"""
|
|
# Dev mode - log instead of sending
|
|
if not is_email_configured():
|
|
print(f"[EMAIL DEV MODE] To: {to}, Template: {template_name}")
|
|
print(f"[EMAIL DEV MODE] Data: {json.dumps(template_data, indent=2)}")
|
|
return {"success": True, "message_id": "dev-mode", "dev_mode": True}
|
|
|
|
try:
|
|
client = get_ses_client()
|
|
response = client.send_email(
|
|
FromEmailAddress=EMAIL_FROM,
|
|
Destination={"ToAddresses": [to]},
|
|
Content={
|
|
"Template": {
|
|
"TemplateName": template_name,
|
|
"TemplateData": json.dumps(template_data)
|
|
}
|
|
}
|
|
)
|
|
message_id = response.get("MessageId", "unknown")
|
|
print(f"[Email] Sent {template_name} to {to}, MessageId: {message_id}")
|
|
return {"success": True, "message_id": message_id}
|
|
|
|
except ClientError as e:
|
|
error_msg = e.response.get('Error', {}).get('Message', str(e))
|
|
print(f"[Email] Failed to send {template_name} to {to}: {error_msg}")
|
|
return {"success": False, "error": error_msg}
|
|
|
|
|
|
# =============================================================================
|
|
# Email Functions
|
|
# =============================================================================
|
|
|
|
async def send_verification_email(email: str, token: str, user_name: Optional[str] = None) -> bool:
|
|
"""Send email verification using SES template"""
|
|
verification_link = f"{FRONTEND_URL}/verify-email?token={token}"
|
|
|
|
template_data = {
|
|
"user_name": user_name or email.split("@")[0],
|
|
"verification_link": verification_link,
|
|
"app_name": APP_NAME,
|
|
"support_email": SUPPORT_EMAIL
|
|
}
|
|
|
|
result = send_templated_email(email, TEMPLATE_VERIFICATION, template_data)
|
|
return result["success"]
|
|
|
|
|
|
async def send_password_reset_email(email: str, token: str, user_name: Optional[str] = None) -> bool:
|
|
"""Send password reset email using SES template"""
|
|
reset_link = f"{FRONTEND_URL}/reset-password?token={token}"
|
|
|
|
template_data = {
|
|
"user_name": user_name or email.split("@")[0],
|
|
"reset_link": reset_link,
|
|
"app_name": APP_NAME,
|
|
"support_email": SUPPORT_EMAIL
|
|
}
|
|
|
|
result = send_templated_email(email, TEMPLATE_PASSWORD_RESET, template_data)
|
|
return result["success"]
|
|
|
|
|
|
async def send_security_alert_email(
|
|
email: str,
|
|
security_event_description: str,
|
|
user_name: Optional[str] = None
|
|
) -> bool:
|
|
"""Send security alert email using SES template"""
|
|
template_data = {
|
|
"user_name": user_name or email.split("@")[0],
|
|
"security_event_description": security_event_description,
|
|
"app_name": APP_NAME,
|
|
"support_email": SUPPORT_EMAIL
|
|
}
|
|
|
|
result = send_templated_email(email, TEMPLATE_SECURITY_ALERT, template_data)
|
|
return result["success"]
|
|
|
|
|
|
# =============================================================================
|
|
# Token Generation & Expiry Helpers
|
|
# =============================================================================
|
|
|
|
def generate_token() -> str:
|
|
"""Generate a secure random token"""
|
|
return secrets.token_urlsafe(32)
|
|
|
|
|
|
def get_verification_expiry() -> datetime:
|
|
"""24 hour expiry for email verification"""
|
|
return datetime.utcnow() + timedelta(hours=24)
|
|
|
|
|
|
def get_reset_expiry() -> datetime:
|
|
"""1 hour expiry for password reset"""
|
|
return datetime.utcnow() + timedelta(hours=1)
|