fediversion/backend/routers/tickets.py
fullsizemalt b4cddf41ea feat: Initialize Fediversion multi-band platform
- Fork elmeg-demo codebase for multi-band support
- Add data importer infrastructure with base class
- Create band-specific importers:
  - phish.py: Phish.net API v5
  - grateful_dead.py: Grateful Stats API
  - setlistfm.py: Dead & Company, Billy Strings (Setlist.fm)
- Add spec-kit configuration for Gemini
- Update README with supported bands and architecture
2025-12-28 12:39:28 -08:00

371 lines
11 KiB
Python

"""
Bug Tracker API Routes - ISOLATED MODULE
To remove this feature:
1. Delete this file
2. Delete models_tickets.py
3. Remove router import from main.py
4. Drop 'ticket' and 'ticketcomment' tables from database
"""
from typing import List, Optional
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlmodel import Session, select, func
from database import get_session
# Authentication is an optional integration: when the auth/models modules
# cannot be imported, this module still loads and every route sees "no user".
try:
    from auth import get_current_user, get_current_user_optional
    from models import User
except ImportError:
    AUTH_AVAILABLE = False
    User = None

    def get_current_user():
        """Fallback used when auth cannot be imported; always yields no user."""
        return None

    def get_current_user_optional():
        """Fallback used when auth cannot be imported; always yields no user."""
        return None
else:
    # Both imports succeeded, so routes may wire in the real dependencies.
    AUTH_AVAILABLE = True
from models_tickets import (
Ticket, TicketComment, TicketType, TicketStatus, TicketPriority,
TicketCreate, TicketUpdate, TicketCommentCreate, TicketRead, TicketCommentRead
)
# Every route declared below is registered on this router, i.e. under the
# "/tickets" path prefix and the "tickets" tag.
router = APIRouter(prefix="/tickets", tags=["tickets"])
def generate_ticket_number(session: Session) -> str:
    """Return the next ticket number, formatted like ``ELM-001``.

    The number is the current count of ``Ticket`` rows plus one, zero-padded
    to at least three digits.

    NOTE(review): the value is derived from a row count, so two concurrent
    requests (or a deleted row) could produce a number that already exists —
    confirm whether ``ticket_number`` has a uniqueness constraint.
    """
    count_stmt = select(func.count(Ticket.id))
    existing_total = session.exec(count_stmt).one()
    next_sequence = existing_total + 1
    return f"ELM-{next_sequence:03d}"
# --- Public Endpoints ---
@router.post("/", response_model=TicketRead)
def create_ticket(
    ticket: TicketCreate,
    session: Session = Depends(get_session),
    current_user = Depends(get_current_user_optional) if AUTH_AVAILABLE else None
):
    """Create a new support ticket.

    Reporter details come from the logged-in user when there is one;
    otherwise they come from the submitted payload, which must then carry
    ``reporter_email``.

    Raises:
        HTTPException: 400 when the request is anonymous and no reporter
            email was supplied.
    """
    # Reject anonymous submissions without contact details before touching the DB.
    if not current_user and not ticket.reporter_email:
        raise HTTPException(status_code=400, detail="Email required for anonymous submissions")

    if current_user:
        email = current_user.email
        # Display name defaults to the local part of the account email.
        name = current_user.email.split("@")[0]
        user_id = current_user.id
    else:
        email = ticket.reporter_email
        name = ticket.reporter_name or ticket.reporter_email.split("@")[0]
        user_id = None

    new_ticket = Ticket(
        ticket_number=generate_ticket_number(session),
        type=ticket.type,
        priority=ticket.priority,
        title=ticket.title,
        description=ticket.description,
        reporter_email=email,
        reporter_name=name,
        reporter_user_id=user_id,
        browser=ticket.browser,
        os=ticket.os,
        page_url=ticket.page_url,
    )
    session.add(new_ticket)
    session.commit()
    # Reload so database-generated fields are populated in the response.
    session.refresh(new_ticket)
    return new_ticket
