"""
Bug Tracker API Routes - ISOLATED MODULE

To remove this feature:
1. Delete this file
2. Delete models_tickets.py
3. Remove router import from main.py
4. Drop 'ticket' and 'ticketcomment' tables from database
"""
from typing import List, Optional
from datetime import datetime

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlmodel import Session, select, func

from database import get_session

# Import auth helpers but don't create hard dependency
try:
    from auth import get_current_user, get_current_user_optional
    from models import User
    AUTH_AVAILABLE = True
except ImportError:
    AUTH_AVAILABLE = False
    User = None

    # Stand-in dependencies: every request is treated as anonymous.
    def get_current_user():
        return None

    def get_current_user_optional():
        return None

from models_tickets import (
    Ticket,
    TicketComment,
    TicketType,
    TicketStatus,
    TicketPriority,
    TicketCreate,
    TicketUpdate,
    TicketCommentCreate,
    TicketRead,
    TicketCommentRead,
)

router = APIRouter(prefix="/tickets", tags=["tickets"])


# --- Internal helpers ---

def _display_name(email: str) -> str:
    """Return the part of *email* before the first '@'."""
    return email.split("@")[0]


def _is_admin(user) -> bool:
    """Return True when *user* is set and has a truthy ``is_superuser``."""
    return bool(user) and bool(getattr(user, "is_superuser", False))


def _require_admin(user) -> None:
    """Raise 403 unless *user* is an admin."""
    if not _is_admin(user):
        raise HTTPException(status_code=403, detail="Admin access required")


def _get_ticket_or_404(session: Session, ticket_number: str) -> Ticket:
    """Load the ticket with the given number or raise 404."""
    ticket = session.exec(
        select(Ticket).where(Ticket.ticket_number == ticket_number)
    ).first()
    if not ticket:
        raise HTTPException(status_code=404, detail="Ticket not found")
    return ticket


def _ensure_can_view(ticket: Ticket, user) -> None:
    """Raise 403 unless *user* may read *ticket*.

    Public tickets are readable by anyone; non-public tickets only by
    their reporter or an admin.
    """
    if ticket.is_public:
        return
    if not user:
        raise HTTPException(status_code=403, detail="Login required")
    if ticket.reporter_user_id != user.id and not _is_admin(user):
        raise HTTPException(status_code=403, detail="Access denied")


def generate_ticket_number(session: Session) -> str:
    """Generate next ticket number like ELM-001.

    Derived from the highest existing ticket id rather than the row count,
    so deleting a ticket can never make the next number collide with one
    that is still in the table.
    """
    # NOTE(review): two concurrent requests can still compute the same
    # number; a unique constraint on ticket_number would be needed to
    # detect that -- confirm against the model definition.
    highest_id = session.exec(select(func.max(Ticket.id))).one()
    next_seq = (highest_id or 0) + 1  # MAX() is NULL on an empty table
    return f"ELM-{next_seq:03d}"


# --- Public Endpoints ---

@router.post("/", response_model=TicketRead)
def create_ticket(
    ticket: TicketCreate,
    session: Session = Depends(get_session),
    # Always a dependency: the fallback stub returns None when auth is
    # missing, so this never degrades into a client-settable query param.
    current_user=Depends(get_current_user_optional),
):
    """Create a new support ticket.

    Logged-in users are recorded as the reporter; anonymous submissions
    must supply ``reporter_email``.

    Raises:
        HTTPException: 400 when anonymous and no email was provided.
    """
    # Determine reporter info
    if current_user:
        reporter_email = current_user.email
        reporter_name = _display_name(current_user.email)
        reporter_user_id = current_user.id
    elif ticket.reporter_email:
        reporter_email = ticket.reporter_email
        reporter_name = ticket.reporter_name or _display_name(ticket.reporter_email)
        reporter_user_id = None
    else:
        raise HTTPException(
            status_code=400, detail="Email required for anonymous submissions"
        )

    db_ticket = Ticket(
        ticket_number=generate_ticket_number(session),
        type=ticket.type,
        priority=ticket.priority,
        title=ticket.title,
        description=ticket.description,
        reporter_email=reporter_email,
        reporter_name=reporter_name,
        reporter_user_id=reporter_user_id,
        browser=ticket.browser,
        os=ticket.os,
        page_url=ticket.page_url,
    )
    session.add(db_ticket)
    session.commit()
    session.refresh(db_ticket)
    return db_ticket


