"""Group endpoints: create, list, fetch and join groups; create and list posts."""

from typing import List

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlmodel import Session, select, func

from database import get_session
from models import Group, GroupMember, GroupPost, User
from schemas import GroupCreate, GroupRead, GroupPostCreate, GroupPostRead
from auth import get_current_user

router = APIRouter(prefix="/groups", tags=["groups"])


# --- Groups ---

@router.post("/", response_model=GroupRead)
def create_group(
    group: GroupCreate,
    session: Session = Depends(get_session),
    current_user: User = Depends(get_current_user),
):
    """Create a group owned by the current user and enrol them as its admin.

    The group row and the creator's admin membership are written in a single
    transaction, so a failure cannot leave a group behind without an admin.
    """
    db_group = Group.model_validate(group)
    db_group.created_by = current_user.id
    session.add(db_group)
    # Flush (not commit) so the database assigns db_group.id while the
    # transaction stays open for the membership row below.
    session.flush()

    # Auto-join creator as admin
    member = GroupMember(
        group_id=db_group.id,
        user_id=current_user.id,
        role="admin",
    )
    session.add(member)
    session.commit()

    # commit() expires loaded attributes; reload them before serialising.
    session.refresh(db_group)
    return db_group


@router.get("/", response_model=List[GroupRead])
def read_groups(
    offset: int = Query(default=0, ge=0),
    limit: int = Query(default=100, ge=0, le=100),
    session: Session = Depends(get_session),
):
    """Return one page of groups.

    ``offset`` and ``limit`` are rejected when negative; ``limit`` is capped
    at 100.
    """
    # TODO: Add member count to response
    groups = session.exec(select(Group).offset(offset).limit(limit)).all()
    return groups


@router.get("/{group_id}", response_model=GroupRead)
def read_group(group_id: int, session: Session = Depends(get_session)):
    """Return the group with the given id, or respond 404 if it does not exist."""
    group = session.get(Group, group_id)
    if not group:
        raise HTTPException(status_code=404, detail="Group not found")
    return group


@router.post("/{group_id}/join")
def join_group(
    group_id: int,
    session: Session = Depends(get_session),
    current_user: User = Depends(get_current_user),
):
    """Add the current user to a group and send a notification to its creator.

    Responds 404 when the group does not exist and 400 when the user is
    already a member. Returns ``{"status": "joined"}`` on success.
    """
    group = session.get(Group, group_id)
    if not group:
        raise HTTPException(status_code=404, detail="Group not found")

    # Check if already member
    # NOTE(review): this is check-then-insert; two concurrent requests could
    # both pass the check. Confirm whether GroupMember has a unique constraint
    # on (group_id, user_id).
    existing = session.exec(
        select(GroupMember)
        .where(GroupMember.group_id == group_id)
        .where(GroupMember.user_id == current_user.id)
    ).first()
    if existing:
        raise HTTPException(status_code=400, detail="Already a member")

    member = GroupMember(group_id=group_id, user_id=current_user.id)
    session.add(member)
    session.commit()

    # Notify Group Owner
    # Imported at call time rather than at module level — presumably to avoid
    # a circular import with routers.notifications; confirm before moving it.
    from routers.notifications import create_notification

    create_notification(
        session,
        user_id=group.created_by,
        type="group_join",
        title="New Group Member",
        message=f"{current_user.email} joined {group.name}",
        link=f"/groups/{group.id}",
    )

    return {"status": "joined"}


# --- Posts ---

@router.post("/{group_id}/posts", response_model=GroupPostRead)
def create_post(
    group_id: int,
    post: GroupPostCreate,
    session: Session = Depends(get_session),
    current_user: User = Depends(get_current_user),
):
    """Create a post in a group on behalf of the current user.

    Responds 403 when the current user has no membership row for the group.
    """
    # Verify membership
    member = session.exec(
        select(GroupMember)
        .where(GroupMember.group_id == group_id)
        .where(GroupMember.user_id == current_user.id)
    ).first()
    if not member:
        raise HTTPException(status_code=403, detail="Must be a member to post")

    db_post = GroupPost.model_validate(post)
    db_post.group_id = group_id
    db_post.user_id = current_user.id
    session.add(db_post)
    session.commit()
    session.refresh(db_post)
    return db_post


@router.get("/{group_id}/posts", response_model=List[GroupPostRead])
def read_posts(
    group_id: int,
    offset: int = Query(default=0, ge=0),
    limit: int = Query(default=50, ge=0, le=100),
    session: Session = Depends(get_session),
):
    """Return one page of a group's posts, newest first.

    ``offset`` and ``limit`` are rejected when negative; ``limit`` is capped
    at 100. No membership or group-existence check is performed here, so an
    unknown ``group_id`` yields an empty list.
    """
    # TODO: decide whether private groups should restrict reads to members;
    # for now every group's posts are readable without a membership check.
    posts = session.exec(
        select(GroupPost)
        .where(GroupPost.group_id == group_id)
        .order_by(GroupPost.created_at.desc())
        .offset(offset)
        .limit(limit)
    ).all()
    return posts