@router.get("/known-issues", response_model=List[TicketRead])
def get_known_issues(
    session: Session = Depends(get_session),
    offset: int = Query(default=0, ge=0),
    limit: int = Query(default=50, ge=0, le=100)
):
    """Get public/known issues.

    Returns public tickets whose status is OPEN or IN_PROGRESS, ordered by
    upvotes (highest first) and then by creation time (newest first).

    Args:
        offset: Number of matching rows to skip; must be >= 0.
        limit: Maximum number of rows to return; must be between 0 and 100.

    Both paging values are validated by FastAPI. Without a lower bound a
    negative ``limit`` reaches the database unchanged, where it either lifts
    the 100-row cap (SQLite treats a negative LIMIT as "no limit") or raises
    a database error; a negative ``offset`` is likewise rejected here instead
    of surfacing as a server error.
    """
    visible_statuses = [TicketStatus.OPEN, TicketStatus.IN_PROGRESS]
    statement = (
        select(Ticket)
        # `== True` is intentional: it builds a SQL expression, not a Python bool.
        .where(Ticket.is_public == True)  # noqa: E712
        .where(Ticket.status.in_(visible_statuses))
        .order_by(Ticket.upvotes.desc(), Ticket.created_at.desc())
        .offset(offset)
        .limit(limit)
    )
    return session.exec(statement).all()
@router.get("/my-tickets", response_model=List[TicketRead])
def get_my_tickets(
    session: Session = Depends(get_session),
    current_user = Depends(get_current_user) if AUTH_AVAILABLE else None
):
    """Get tickets submitted by the current user, newest first.

    Raises:
        HTTPException: 401 when no user is authenticated.
    """
    if not current_user:
        raise HTTPException(status_code=401, detail="Login required")

    own_tickets_stmt = select(Ticket).where(Ticket.reporter_user_id == current_user.id)
    own_tickets_stmt = own_tickets_stmt.order_by(Ticket.created_at.desc())
    return session.exec(own_tickets_stmt).all()
@router.get("/{ticket_number}", response_model=TicketRead)
def get_ticket(
    ticket_number: str,
    session: Session = Depends(get_session),
    current_user = Depends(get_current_user_optional) if AUTH_AVAILABLE else None
):
    """Get a single ticket by its ticket number.

    Public tickets are returned to anyone. A non-public ticket is returned
    only to its reporter or to a superuser.

    Raises:
        HTTPException: 404 when no ticket has this number; 403 when the
            ticket is not public and the caller is anonymous or is neither
            the reporter nor a superuser.
    """
    lookup = select(Ticket).where(Ticket.ticket_number == ticket_number)
    found = session.exec(lookup).first()
    if not found:
        raise HTTPException(status_code=404, detail="Ticket not found")

    # Public tickets need no further checks.
    if found.is_public:
        return found

    if not current_user:
        raise HTTPException(status_code=403, detail="Login required")

    is_reporter = found.reporter_user_id == current_user.id
    if not is_reporter and not getattr(current_user, 'is_superuser', False):
        raise HTTPException(status_code=403, detail="Access denied")
    return found
@router.get("/{ticket_number}/comments", response_model=List[TicketCommentRead])
def get_ticket_comments(
    ticket_number: str,
    session: Session = Depends(get_session),
    current_user = Depends(get_current_user_optional) if AUTH_AVAILABLE else None
):
    """Get comments for a ticket, oldest first.

    Visibility follows the rule used when reading the ticket itself: comments
    on a public ticket are readable by anyone, while comments on a non-public
    ticket are readable only by the ticket's reporter or a superuser. Without
    this check, anyone able to guess a ticket number could read the
    conversation on a private ticket.

    Internal comments are only included for superusers.

    Raises:
        HTTPException: 404 when no ticket has this number; 403 when the
            ticket is not public and the caller is anonymous or is neither
            the reporter nor a superuser.
    """
    ticket = session.exec(
        select(Ticket).where(Ticket.ticket_number == ticket_number)
    ).first()
    if not ticket:
        raise HTTPException(status_code=404, detail="Ticket not found")

    is_admin = bool(current_user and getattr(current_user, 'is_superuser', False))

    # Enforce ticket visibility before exposing any of its comments.
    if not ticket.is_public:
        if not current_user:
            raise HTTPException(status_code=403, detail="Login required")
        if ticket.reporter_user_id != current_user.id and not is_admin:
            raise HTTPException(status_code=403, detail="Access denied")

    query = select(TicketComment).where(TicketComment.ticket_id == ticket.id)
    if not is_admin:
        # Internal notes are staff-only; `== False` builds a SQL expression.
        query = query.where(TicketComment.is_internal == False)  # noqa: E712
    return session.exec(query.order_by(TicketComment.created_at)).all()
