Some checks failed
Deploy Fediversion / deploy (push) Failing after 1s
- Group model: Add vertical_id scoping, image_url - Vertical model: Add logo_url, accent_color for branding - Groups router: Add vertical filter, member count, leave endpoint - Fix CI/CD deploy.yml git clone URL (runfoo-org)
185 lines
5.5 KiB
Python
185 lines
5.5 KiB
Python
from typing import List, Optional
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlmodel import Session, select, func
|
|
from database import get_session
|
|
from models import Group, GroupMember, GroupPost, User
|
|
from schemas import GroupCreate, GroupRead, GroupPostCreate, GroupPostRead
|
|
from auth import get_current_user
|
|
|
|
router = APIRouter(prefix="/groups", tags=["groups"])
|
|
|
|
# --- Groups ---
|
|
|
|
@router.post("/", response_model=GroupRead)
|
|
def create_group(
|
|
group: GroupCreate,
|
|
session: Session = Depends(get_session),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
db_group = Group.model_validate(group)
|
|
db_group.created_by = current_user.id
|
|
session.add(db_group)
|
|
session.commit()
|
|
session.refresh(db_group)
|
|
|
|
# Auto-join creator as admin
|
|
member = GroupMember(group_id=db_group.id, user_id=current_user.id, role="admin")
|
|
session.add(member)
|
|
session.commit()
|
|
|
|
return db_group
|
|
|
|
@router.get("/", response_model=List[GroupRead])
|
|
def read_groups(
|
|
offset: int = 0,
|
|
limit: int = Query(default=100, le=100),
|
|
vertical: Optional[str] = None,
|
|
session: Session = Depends(get_session)
|
|
):
|
|
from models import Vertical
|
|
|
|
query = select(Group)
|
|
|
|
# Filter by vertical if specified
|
|
if vertical:
|
|
v = session.exec(select(Vertical).where(Vertical.slug == vertical)).first()
|
|
if v:
|
|
query = query.where(Group.vertical_id == v.id)
|
|
|
|
groups = session.exec(query.offset(offset).limit(limit)).all()
|
|
|
|
# Add member count to each group
|
|
result = []
|
|
for g in groups:
|
|
member_count = session.exec(
|
|
select(func.count(GroupMember.id)).where(GroupMember.group_id == g.id)
|
|
).one()
|
|
result.append({
|
|
**g.model_dump(),
|
|
"member_count": member_count or 0
|
|
})
|
|
return result
|
|
|
|
@router.get("/{group_id}", response_model=GroupRead)
|
|
def read_group(group_id: int, session: Session = Depends(get_session)):
|
|
group = session.get(Group, group_id)
|
|
if not group:
|
|
raise HTTPException(status_code=404, detail="Group not found")
|
|
return group
|
|
|
|
@router.post("/{group_id}/join")
|
|
def join_group(
|
|
group_id: int,
|
|
session: Session = Depends(get_session),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
group = session.get(Group, group_id)
|
|
if not group:
|
|
raise HTTPException(status_code=404, detail="Group not found")
|
|
|
|
# Check if already member
|
|
existing = session.exec(
|
|
select(GroupMember)
|
|
.where(GroupMember.group_id == group_id)
|
|
.where(GroupMember.user_id == current_user.id)
|
|
).first()
|
|
|
|
if existing:
|
|
raise HTTPException(status_code=400, detail="Already a member")
|
|
|
|
member = GroupMember(group_id=group_id, user_id=current_user.id)
|
|
session.add(member)
|
|
session.commit()
|
|
|
|
# Notify Group Owner
|
|
from routers.notifications import create_notification
|
|
create_notification(
|
|
session,
|
|
user_id=group.created_by,
|
|
type="group_join",
|
|
title="New Group Member",
|
|
message=f"{current_user.email} joined {group.name}",
|
|
link=f"/groups/{group.id}"
|
|
)
|
|
|
|
return {"status": "joined"}
|
|
|
|
|
|
@router.delete("/{group_id}/leave")
|
|
def leave_group(
|
|
group_id: int,
|
|
session: Session = Depends(get_session),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""Leave a group (non-admins only, admins must transfer ownership first)"""
|
|
member = session.exec(
|
|
select(GroupMember)
|
|
.where(GroupMember.group_id == group_id)
|
|
.where(GroupMember.user_id == current_user.id)
|
|
).first()
|
|
|
|
if not member:
|
|
raise HTTPException(status_code=404, detail="Not a member")
|
|
|
|
if member.role == "admin":
|
|
# Check if only admin
|
|
other_admins = session.exec(
|
|
select(GroupMember)
|
|
.where(GroupMember.group_id == group_id)
|
|
.where(GroupMember.role == "admin")
|
|
.where(GroupMember.user_id != current_user.id)
|
|
).first()
|
|
|
|
if not other_admins:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Cannot leave: you are the only admin. Transfer ownership first."
|
|
)
|
|
|
|
session.delete(member)
|
|
session.commit()
|
|
return {"status": "left"}
|
|
|
|
# --- Posts ---
|
|
|
|
@router.post("/{group_id}/posts", response_model=GroupPostRead)
|
|
def create_post(
|
|
group_id: int,
|
|
post: GroupPostCreate,
|
|
session: Session = Depends(get_session),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
# Verify membership
|
|
member = session.exec(
|
|
select(GroupMember)
|
|
.where(GroupMember.group_id == group_id)
|
|
.where(GroupMember.user_id == current_user.id)
|
|
).first()
|
|
|
|
if not member:
|
|
raise HTTPException(status_code=403, detail="Must be a member to post")
|
|
|
|
db_post = GroupPost.model_validate(post)
|
|
db_post.group_id = group_id
|
|
db_post.user_id = current_user.id
|
|
session.add(db_post)
|
|
session.commit()
|
|
session.refresh(db_post)
|
|
return db_post
|
|
|
|
@router.get("/{group_id}/posts", response_model=List[GroupPostRead])
|
|
def read_posts(
|
|
group_id: int,
|
|
offset: int = 0,
|
|
limit: int = Query(default=50, le=100),
|
|
session: Session = Depends(get_session)
|
|
):
|
|
# Check if group is private? For now assume public read or check membership if private.
|
|
posts = session.exec(
|
|
select(GroupPost)
|
|
.where(GroupPost.group_id == group_id)
|
|
.order_by(GroupPost.created_at.desc())
|
|
.offset(offset)
|
|
.limit(limit)
|
|
).all()
|
|
return posts
|