fediversion/backend/routers/notifications.py
fullsizemalt b4cddf41ea feat: Initialize Fediversion multi-band platform
- Fork elmeg-demo codebase for multi-band support
- Add data importer infrastructure with base class
- Create band-specific importers:
  - phish.py: Phish.net API v5
  - grateful_dead.py: Grateful Stats API
  - setlistfm.py: Dead & Company, Billy Strings (Setlist.fm)
- Add spec-kit configuration for Gemini
- Update README with supported bands and architecture
2025-12-28 12:39:28 -08:00

77 lines
2.3 KiB
Python

from typing import List

from fastapi import APIRouter, Depends, HTTPException
from sqlmodel import Session, func, select

from auth import oauth2_scheme, get_current_user
from database import get_session
from models import Notification, User
# Router that groups the notification endpoints defined in this module; each
# route path below is relative to the "/notifications" prefix.
router = APIRouter(prefix="/notifications", tags=["notifications"])
@router.get("/", response_model=List[Notification])
def get_my_notifications(
    skip: int = 0,
    limit: int = 20,
    current_user: User = Depends(get_current_user),
    session: Session = Depends(get_session),
):
    """Return one page of the current user's notifications, newest first.

    Args:
        skip: Number of rows to skip (used as the query OFFSET). Must be >= 0.
        limit: Maximum number of rows to return (used as the query LIMIT).
            Must be >= 0.
        current_user: User resolved by the ``get_current_user`` dependency.
        session: Database session provided by the ``get_session`` dependency.

    Returns:
        The notifications whose ``user_id`` equals ``current_user.id``,
        ordered by ``created_at`` descending, restricted to the requested page.

    Raises:
        HTTPException: 422 when ``skip`` or ``limit`` is negative.
    """
    # Negative paging values have no portable meaning: some databases reject
    # them with an error (which would surface as a 500), others treat a
    # negative LIMIT as "no limit". Refuse them up front instead.
    if skip < 0 or limit < 0:
        raise HTTPException(
            status_code=422,
            detail="skip and limit must be non-negative",
        )

    statement = (
        select(Notification)
        .where(Notification.user_id == current_user.id)
        .order_by(Notification.created_at.desc())
        .offset(skip)
        .limit(limit)
    )
    return session.exec(statement).all()
@router.get("/unread-count")
def get_unread_count(
    current_user: User = Depends(get_current_user),
    session: Session = Depends(get_session),
):
    """Return how many unread notifications the current user has.

    Args:
        current_user: User resolved by the ``get_current_user`` dependency.
        session: Database session provided by the ``get_session`` dependency.

    Returns:
        A dict of the form ``{"count": <int>}`` where the value is the number
        of notifications with ``user_id == current_user.id`` and
        ``is_read == False``.
    """
    # Let the database count the rows instead of loading every unread
    # notification into memory only to take len() of the list.
    statement = (
        select(func.count())
        .select_from(Notification)
        .where(Notification.user_id == current_user.id)
        # "== False" builds a SQL comparison here; it is not a Python truth test.
        .where(Notification.is_read == False)  # noqa: E712
    )
    unread_total = session.exec(statement).one()
    return {"count": unread_total}
@router.post("/{notification_id}/read")
def mark_notification_read(
    notification_id: int,
    current_user: User = Depends(get_current_user),
    session: Session = Depends(get_session),
):
    """Flag a single notification as read and return it.

    Args:
        notification_id: Primary key of the notification to update.
        current_user: User resolved by the ``get_current_user`` dependency.
        session: Database session provided by the ``get_session`` dependency.

    Returns:
        The notification, refreshed from the database after the commit.

    Raises:
        HTTPException: 404 when no notification has the given id; 403 when
            the notification's ``user_id`` differs from ``current_user.id``.
    """
    target = session.get(Notification, notification_id)

    # Existence is checked before ownership, so an unknown id yields 404.
    if target is None:
        raise HTTPException(status_code=404, detail="Notification not found")

    belongs_to_someone_else = target.user_id != current_user.id
    if belongs_to_someone_else:
        raise HTTPException(status_code=403, detail="Not authorized")

    target.is_read = True
    session.add(target)
    session.commit()
    # Reload the row so the returned object reflects the committed state.
    session.refresh(target)
    return target
@router.post("/mark-all-read")
def mark_all_read(
    current_user: User = Depends(get_current_user),
    session: Session = Depends(get_session),
):
    """Flag every unread notification of the current user as read.

    Args:
        current_user: User resolved by the ``get_current_user`` dependency.
        session: Database session provided by the ``get_session`` dependency.

    Returns:
        A dict ``{"status": "success", "marked_count": <int>}`` where
        ``marked_count`` is the number of notifications that were unread
        when the query ran.
    """
    unread_query = select(Notification).where(
        Notification.user_id == current_user.id,
        # "== False" builds a SQL comparison here; it is not a Python truth test.
        Notification.is_read == False,  # noqa: E712
    )
    unread = session.exec(unread_query).all()

    for item in unread:
        item.is_read = True
    session.add_all(unread)

    # One commit persists all of the updates together.
    session.commit()
    return {"status": "success", "marked_count": len(unread)}