@router.post("/{ticket_number}/comments", response_model=TicketCommentRead)
def add_comment(
    ticket_number: str,
    comment: TicketCommentCreate,
    session: Session = Depends(get_session),
    current_user = Depends(get_current_user) if AUTH_AVAILABLE else None
):
    """Add a non-internal comment to a ticket.

    Only the ticket's reporter or a superuser may comment. The ticket's
    ``updated_at`` is bumped in the same transaction as the new comment.

    Raises:
        HTTPException: 401 when no user is authenticated; 404 when no ticket
            has this number; 403 when the caller is neither the reporter nor
            a superuser.
    """
    if not current_user:
        raise HTTPException(status_code=401, detail="Login required")

    lookup = select(Ticket).where(Ticket.ticket_number == ticket_number)
    target = session.exec(lookup).first()
    if not target:
        raise HTTPException(status_code=404, detail="Ticket not found")

    # Access is limited to the reporter and superusers.
    is_reporter = target.reporter_user_id == current_user.id
    if not is_reporter and not getattr(current_user, 'is_superuser', False):
        raise HTTPException(status_code=403, detail="Access denied")

    new_comment = TicketComment(
        ticket_id=target.id,
        author_user_id=current_user.id,
        author_email=current_user.email,
        # Display name is the local part of the author's email address.
        author_name=current_user.email.split("@")[0],
        content=comment.content,
        is_internal=False
    )
    session.add(new_comment)

    # Record the activity on the ticket itself.
    target.updated_at = datetime.utcnow()
    session.add(target)

    session.commit()
    session.refresh(new_comment)
    return new_comment
@router.post("/{ticket_number}/upvote")
def upvote_ticket(
    ticket_number: str,
    session: Session = Depends(get_session)
):
    """Upvote a public ticket and return its new upvote count.

    No authentication is required by this endpoint.

    NOTE(review): the increment is a read-modify-write on the loaded row, so
    concurrent upvotes may be lost, and nothing here limits repeated votes
    from one client — confirm whether that is acceptable.

    Raises:
        HTTPException: 404 when no ticket has this number; 403 when the
            ticket is not public.
    """
    lookup = select(Ticket).where(Ticket.ticket_number == ticket_number)
    target = session.exec(lookup).first()
    if not target:
        raise HTTPException(status_code=404, detail="Ticket not found")
    if not target.is_public:
        raise HTTPException(status_code=403, detail="Not a public ticket")

    target.upvotes = target.upvotes + 1
    session.add(target)
    session.commit()
    return {"upvotes": target.upvotes}
# --- Admin Endpoints ---
@router.get("/admin/all", response_model=List[TicketRead])
def admin_get_all_tickets(
    session: Session = Depends(get_session),
    current_user = Depends(get_current_user) if AUTH_AVAILABLE else None,
    status: Optional[TicketStatus] = None,
    type: Optional[TicketType] = None,
    priority: Optional[TicketPriority] = None,
    offset: int = Query(default=0, ge=0),
    limit: int = Query(default=50, ge=0, le=100)
):
    """Admin: Get all tickets with optional filters, newest first.

    Args:
        status: When given, only tickets with this status are returned.
        type: When given, only tickets of this type are returned. The name
            shadows the builtin but is kept because it is the public query
            parameter name.
        priority: When given, only tickets with this priority are returned.
        offset: Number of matching rows to skip; must be >= 0.
        limit: Maximum number of rows to return; must be between 0 and 100.

    Both paging values are validated by FastAPI. Without a lower bound a
    negative ``limit`` reaches the database unchanged, where it either lifts
    the 100-row cap (SQLite treats a negative LIMIT as "no limit") or raises
    a database error.

    Raises:
        HTTPException: 403 when the caller is not a superuser.
    """
    if not current_user or not getattr(current_user, 'is_superuser', False):
        raise HTTPException(status_code=403, detail="Admin access required")

    query = select(Ticket)
    # Compare against None rather than truthiness so that a supplied filter is
    # never skipped just because the enum member happens to be falsy.
    if status is not None:
        query = query.where(Ticket.status == status)
    if type is not None:
        query = query.where(Ticket.type == type)
    if priority is not None:
        query = query.where(Ticket.priority == priority)

    tickets = session.exec(
        query.order_by(Ticket.created_at.desc())
        .offset(offset)
        .limit(limit)
    ).all()
    return tickets
