elmeg-demo/backend/routers/groups.py

128 lines
3.8 KiB
Python

from typing import List
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlmodel import Session, select, func
from database import get_session
from models import Group, GroupMember, GroupPost, User
from schemas import GroupCreate, GroupRead, GroupPostCreate, GroupPostRead
from auth import get_current_user
# Router for the group endpoints: every route registered on it below is
# mounted under the "/groups" prefix and tagged "groups".
router = APIRouter(prefix="/groups", tags=["groups"])
# --- Groups ---
@router.post("/", response_model=GroupRead)
def create_group(
    group: GroupCreate,
    session: Session = Depends(get_session),
    current_user: User = Depends(get_current_user),
):
    """Create a group owned by the current user and enrol them as its admin.

    The group row and the creator's admin membership are written in a single
    transaction, so a failure while adding the membership cannot leave behind
    a persisted group that has no admin.

    Args:
        group: Payload validated into a ``Group`` row.
        session: Database session supplied by ``get_session``.
        current_user: Authenticated user; recorded as ``created_by``.

    Returns:
        The newly created group, refreshed from the database.
    """
    db_group = Group.model_validate(group)
    db_group.created_by = current_user.id
    session.add(db_group)
    # Flush (not commit) so the INSERT runs and db_group.id is populated while
    # the transaction stays open for the membership insert below.
    session.flush()

    # Auto-join creator as admin
    member = GroupMember(group_id=db_group.id, user_id=current_user.id, role="admin")
    session.add(member)
    session.commit()

    # Reload after the commit so the returned object carries current values
    # (with SQLAlchemy's default expire_on_commit the attributes are expired).
    session.refresh(db_group)
    return db_group
@router.get("/", response_model=List[GroupRead])
def read_groups(
    offset: int = 0,
    limit: int = Query(default=100, ge=0, le=100),
    session: Session = Depends(get_session),
):
    """List groups one page at a time.

    Args:
        offset: Number of rows to skip before the page starts.
        limit: Maximum number of rows to return, between 0 and 100. Negative
            values are rejected during request validation: some backends
            (SQLite, for one) treat a negative LIMIT as "no limit", which
            would defeat the 100-row cap.
        session: Database session supplied by ``get_session``.

    Returns:
        The selected ``Group`` rows.
    """
    # TODO: Add member count to response
    groups = session.exec(select(Group).offset(offset).limit(limit)).all()
    return groups
@router.get("/{group_id}", response_model=GroupRead)
def read_group(
    group_id: int,
    session: Session = Depends(get_session),
):
    """Fetch a single group by its primary key.

    Raises:
        HTTPException: 404 with "Group not found" when no row matches.
    """
    found = session.get(Group, group_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Group not found")
@router.post("/{group_id}/join")
def join_group(
    group_id: int,
    session: Session = Depends(get_session),
    current_user: User = Depends(get_current_user),
):
    """Add the current user to a group, then notify the group's creator.

    Raises:
        HTTPException: 404 if the group does not exist; 400 if the user
            already has a membership row for it.

    Returns:
        ``{"status": "joined"}`` once the membership has been committed and
        the notification call has returned.
    """
    group = session.get(Group, group_id)
    if not group:
        raise HTTPException(status_code=404, detail="Group not found")

    # Refuse a second membership row for the same (group, user) pair.
    membership_query = (
        select(GroupMember)
        .where(GroupMember.group_id == group_id)
        .where(GroupMember.user_id == current_user.id)
    )
    already_member = session.exec(membership_query).first()
    if already_member:
        raise HTTPException(status_code=400, detail="Already a member")

    session.add(GroupMember(group_id=group_id, user_id=current_user.id))
    session.commit()

    # Tell the user recorded as the group's creator about the new member.
    # Imported at call time rather than at module level (presumably to avoid
    # a circular import with routers.notifications; confirm before moving).
    from routers.notifications import create_notification

    create_notification(
        session,
        user_id=group.created_by,
        type="group_join",
        title="New Group Member",
        message=f"{current_user.email} joined {group.name}",
        link=f"/groups/{group.id}",
    )
    return {"status": "joined"}
# --- Posts ---
@router.post("/{group_id}/posts", response_model=GroupPostRead)
def create_post(
    group_id: int,
    post: GroupPostCreate,
    session: Session = Depends(get_session),
    current_user: User = Depends(get_current_user),
):
    """Publish a post in a group on behalf of the current user.

    Raises:
        HTTPException: 403 when the user has no membership row for the group.

    Returns:
        The stored post, refreshed from the database.
    """
    # Only members may post: look up the caller's membership row.
    membership_query = (
        select(GroupMember)
        .where(GroupMember.group_id == group_id)
        .where(GroupMember.user_id == current_user.id)
    )
    if not session.exec(membership_query).first():
        raise HTTPException(status_code=403, detail="Must be a member to post")

    new_post = GroupPost.model_validate(post)
    # The target group comes from the URL and the author from the
    # authenticated user; both are set here after validating the payload.
    new_post.group_id = group_id
    new_post.user_id = current_user.id

    session.add(new_post)
    session.commit()
    session.refresh(new_post)
    return new_post
@router.get("/{group_id}/posts", response_model=List[GroupPostRead])
def read_posts(
    group_id: int,
    offset: int = 0,
    limit: int = Query(default=50, ge=0, le=100),
    session: Session = Depends(get_session),
):
    """List a group's posts, newest first, one page at a time.

    Args:
        group_id: Group whose posts are listed.
        offset: Number of rows to skip before the page starts.
        limit: Maximum number of rows to return, between 0 and 100. Negative
            values are rejected during request validation: some backends
            (SQLite, for one) treat a negative LIMIT as "no limit", which
            would defeat the 100-row cap.
        session: Database session supplied by ``get_session``.

    Returns:
        The matching ``GroupPost`` rows ordered by ``created_at`` descending.
    """
    # Check if group is private? For now assume public read or check membership if private.
    # NOTE: no authentication or membership dependency is applied here yet.
    posts = session.exec(
        select(GroupPost)
        .where(GroupPost.group_id == group_id)
        .order_by(GroupPost.created_at.desc())
        .offset(offset)
        .limit(limit)
    ).all()
    return posts