@router.get("/known-issues", response_model=List[TicketRead])
def get_known_issues(
    session: Session = Depends(get_session),
    # Lower bounds keep negative values away from the database.
    offset: int = Query(default=0, ge=0),
    limit: int = Query(default=50, ge=0, le=100),
):
    """Get public/known issues.

    Returns public tickets that are OPEN or IN_PROGRESS, most upvoted
    first, then newest first.
    """
    tickets = session.exec(
        select(Ticket)
        .where(Ticket.is_public == True)  # noqa: E712 - SQL expression
        .where(Ticket.status.in_([TicketStatus.OPEN, TicketStatus.IN_PROGRESS]))
        .order_by(Ticket.upvotes.desc(), Ticket.created_at.desc())
        .offset(offset)
        .limit(limit)
    ).all()
    return tickets


@router.get("/my-tickets", response_model=List[TicketRead])
def get_my_tickets(
    session: Session = Depends(get_session),
    current_user=Depends(get_current_user),
):
    """Get tickets submitted by current user, newest first.

    Raises:
        HTTPException: 401 when there is no current user.
    """
    if not current_user:
        raise HTTPException(status_code=401, detail="Login required")

    tickets = session.exec(
        select(Ticket)
        .where(Ticket.reporter_user_id == current_user.id)
        .order_by(Ticket.created_at.desc())
    ).all()
    return tickets


@router.get("/{ticket_number}", response_model=TicketRead)
def get_ticket(
    ticket_number: str,
    session: Session = Depends(get_session),
    current_user=Depends(get_current_user_optional),
):
    """Get ticket by number.

    Raises:
        HTTPException: 404 when the ticket does not exist; 403 when the
            ticket is not public and the caller is neither its reporter
            nor an admin.
    """
    ticket = _get_ticket_or_404(session, ticket_number)
    _ensure_can_view(ticket, current_user)
    return ticket


@router.get("/{ticket_number}/comments", response_model=List[TicketCommentRead])
def get_ticket_comments(
    ticket_number: str,
    session: Session = Depends(get_session),
    current_user=Depends(get_current_user_optional),
):
    """Get comments for a ticket, oldest first.

    Internal comments are only returned to admins.

    Raises:
        HTTPException: 404 when the ticket does not exist; 403 when the
            ticket is not public and the caller is neither its reporter
            nor an admin.
    """
    ticket = _get_ticket_or_404(session, ticket_number)
    # Comments follow the same visibility rule as the ticket itself;
    # otherwise a private ticket's discussion leaks to anyone who can
    # guess its (sequential) number.
    _ensure_can_view(ticket, current_user)

    query = select(TicketComment).where(TicketComment.ticket_id == ticket.id)
    # Filter internal comments for non-admins
    if not _is_admin(current_user):
        query = query.where(TicketComment.is_internal == False)  # noqa: E712

    comments = session.exec(query.order_by(TicketComment.created_at)).all()
    return comments


@router.post("/{ticket_number}/comments", response_model=TicketCommentRead)
def add_comment(
    ticket_number: str,
    comment: TicketCommentCreate,
    session: Session = Depends(get_session),
    current_user=Depends(get_current_user),
):
    """Add a (non-internal) comment to a ticket.

    Only the ticket's reporter or an admin may comment.

    Raises:
        HTTPException: 401 when there is no current user; 404 when the
            ticket does not exist; 403 when the caller is neither the
            reporter nor an admin.
    """
    if not current_user:
        raise HTTPException(status_code=401, detail="Login required")

    ticket = _get_ticket_or_404(session, ticket_number)

    # Check access
    if ticket.reporter_user_id != current_user.id and not _is_admin(current_user):
        raise HTTPException(status_code=403, detail="Access denied")

    db_comment = TicketComment(
        ticket_id=ticket.id,
        author_user_id=current_user.id,
        author_email=current_user.email,
        author_name=_display_name(current_user.email),
        content=comment.content,
        is_internal=False,
    )
    session.add(db_comment)

    # Update ticket timestamp
    ticket.updated_at = datetime.utcnow()
    session.add(ticket)

    session.commit()
    session.refresh(db_comment)
    return db_comment


