- Fork elmeg-demo codebase for multi-band support
- Add data importer infrastructure with base class
- Create band-specific importers:
  - phish.py: Phish.net API v5
  - grateful_dead.py: Grateful Stats API
  - setlistfm.py: Dead & Company, Billy Strings (Setlist.fm)
- Add spec-kit configuration for Gemini
- Update README with supported bands and architecture
77 lines
2.3 KiB
Python
77 lines
2.3 KiB
Python
from typing import List

from fastapi import APIRouter, Depends, HTTPException
from sqlmodel import Session, func, select

from auth import oauth2_scheme, get_current_user
from database import get_session
from models import Notification, User
|
|
|
|
# Router for the notification endpoints below; every route is served under
# the "/notifications" prefix and tagged "notifications".
router = APIRouter(prefix="/notifications", tags=["notifications"])
|
|
|
|
@router.get("/", response_model=List[Notification])
def get_my_notifications(
    skip: int = 0,
    limit: int = 20,
    current_user: User = Depends(get_current_user),
    session: Session = Depends(get_session),
):
    """Return one page of the current user's notifications, newest first.

    Args:
        skip: Number of rows to skip (passed to the query's OFFSET).
        limit: Maximum number of rows to return (passed to the query's LIMIT).
        current_user: The authenticated user, resolved by ``get_current_user``.
        session: Database session, resolved by ``get_session``.

    Returns:
        The matching ``Notification`` rows ordered by ``created_at`` descending.
    """
    # Restrict to rows owned by the caller.
    owned = select(Notification).where(Notification.user_id == current_user.id)
    # Most recent notifications come first.
    newest_first = owned.order_by(Notification.created_at.desc())
    # Apply the requested pagination window.
    page = newest_first.offset(skip).limit(limit)

    rows = session.exec(page)
    return rows.all()
|
|
|
|
@router.get("/unread-count")
def get_unread_count(
    current_user: User = Depends(get_current_user),
    session: Session = Depends(get_session),
):
    """Return how many unread notifications the current user has.

    The count is computed by the database (``SELECT count(*)``) instead of
    loading every unread row into memory and measuring the list length.

    Args:
        current_user: The authenticated user, resolved by ``get_current_user``.
        session: Database session, resolved by ``get_session``.

    Returns:
        A dict of the form ``{"count": <int>}``.
    """
    statement = (
        select(func.count())
        .select_from(Notification)
        .where(Notification.user_id == current_user.id)
        # SQL expression, not a Python truth test, so ``== False`` is intended.
        .where(Notification.is_read == False)  # noqa: E712
    )
    # An aggregate without GROUP BY always yields exactly one row.
    unread_total = session.exec(statement).one()
    return {"count": unread_total}
|
|
|
|
@router.post("/{notification_id}/read")
def mark_notification_read(
    notification_id: int,
    current_user: User = Depends(get_current_user),
    session: Session = Depends(get_session),
):
    """Flag a single notification as read and return it.

    Args:
        notification_id: Primary key of the notification to update.
        current_user: The authenticated user, resolved by ``get_current_user``.
        session: Database session, resolved by ``get_session``.

    Returns:
        The updated ``Notification``, refreshed from the database after commit.

    Raises:
        HTTPException: 404 if no notification has this id; 403 if the
            notification belongs to a different user.
    """
    target = session.get(Notification, notification_id)

    # Existence is checked before ownership, so a missing id is always a 404.
    if target is None:
        raise HTTPException(status_code=404, detail="Notification not found")
    if target.user_id != current_user.id:
        raise HTTPException(status_code=403, detail="Not authorized")

    target.is_read = True
    session.add(target)
    session.commit()
    # Reload the row so the returned object reflects the committed state.
    session.refresh(target)
    return target
|
|
|
|
@router.post("/mark-all-read")
def mark_all_read(
    current_user: User = Depends(get_current_user),
    session: Session = Depends(get_session),
):
    """Flag every unread notification of the current user as read.

    Args:
        current_user: The authenticated user, resolved by ``get_current_user``.
        session: Database session, resolved by ``get_session``.

    Returns:
        ``{"status": "success", "marked_count": <int>}`` where the count is
        the number of notifications that were unread before this call.
    """
    unread_query = select(Notification).where(
        Notification.user_id == current_user.id,
        Notification.is_read == False,
    )
    unread = session.exec(unread_query).all()

    for item in unread:
        item.is_read = True
    session.add_all(unread)

    # One commit persists all of the flag changes together.
    session.commit()
    return {"status": "success", "marked_count": len(unread)}
|