""" Email Service - AWS SES v2 integration using stored templates. Uses SES stored templates for consistent, branded transactional emails: - ELMEG_EMAIL_VERIFICATION - ELMEG_PASSWORD_RESET - ELMEG_SECURITY_ALERT """ import os import json import secrets from datetime import datetime, timedelta from typing import Optional import boto3 from botocore.exceptions import ClientError # Configuration AWS_REGION = os.getenv("AWS_SES_REGION", "us-east-1") EMAIL_FROM = os.getenv("EMAIL_FROM", "noreply@elmeg.xyz") FRONTEND_URL = os.getenv("FRONTEND_URL", "https://elmeg.xyz") SUPPORT_EMAIL = os.getenv("SUPPORT_EMAIL", "support@elmeg.xyz") APP_NAME = "Elmeg" # SES Template Names TEMPLATE_VERIFICATION = "ELMEG_EMAIL_VERIFICATION" TEMPLATE_PASSWORD_RESET = "ELMEG_PASSWORD_RESET" TEMPLATE_SECURITY_ALERT = "ELMEG_SECURITY_ALERT" def get_ses_client(): """Get boto3 SES v2 client""" return boto3.client('sesv2', region_name=AWS_REGION) def is_email_configured() -> bool: """Check if email is properly configured""" return bool(os.getenv("AWS_ACCESS_KEY_ID") and os.getenv("AWS_SECRET_ACCESS_KEY")) def send_templated_email( to: str, template_name: str, template_data: dict ) -> dict: """ Send email using SES stored template. Returns: dict with 'success', 'message_id' (on success), 'error' (on failure) """ # Dev mode - log instead of sending if not is_email_configured(): print(f"[EMAIL DEV MODE] To: {to}, Template: {template_name}") print(f"[EMAIL DEV MODE] Data: {json.dumps(template_data, indent=2)}") return {"success": True, "message_id": "dev-mode", "dev_mode": True} try: client = get_ses_client() response = client.send_email( FromEmailAddress=EMAIL_FROM, Destination={"ToAddresses": [to]}, Content={ "Template": { "TemplateName": template_name, "TemplateData": json.dumps(template_data) } } ) message_id = response.get("MessageId", "unknown") print(f"[Email] Sent {template_name} to {to}, MessageId: {message_id}") return {"success": True, "message_id": message_id} except ClientError as e: error_msg = e.response.get('Error', {}).get('Message', str(e)) print(f"[Email] Failed to send {template_name} to {to}: {error_msg}") return {"success": False, "error": error_msg} # ============================================================================= # Email Functions # ============================================================================= async def send_verification_email(email: str, token: str, user_name: Optional[str] = None) -> bool: """Send email verification using SES template""" verification_link = f"{FRONTEND_URL}/verify-email?token={token}" template_data = { "user_name": user_name or email.split("@")[0], "verification_link": verification_link, "app_name": APP_NAME, "support_email": SUPPORT_EMAIL } result = send_templated_email(email, TEMPLATE_VERIFICATION, template_data) return result["success"] async def send_password_reset_email(email: str, token: str, user_name: Optional[str] = None) -> bool: """Send password reset email using SES template""" reset_link = f"{FRONTEND_URL}/reset-password?token={token}" template_data = { "user_name": user_name or email.split("@")[0], "reset_link": reset_link, "app_name": APP_NAME, "support_email": SUPPORT_EMAIL } result = send_templated_email(email, TEMPLATE_PASSWORD_RESET, template_data) return result["success"] async def send_security_alert_email( email: str, security_event_description: str, user_name: Optional[str] = None ) -> bool: """Send security alert email using SES template""" template_data = { "user_name": user_name or email.split("@")[0], "security_event_description": security_event_description, "app_name": APP_NAME, "support_email": SUPPORT_EMAIL } result = send_templated_email(email, TEMPLATE_SECURITY_ALERT, template_data) return result["success"] # ============================================================================= # Token Generation & Expiry Helpers # ============================================================================= def generate_token() -> str: """Generate a secure random token""" return secrets.token_urlsafe(32) def get_verification_expiry() -> datetime: """24 hour expiry for email verification""" return datetime.utcnow() + timedelta(hours=24) def get_reset_expiry() -> datetime: """1 hour expiry for password reset""" return datetime.utcnow() + timedelta(hours=1)