@router.patch("/admin/{ticket_number}", response_model=TicketRead)
def admin_update_ticket(
    ticket_number: str,
    update: TicketUpdate,
    session: Session = Depends(get_session),
    current_user = Depends(get_current_user) if AUTH_AVAILABLE else None
):
    """Admin: Update a ticket's status, priority, assignment and visibility.

    Only fields that are not ``None`` in the payload are applied.
    ``updated_at`` is always refreshed.

    Raises:
        HTTPException: 403 when the caller is not a superuser; 404 when no
            ticket has this number.
    """
    caller_is_admin = bool(current_user) and getattr(current_user, 'is_superuser', False)
    if not caller_is_admin:
        raise HTTPException(status_code=403, detail="Admin access required")

    lookup = select(Ticket).where(Ticket.ticket_number == ticket_number)
    target = session.exec(lookup).first()
    if not target:
        raise HTTPException(status_code=404, detail="Ticket not found")

    new_status = update.status
    if new_status is not None:
        target.status = new_status
        # Stamp the resolution time when the ticket moves to RESOLVED.
        if new_status == TicketStatus.RESOLVED:
            target.resolved_at = datetime.utcnow()

    if update.priority is not None:
        target.priority = update.priority

    if update.assigned_to_email is not None:
        target.assigned_to_email = update.assigned_to_email
        # The assignee name is taken from the payload together with the
        # email, even when the payload's name is None.
        target.assigned_to_name = update.assigned_to_name

    if update.is_public is not None:
        target.is_public = update.is_public

    target.updated_at = datetime.utcnow()
    session.add(target)
    session.commit()
    session.refresh(target)
    return target
@router.post("/admin/{ticket_number}/internal-note", response_model=TicketCommentRead)
def admin_add_internal_note(
    ticket_number: str,
    comment: TicketCommentCreate,
    session: Session = Depends(get_session),
    current_user = Depends(get_current_user) if AUTH_AVAILABLE else None
):
    """Admin: Add an internal note to a ticket.

    The note is stored as a comment flagged ``is_internal=True``. Unlike a
    regular comment, this does not touch the ticket's ``updated_at``.

    Raises:
        HTTPException: 403 when the caller is not a superuser; 404 when no
            ticket has this number.
    """
    caller_is_admin = bool(current_user) and getattr(current_user, 'is_superuser', False)
    if not caller_is_admin:
        raise HTTPException(status_code=403, detail="Admin access required")

    lookup = select(Ticket).where(Ticket.ticket_number == ticket_number)
    target = session.exec(lookup).first()
    if not target:
        raise HTTPException(status_code=404, detail="Ticket not found")

    note = TicketComment(
        ticket_id=target.id,
        author_user_id=current_user.id,
        author_email=current_user.email,
        # Display name is the local part of the author's email address.
        author_name=current_user.email.split("@")[0],
        content=comment.content,
        is_internal=True
    )
    session.add(note)
    session.commit()
    session.refresh(note)
    return note
@router.get("/admin/stats")
def admin_get_stats(
    session: Session = Depends(get_session),
    current_user = Depends(get_current_user) if AUTH_AVAILABLE else None
):
    """Admin: Get ticket statistics.

    Returns a dict with the total number of tickets plus the number of
    tickets in the OPEN, IN_PROGRESS and RESOLVED statuses. Tickets in any
    other status are counted only in ``total``.

    Raises:
        HTTPException: 403 when the caller is not a superuser.
    """
    caller_is_admin = bool(current_user) and getattr(current_user, 'is_superuser', False)
    if not caller_is_admin:
        raise HTTPException(status_code=403, detail="Admin access required")

    def count_with_status(wanted):
        # One COUNT query restricted to a single status value.
        stmt = select(func.count(Ticket.id)).where(Ticket.status == wanted)
        return session.exec(stmt).one()

    # Dict values are evaluated top to bottom, so the queries run in the
    # order: total, open, in_progress, resolved.
    return {
        "total": session.exec(select(func.count(Ticket.id))).one(),
        "open": count_with_status(TicketStatus.OPEN),
        "in_progress": count_with_status(TicketStatus.IN_PROGRESS),
        "resolved": count_with_status(TicketStatus.RESOLVED)
    }