@router.post("/{ticket_number}/upvote")
def upvote_ticket(
    ticket_number: str,
    session: Session = Depends(get_session),
):
    """Upvote a public ticket and return the new count as ``{"upvotes": n}``.

    Raises:
        HTTPException: 404 when the ticket does not exist; 403 when it is
            not public.
    """
    ticket = _get_ticket_or_404(session, ticket_number)
    if not ticket.is_public:
        raise HTTPException(status_code=403, detail="Not a public ticket")

    # NOTE(review): read-modify-write in Python; concurrent upvotes can be
    # lost. There is also no per-caller de-duplication.
    ticket.upvotes += 1
    session.add(ticket)
    session.commit()
    return {"upvotes": ticket.upvotes}


# --- Admin Endpoints ---

@router.get("/admin/all", response_model=List[TicketRead])
def admin_get_all_tickets(
    session: Session = Depends(get_session),
    current_user=Depends(get_current_user),
    status: Optional[TicketStatus] = None,
    type: Optional[TicketType] = None,
    priority: Optional[TicketPriority] = None,
    offset: int = Query(default=0, ge=0),
    limit: int = Query(default=50, ge=0, le=100),
):
    """Admin: Get all tickets with filters, newest first.

    Each of ``status``, ``type`` and ``priority`` narrows the result when
    supplied.

    Raises:
        HTTPException: 403 when the caller is not an admin.
    """
    _require_admin(current_user)

    query = select(Ticket)
    # Compare against None so a falsy enum member still filters.
    if status is not None:
        query = query.where(Ticket.status == status)
    if type is not None:
        query = query.where(Ticket.type == type)
    if priority is not None:
        query = query.where(Ticket.priority == priority)

    tickets = session.exec(
        query.order_by(Ticket.created_at.desc()).offset(offset).limit(limit)
    ).all()
    return tickets


@router.patch("/admin/{ticket_number}", response_model=TicketRead)
def admin_update_ticket(
    ticket_number: str,
    update: TicketUpdate,
    session: Session = Depends(get_session),
    current_user=Depends(get_current_user),
):
    """Admin: Update ticket status, priority, assignment.

    Only fields that are not None in *update* are applied. Setting the
    status to RESOLVED also stamps ``resolved_at``.

    Raises:
        HTTPException: 403 when the caller is not an admin; 404 when the
            ticket does not exist.
    """
    _require_admin(current_user)
    ticket = _get_ticket_or_404(session, ticket_number)

    if update.status is not None:
        ticket.status = update.status
        if update.status == TicketStatus.RESOLVED:
            ticket.resolved_at = datetime.utcnow()
    if update.priority is not None:
        ticket.priority = update.priority
    if update.assigned_to_email is not None:
        # The assignee name is only taken together with an email.
        ticket.assigned_to_email = update.assigned_to_email
        ticket.assigned_to_name = update.assigned_to_name
    if update.is_public is not None:
        ticket.is_public = update.is_public

    ticket.updated_at = datetime.utcnow()
    session.add(ticket)
    session.commit()
    session.refresh(ticket)
    return ticket


@router.post(
    "/admin/{ticket_number}/internal-note", response_model=TicketCommentRead
)
def admin_add_internal_note(
    ticket_number: str,
    comment: TicketCommentCreate,
    session: Session = Depends(get_session),
    current_user=Depends(get_current_user),
):
    """Admin: Add internal note (not visible to user).

    Raises:
        HTTPException: 403 when the caller is not an admin; 404 when the
            ticket does not exist.
    """
    _require_admin(current_user)
    ticket = _get_ticket_or_404(session, ticket_number)

    db_comment = TicketComment(
        ticket_id=ticket.id,
        author_user_id=current_user.id,
        author_email=current_user.email,
        author_name=_display_name(current_user.email),
        content=comment.content,
        is_internal=True,
    )
    session.add(db_comment)
    session.commit()
    session.refresh(db_comment)
    return db_comment


@router.get("/admin/stats")
def admin_get_stats(
    session: Session = Depends(get_session),
    current_user=Depends(get_current_user),
):
    """Admin: Get ticket statistics.

    Returns a dict with the keys ``total``, ``open``, ``in_progress`` and
    ``resolved``.

    Raises:
        HTTPException: 403 when the caller is not an admin.
    """
    _require_admin(current_user)

    def count_with_status(status: TicketStatus) -> int:
        return session.exec(
            select(func.count(Ticket.id)).where(Ticket.status == status)
        ).one()

    total = session.exec(select(func.count(Ticket.id))).one()
    return {
        "total": total,
        "open": count_with_status(TicketStatus.OPEN),
        "in_progress": count_with_status(TicketStatus.IN_PROGRESS),
        "resolved": count_with_status(TicketStatus.RESOLVED),
    }