fediversion/backend/routers/groups.py
fullsizemalt c8e5a48d57
Some checks failed
Deploy Fediversion / deploy (push) Failing after 1s
feat: Groups refinement and band theming
- Group model: Add vertical_id scoping, image_url
- Vertical model: Add logo_url, accent_color for branding
- Groups router: Add vertical filter, member count, leave endpoint
- Fix CI/CD deploy.yml git clone URL (runfoo-org)
2025-12-28 16:57:41 -08:00

185 lines
5.5 KiB
Python

from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlmodel import Session, select, func
from database import get_session
from models import Group, GroupMember, GroupPost, User
from schemas import GroupCreate, GroupRead, GroupPostCreate, GroupPostRead
from auth import get_current_user
router = APIRouter(prefix="/groups", tags=["groups"])
# --- Groups ---
@router.post("/", response_model=GroupRead)
def create_group(
group: GroupCreate,
session: Session = Depends(get_session),
current_user: User = Depends(get_current_user)
):
db_group = Group.model_validate(group)
db_group.created_by = current_user.id
session.add(db_group)
session.commit()
session.refresh(db_group)
# Auto-join creator as admin
member = GroupMember(group_id=db_group.id, user_id=current_user.id, role="admin")
session.add(member)
session.commit()
return db_group
@router.get("/", response_model=List[GroupRead])
def read_groups(
offset: int = 0,
limit: int = Query(default=100, le=100),
vertical: Optional[str] = None,
session: Session = Depends(get_session)
):
from models import Vertical
query = select(Group)
# Filter by vertical if specified
if vertical:
v = session.exec(select(Vertical).where(Vertical.slug == vertical)).first()
if v:
query = query.where(Group.vertical_id == v.id)
groups = session.exec(query.offset(offset).limit(limit)).all()
# Add member count to each group
result = []
for g in groups:
member_count = session.exec(
select(func.count(GroupMember.id)).where(GroupMember.group_id == g.id)
).one()
result.append({
**g.model_dump(),
"member_count": member_count or 0
})
return result
@router.get("/{group_id}", response_model=GroupRead)
def read_group(group_id: int, session: Session = Depends(get_session)):
group = session.get(Group, group_id)
if not group:
raise HTTPException(status_code=404, detail="Group not found")
return group
@router.post("/{group_id}/join")
def join_group(
group_id: int,
session: Session = Depends(get_session),
current_user: User = Depends(get_current_user)
):
group = session.get(Group, group_id)
if not group:
raise HTTPException(status_code=404, detail="Group not found")
# Check if already member
existing = session.exec(
select(GroupMember)
.where(GroupMember.group_id == group_id)
.where(GroupMember.user_id == current_user.id)
).first()
if existing:
raise HTTPException(status_code=400, detail="Already a member")
member = GroupMember(group_id=group_id, user_id=current_user.id)
session.add(member)
session.commit()
# Notify Group Owner
from routers.notifications import create_notification
create_notification(
session,
user_id=group.created_by,
type="group_join",
title="New Group Member",
message=f"{current_user.email} joined {group.name}",
link=f"/groups/{group.id}"
)
return {"status": "joined"}
@router.delete("/{group_id}/leave")
def leave_group(
group_id: int,
session: Session = Depends(get_session),
current_user: User = Depends(get_current_user)
):
"""Leave a group (non-admins only, admins must transfer ownership first)"""
member = session.exec(
select(GroupMember)
.where(GroupMember.group_id == group_id)
.where(GroupMember.user_id == current_user.id)
).first()
if not member:
raise HTTPException(status_code=404, detail="Not a member")
if member.role == "admin":
# Check if only admin
other_admins = session.exec(
select(GroupMember)
.where(GroupMember.group_id == group_id)
.where(GroupMember.role == "admin")
.where(GroupMember.user_id != current_user.id)
).first()
if not other_admins:
raise HTTPException(
status_code=400,
detail="Cannot leave: you are the only admin. Transfer ownership first."
)
session.delete(member)
session.commit()
return {"status": "left"}
# --- Posts ---
@router.post("/{group_id}/posts", response_model=GroupPostRead)
def create_post(
group_id: int,
post: GroupPostCreate,
session: Session = Depends(get_session),
current_user: User = Depends(get_current_user)
):
# Verify membership
member = session.exec(
select(GroupMember)
.where(GroupMember.group_id == group_id)
.where(GroupMember.user_id == current_user.id)
).first()
if not member:
raise HTTPException(status_code=403, detail="Must be a member to post")
db_post = GroupPost.model_validate(post)
db_post.group_id = group_id
db_post.user_id = current_user.id
session.add(db_post)
session.commit()
session.refresh(db_post)
return db_post
@router.get("/{group_id}/posts", response_model=List[GroupPostRead])
def read_posts(
group_id: int,
offset: int = 0,
limit: int = Query(default=50, le=100),
session: Session = Depends(get_session)
):
# Check if group is private? For now assume public read or check membership if private.
posts = session.exec(
select(GroupPost)
.where(GroupPost.group_id == group_id)
.order_by(GroupPost.created_at.desc())
.offset(offset)
.limit(limit)
).all()
return posts