第4章 - CRUD 接口
🎯 本章目标
- 实现班级的 CRUD 接口
- 实现学生的 CRUD 接口
- 实现成绩的 CRUD 接口
- 学会使用路由分组
1️⃣ 班级 CRUD 操作
app/crud/class_crud.py
"""
班级 CRUD 操作
"""
from typing import List, Optional
from sqlalchemy.orm import Session
from sqlalchemy import func
from app.models.class_model import Class
from app.models.student import Student
from app.schemas.class_schema import ClassCreate, ClassUpdate
def get_class(db: Session, class_id: int) -> Optional[Class]:
    """Fetch a single class by its primary key.

    Args:
        db: Active SQLAlchemy session.
        class_id: Value compared against ``Class.id``.

    Returns:
        The first matching ``Class`` row, or ``None`` when nothing matches.
    """
    by_id = db.query(Class).filter(Class.id == class_id)
    return by_id.first()
def get_class_by_name(db: Session, name: str, grade: str) -> Optional[Class]:
    """Look up a class by its name within a given grade.

    Args:
        db: Active SQLAlchemy session.
        name: Value compared against ``Class.name``.
        grade: Value compared against ``Class.grade``.

    Returns:
        The first row matching both conditions, or ``None``.
    """
    same_name = Class.name == name
    same_grade = Class.grade == grade
    return db.query(Class).filter(same_name, same_grade).first()
def get_classes(
    db: Session,
    skip: int = 0,
    limit: int = 10,
    grade: Optional[str] = None,
    keyword: Optional[str] = None,
) -> tuple[List[Class], int]:
    """Return one page of classes together with the total match count.

    Args:
        db: Active SQLAlchemy session.
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows in the page.
        grade: When truthy, keep only classes whose grade equals this value.
        keyword: When truthy, keep only classes whose name or teacher
            contains this text.

    Returns:
        ``(items, total)``: the requested page ordered by ``Class.id``
        descending, and the number of rows matching the filters before
        paging is applied.
    """
    query = db.query(Class)

    # Grade filter.
    if grade:
        query = query.filter(Class.grade == grade)

    # Keyword search over class name and teacher.
    if keyword:
        # autoescape=True escapes "%" and "_" inside the keyword so they are
        # matched literally instead of acting as LIKE wildcards.
        query = query.filter(
            Class.name.contains(keyword, autoescape=True)
            | Class.teacher.contains(keyword, autoescape=True)
        )

    # Count before offset/limit so the total reflects every matching row.
    total = query.count()
    items = query.order_by(Class.id.desc()).offset(skip).limit(limit).all()
    return items, total
def get_class_with_student_count(db: Session, class_id: int):
    """Fetch a class together with the number of students assigned to it.

    Args:
        db: Active SQLAlchemy session.
        class_id: Primary key of the class.

    Returns:
        ``(class_row, student_count)`` when the class exists, otherwise
        ``(None, 0)``.
    """
    found = db.query(Class).filter(Class.id == class_id).first()
    if not found:
        return None, 0
    # Count students whose class_id points at this class.
    enrolled = db.query(Student).filter(Student.class_id == class_id).count()
    return found, enrolled
def create_class(db: Session, class_data: ClassCreate) -> Class:
    """Insert a new class built from the validated payload.

    Args:
        db: Active SQLAlchemy session.
        class_data: Schema whose dumped fields become the ``Class`` columns.

    Returns:
        The persisted ``Class`` row, refreshed from the database after commit.
    """
    field_values = class_data.model_dump()
    new_row = Class(**field_values)
    db.add(new_row)
    db.commit()
    # Reload the row so database-populated values are available on the object.
    db.refresh(new_row)
    return new_row
def update_class(db: Session, class_id: int, class_data: ClassUpdate) -> Optional[Class]:
    """Apply the explicitly provided fields of ``class_data`` to a class.

    Args:
        db: Active SQLAlchemy session.
        class_id: Primary key of the class to modify.
        class_data: Partial update; only fields that were set are written.

    Returns:
        The refreshed ``Class`` row, or ``None`` when no class has that id.
    """
    target = db.query(Class).filter(Class.id == class_id).first()
    if not target:
        return target
    # exclude_unset=True leaves columns untouched when the caller omitted them.
    changes = class_data.model_dump(exclude_unset=True)
    for field_name in changes:
        setattr(target, field_name, changes[field_name])
    db.commit()
    db.refresh(target)
    return target
def delete_class(db: Session, class_id: int) -> bool:
    """Delete the class with the given id.

    Args:
        db: Active SQLAlchemy session.
        class_id: Primary key of the class to remove.

    Returns:
        ``True`` when a row was found and deleted, ``False`` otherwise.
    """
    doomed = db.query(Class).filter(Class.id == class_id).first()
    if not doomed:
        return False
    db.delete(doomed)
    db.commit()
    return True
2️⃣ 学生 CRUD 操作
app/crud/student.py
"""
学生 CRUD 操作
"""
from typing import List, Optional
from sqlalchemy.orm import Session, joinedload
from app.models.student import Student
from app.models.class_model import Class
from app.schemas.student import StudentCreate, StudentUpdate
def get_student(db: Session, student_id: int) -> Optional[Student]:
    """Fetch a student by primary key with the related class eagerly loaded.

    Args:
        db: Active SQLAlchemy session.
        student_id: Value compared against ``Student.id``.

    Returns:
        The first matching ``Student`` row, or ``None``.
    """
    # Load Student.class_info in the same query via a joined eager load.
    with_class = joinedload(Student.class_info)
    lookup = db.query(Student).options(with_class)
    return lookup.filter(Student.id == student_id).first()
def get_student_by_no(db: Session, student_no: str) -> Optional[Student]:
    """Fetch a student by student number.

    Args:
        db: Active SQLAlchemy session.
        student_no: Value compared against ``Student.student_no``.

    Returns:
        The first matching ``Student`` row, or ``None``.
    """
    by_number = db.query(Student).filter(Student.student_no == student_no)
    return by_number.first()
def get_students(
    db: Session,
    skip: int = 0,
    limit: int = 10,
    class_id: Optional[int] = None,
    gender: Optional[str] = None,
    keyword: Optional[str] = None,
) -> tuple[List[Student], int]:
    """Return one page of students together with the total match count.

    Args:
        db: Active SQLAlchemy session.
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows in the page.
        class_id: When truthy, keep only students in this class.
        gender: When truthy, keep only students with this gender value.
        keyword: When truthy, keep only students whose name or student
            number contains this text.

    Returns:
        ``(items, total)``: the requested page ordered by ``Student.id``
        descending (each row with ``class_info`` eagerly loaded), and the
        number of rows matching the filters before paging is applied.
    """
    query = db.query(Student).options(joinedload(Student.class_info))

    # Class filter.
    if class_id:
        query = query.filter(Student.class_id == class_id)

    # Gender filter.
    if gender:
        query = query.filter(Student.gender == gender)

    # Keyword search over name and student number.
    if keyword:
        # autoescape=True escapes "%" and "_" inside the keyword so they are
        # matched literally instead of acting as LIKE wildcards.
        query = query.filter(
            Student.name.contains(keyword, autoescape=True)
            | Student.student_no.contains(keyword, autoescape=True)
        )

    # Count before offset/limit so the total reflects every matching row.
    total = query.count()
    items = query.order_by(Student.id.desc()).offset(skip).limit(limit).all()
    return items, total
def get_students_by_class(db: Session, class_id: int) -> List[Student]:
    """List every student assigned to the given class.

    Args:
        db: Active SQLAlchemy session.
        class_id: Value compared against ``Student.class_id``.

    Returns:
        All matching ``Student`` rows (an empty list when there are none).
    """
    in_class = db.query(Student).filter(Student.class_id == class_id)
    return in_class.all()
def create_student(db: Session, student_data: StudentCreate) -> Student:
    """Insert a new student built from the validated payload.

    Args:
        db: Active SQLAlchemy session.
        student_data: Schema whose dumped fields become the ``Student`` columns.

    Returns:
        The persisted ``Student`` row, refreshed from the database after commit.
    """
    field_values = student_data.model_dump()
    new_row = Student(**field_values)
    db.add(new_row)
    db.commit()
    # Reload the row so database-populated values are available on the object.
    db.refresh(new_row)
    return new_row
def update_student(db: Session, student_id: int, student_data: StudentUpdate) -> Optional[Student]:
    """Apply the explicitly provided fields of ``student_data`` to a student.

    Args:
        db: Active SQLAlchemy session.
        student_id: Primary key of the student to modify.
        student_data: Partial update; only fields that were set are written.

    Returns:
        The refreshed ``Student`` row, or ``None`` when no student has that id.
    """
    target = db.query(Student).filter(Student.id == student_id).first()
    if not target:
        return target
    # exclude_unset=True leaves columns untouched when the caller omitted them.
    changes = student_data.model_dump(exclude_unset=True)
    for field_name in changes:
        setattr(target, field_name, changes[field_name])
    db.commit()
    db.refresh(target)
    return target
def delete_student(db: Session, student_id: int) -> bool:
    """Delete the student with the given id.

    Args:
        db: Active SQLAlchemy session.
        student_id: Primary key of the student to remove.

    Returns:
        ``True`` when a row was found and deleted, ``False`` otherwise.
    """
    doomed = db.query(Student).filter(Student.id == student_id).first()
    if not doomed:
        return False
    db.delete(doomed)
    db.commit()
    return True
def count_students(db: Session, class_id: int = None) -> int:
    """Count students, optionally restricted to one class.

    Args:
        db: Active SQLAlchemy session.
        class_id: When truthy, only students in this class are counted.

    Returns:
        The number of matching ``Student`` rows.
    """
    everyone = db.query(Student)
    scoped = everyone.filter(Student.class_id == class_id) if class_id else everyone
    return scoped.count()
3️⃣ 成绩 CRUD 操作
app/crud/score.py
"""
成绩 CRUD 操作
"""
from typing import List, Optional
from datetime import date
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import func
from app.models.score import Score
from app.models.student import Student
from app.schemas.score import ScoreCreate, ScoreUpdate
def get_score(db: Session, score_id: int) -> Optional[Score]:
    """Fetch a score by primary key with the related student eagerly loaded.

    Args:
        db: Active SQLAlchemy session.
        score_id: Value compared against ``Score.id``.

    Returns:
        The first matching ``Score`` row, or ``None``.
    """
    # Load Score.student in the same query via a joined eager load.
    with_student = joinedload(Score.student)
    lookup = db.query(Score).options(with_student)
    return lookup.filter(Score.id == score_id).first()
def get_scores(
    db: Session,
    skip: int = 0,
    limit: int = 10,
    student_id: int = None,
    subject: str = None,
    exam_type: str = None,
    start_date: date = None,
    end_date: date = None
) -> tuple[List[Score], int]:
    """Return one page of scores together with the total match count.

    Every filter argument is optional and is applied only when truthy.

    Args:
        db: Active SQLAlchemy session.
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows in the page.
        student_id: Keep only scores belonging to this student.
        subject: Keep only scores for this subject.
        exam_type: Keep only scores of this exam type.
        start_date: Keep only scores with ``exam_date`` on or after this date.
        end_date: Keep only scores with ``exam_date`` on or before this date.

    Returns:
        ``(items, total)``: the requested page ordered by exam date and then
        id, both descending (each row with ``student`` eagerly loaded), and
        the number of rows matching the filters before paging is applied.
    """
    # Collect the active conditions first, in the same order they are applied.
    conditions = []
    if student_id:
        conditions.append(Score.student_id == student_id)
    if subject:
        conditions.append(Score.subject == subject)
    if exam_type:
        conditions.append(Score.exam_type == exam_type)
    if start_date:
        conditions.append(Score.exam_date >= start_date)
    if end_date:
        conditions.append(Score.exam_date <= end_date)

    query = db.query(Score).options(joinedload(Score.student))
    for condition in conditions:
        query = query.filter(condition)

    # Count before offset/limit so the total reflects every matching row.
    total = query.count()
    newest_first = query.order_by(Score.exam_date.desc(), Score.id.desc())
    items = newest_first.offset(skip).limit(limit).all()
    return items, total
def get_student_scores(db: Session, student_id: int) -> List[Score]:
    """List every score of one student, most recent exam first.

    Args:
        db: Active SQLAlchemy session.
        student_id: Value compared against ``Score.student_id``.

    Returns:
        All matching ``Score`` rows ordered by ``exam_date`` descending.
    """
    own_scores = db.query(Score).filter(Score.student_id == student_id)
    newest_first = own_scores.order_by(Score.exam_date.desc())
    return newest_first.all()
def create_score(db: Session, score_data: ScoreCreate) -> Score:
    """Insert a new score built from the validated payload.

    Args:
        db: Active SQLAlchemy session.
        score_data: Schema whose dumped fields become the ``Score`` columns.

    Returns:
        The persisted ``Score`` row, refreshed from the database after commit.
    """
    field_values = score_data.model_dump()
    new_row = Score(**field_values)
    db.add(new_row)
    db.commit()
    # Reload the row so database-populated values are available on the object.
    db.refresh(new_row)
    return new_row
def create_scores_batch(db: Session, student_id: int, scores: list, exam_date: date, exam_type: str) -> List[Score]:
    """Insert several scores for one student in a single commit.

    Args:
        db: Active SQLAlchemy session.
        student_id: Student every new score belongs to.
        scores: Items subscripted with ``"subject"`` and ``"score"``.
        exam_date: Exam date stored on every new score.
        exam_type: Exam type stored on every new score.

    Returns:
        The created ``Score`` rows in input order, each refreshed after commit.
    """
    # Values shared by every row of the batch.
    shared = {"student_id": student_id, "exam_date": exam_date, "exam_type": exam_type}
    batch = []
    for entry in scores:
        row = Score(subject=entry["subject"], score=entry["score"], **shared)
        db.add(row)
        batch.append(row)
    # One commit for the whole batch, then reload each row.
    db.commit()
    for row in batch:
        db.refresh(row)
    return batch
def update_score(db: Session, score_id: int, score_data: ScoreUpdate) -> Optional[Score]:
    """Apply the explicitly provided fields of ``score_data`` to a score.

    Args:
        db: Active SQLAlchemy session.
        score_id: Primary key of the score to modify.
        score_data: Partial update; only fields that were set are written.

    Returns:
        The refreshed ``Score`` row, or ``None`` when no score has that id.
    """
    target = db.query(Score).filter(Score.id == score_id).first()
    if not target:
        return target
    # exclude_unset=True leaves columns untouched when the caller omitted them.
    changes = score_data.model_dump(exclude_unset=True)
    for field_name in changes:
        setattr(target, field_name, changes[field_name])
    db.commit()
    db.refresh(target)
    return target
def delete_score(db: Session, score_id: int) -> bool:
    """Delete the score with the given id.

    Args:
        db: Active SQLAlchemy session.
        score_id: Primary key of the score to remove.

    Returns:
        ``True`` when a row was found and deleted, ``False`` otherwise.
    """
    doomed = db.query(Score).filter(Score.id == score_id).first()
    if not doomed:
        return False
    db.delete(doomed)
    db.commit()
    return True
def get_score_statistics(db: Session, class_id: Optional[int] = None, subject: Optional[str] = None):
    """Compute count, average, maximum and minimum over scores.

    Args:
        db: Active SQLAlchemy session.
        class_id: When truthy, only scores of students in this class are
            included (joins ``Student``).
        subject: When truthy, only scores for this subject are included.

    Returns:
        A dict with keys ``"count"``, ``"avg"``, ``"max"`` and ``"min"``.
        The average is rounded to two decimals; any empty aggregate is
        reported as ``0``.
    """
    query = db.query(
        func.count(Score.id).label("count"),
        func.avg(Score.score).label("avg"),
        func.max(Score.score).label("max"),
        func.min(Score.score).label("min"),
    )
    if class_id:
        query = query.join(Student).filter(Student.class_id == class_id)
    if subject:
        query = query.filter(Score.subject == subject)

    # Unpack positionally instead of reading ``row.count``: result rows are
    # tuple-like and expose a ``count()`` method, which attribute access can
    # return in place of the column labelled "count".
    total, average, highest, lowest = query.first()
    return {
        "count": total or 0,
        "avg": round(average, 2) if average else 0,
        "max": highest or 0,
        "min": lowest or 0,
    }
4️⃣ CRUD 汇总
app/crud/__init__.py
"""
CRUD 操作汇总
"""
from app.crud import class_crud
from app.crud import student as student_crud
from app.crud import score as score_crud
# Names exported by the package: the three CRUD modules under their aliases.
__all__ = ["class_crud", "student_crud", "score_crud"]
5️⃣ 班级 API 路由
app/api/classes.py
"""
班级 API 路由
"""
from typing import List
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app.database import get_db
from app.crud import class_crud
from app.schemas import ClassCreate, ClassUpdate, ClassOut, ClassWithStudentCount
from app.utils.response import success, error, page_response
# Router instance on which the class endpoints in this module are registered.
router = APIRouter()
# POST /classes: create a class unless one with the same name and grade exists.
# The docstring is kept verbatim because FastAPI publishes it as the endpoint
# description in the generated API docs.
@router.post("/classes", summary="创建班级")
def create_class(class_data: ClassCreate, db: Session = Depends(get_db)):
    """创建新班级"""
    # Reject duplicates (same name within the same grade) with HTTP 400.
    duplicate = class_crud.get_class_by_name(db, class_data.name, class_data.grade)
    if duplicate:
        raise HTTPException(status_code=400, detail="该班级已存在")

    created = class_crud.create_class(db, class_data)
    payload = ClassOut.model_validate(created)
    return success(data=payload, message="创建成功")
# GET /classes: paged class list with optional grade filter and keyword search.
# The docstring is kept verbatim because FastAPI publishes it as the endpoint
# description in the generated API docs.
@router.get("/classes", summary="获取班级列表")
def get_classes(
    page: int = Query(default=1, ge=1, description="页码"),
    size: int = Query(default=10, ge=1, le=100, description="每页数量"),
    grade: str = Query(default=None, description="年级筛选"),
    keyword: str = Query(default=None, description="搜索关键词"),
    db: Session = Depends(get_db)
):
    """获取班级列表,支持分页和搜索"""
    # Translate the 1-based page number into a row offset.
    offset = (page - 1) * size
    rows, total = class_crud.get_classes(db, skip=offset, limit=size, grade=grade, keyword=keyword)
    # Convert every ORM row into the response schema.
    serialized = list(map(ClassOut.model_validate, rows))
    return page_response(serialized, total, page, size)
# GET /classes/{class_id}: class detail including its student count; 404 when
# the class does not exist. The docstring is kept verbatim because FastAPI
# publishes it as the endpoint description in the generated API docs.
@router.get("/classes/{class_id}", summary="获取班级详情")
def get_class(class_id: int, db: Session = Depends(get_db)):
    """获取班级详情,包含学生数量"""
    lookup = class_crud.get_class_with_student_count(db, class_id)
    class_row, enrolled = lookup
    if not class_row:
        raise HTTPException(status_code=404, detail="班级不存在")

    detail = ClassWithStudentCount.model_validate(class_row)
    # The count comes from a separate query, so it is attached after validation.
    detail.student_count = enrolled
    return success(data=detail)
# PUT /classes/{class_id}: update a class; 404 when the class does not exist.
# The docstring is kept verbatim because FastAPI publishes it as the endpoint
# description in the generated API docs.
@router.put("/classes/{class_id}", summary="更新班级")
def update_class(class_id: int, class_data: ClassUpdate, db: Session = Depends(get_db)):
    """更新班级信息"""
    updated = class_crud.update_class(db, class_id, class_data)
    if updated:
        payload = ClassOut.model_validate(updated)
        return success(data=payload, message="更新成功")
    raise HTTPException(status_code=404, detail="班级不存在")
# DELETE /classes/{class_id}: delete a class; 404 when the class does not exist.
# The docstring is kept verbatim because FastAPI publishes it as the endpoint
# description in the generated API docs.
@router.delete("/classes/{class_id}", summary="删除班级")
def delete_class(class_id: int, db: Session = Depends(get_db)):
    """删除班级"""
    removed = class_crud.delete_class(db, class_id)
    if removed:
        return success(message="删除成功")
    raise HTTPException(status_code=404, detail="班级不存在")
6️⃣ 学生 API 路由
app/api/students.py
"""
学生 API 路由
"""
from typing import List
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app.database import get_db
from app.crud import student_crud, score_crud
from app.schemas import (
StudentCreate, StudentUpdate, StudentOut,
StudentWithClass, StudentDetail, GenderEnum
)
from app.utils.response import success, error, page_response
# Router instance on which the student endpoints in this module are registered.
router = APIRouter()
# POST /students: create a student unless the student number is already used.
# The docstring is kept verbatim because FastAPI publishes it as the endpoint
# description in the generated API docs.
@router.post("/students", summary="创建学生")
def create_student(student_data: StudentCreate, db: Session = Depends(get_db)):
    """创建新学生"""
    # Reject a duplicate student number with HTTP 400.
    duplicate = student_crud.get_student_by_no(db, student_data.student_no)
    if duplicate:
        raise HTTPException(status_code=400, detail="学号已存在")

    created = student_crud.create_student(db, student_data)
    payload = StudentOut.model_validate(created)
    return success(data=payload, message="创建成功")
# GET /students: paged student list with class/gender filters and keyword
# search. The docstring is kept verbatim because FastAPI publishes it as the
# endpoint description in the generated API docs.
@router.get("/students", summary="获取学生列表")
def get_students(
    page: int = Query(default=1, ge=1, description="页码"),
    size: int = Query(default=10, ge=1, le=100, description="每页数量"),
    class_id: int = Query(default=None, description="班级ID"),
    gender: str = Query(default=None, description="性别"),
    keyword: str = Query(default=None, description="搜索关键词"),
    db: Session = Depends(get_db)
):
    """获取学生列表,支持分页、筛选和搜索"""
    # Translate the 1-based page number into a row offset.
    offset = (page - 1) * size
    rows, total = student_crud.get_students(
        db,
        skip=offset,
        limit=size,
        class_id=class_id,
        gender=gender,
        keyword=keyword,
    )

    # Build response items that also carry the name of the related class.
    enriched = []
    for row in rows:
        entry = StudentWithClass.model_validate(row)
        related_class = row.class_info
        entry.class_name = related_class.name if related_class else None
        enriched.append(entry)
    return page_response(enriched, total, page, size)
# GET /students/{student_id}: student detail with class name and scores; 404
# when the student does not exist. The docstring is kept verbatim because
# FastAPI publishes it as the endpoint description in the generated API docs.
@router.get("/students/{student_id}", summary="获取学生详情")
def get_student(student_id: int, db: Session = Depends(get_db)):
    """获取学生详情,包含班级和成绩信息"""
    student_row = student_crud.get_student(db, student_id)
    if not student_row:
        raise HTTPException(status_code=404, detail="学生不存在")

    # Load the scores before building the response model.
    score_rows = score_crud.get_student_scores(db, student_id)

    detail = StudentDetail.model_validate(student_row)
    related_class = student_row.class_info
    detail.class_name = related_class.name if related_class else None
    detail.scores = score_rows
    return success(data=detail)
# PUT /students/{student_id}: update a student; 404 when the student does not
# exist. The docstring is kept verbatim because FastAPI publishes it as the
# endpoint description in the generated API docs.
@router.put("/students/{student_id}", summary="更新学生")
def update_student(student_id: int, student_data: StudentUpdate, db: Session = Depends(get_db)):
    """更新学生信息"""
    updated = student_crud.update_student(db, student_id, student_data)
    if updated:
        payload = StudentOut.model_validate(updated)
        return success(data=payload, message="更新成功")
    raise HTTPException(status_code=404, detail="学生不存在")
# DELETE /students/{student_id}: delete a student; 404 when the student does
# not exist. The docstring is kept verbatim because FastAPI publishes it as the
# endpoint description in the generated API docs.
# NOTE(review): the docstring says the student's scores are deleted as well;
# no such step is visible in this handler, so presumably it relies on a
# cascade configured on the models. Confirm.
@router.delete("/students/{student_id}", summary="删除学生")
def delete_student(student_id: int, db: Session = Depends(get_db)):
    """删除学生(同时删除其成绩)"""
    removed = student_crud.delete_student(db, student_id)
    if removed:
        return success(message="删除成功")
    raise HTTPException(status_code=404, detail="学生不存在")
# GET /students/{student_id}/scores: all scores of one student; 404 when the
# student does not exist. The docstring is kept verbatim because FastAPI
# publishes it as the endpoint description in the generated API docs.
@router.get("/students/{student_id}/scores", summary="获取学生成绩")
def get_student_scores(student_id: int, db: Session = Depends(get_db)):
    """获取学生的所有成绩"""
    # Confirm the student exists before listing scores.
    student_row = student_crud.get_student(db, student_id)
    if not student_row:
        raise HTTPException(status_code=404, detail="学生不存在")

    score_rows = score_crud.get_student_scores(db, student_id)
    return success(data=score_rows)
7️⃣ 成绩 API 路由
app/api/scores.py
"""
成绩 API 路由
"""
from typing import List
from datetime import date
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app.database import get_db
from app.crud import score_crud, student_crud
from app.schemas import (
ScoreCreate, ScoreUpdate, ScoreOut,
ScoreWithStudent, ScoreBatchCreate
)
from app.utils.response import success, error, page_response
# Router instance on which the score endpoints in this module are registered.
router = APIRouter()
# POST /scores: record one score for an existing student. The docstring is kept
# verbatim because FastAPI publishes it as the endpoint description in the
# generated API docs.
@router.post("/scores", summary="录入成绩")
def create_score(score_data: ScoreCreate, db: Session = Depends(get_db)):
    """录入单条成绩"""
    # The referenced student must exist; otherwise respond with HTTP 400.
    owner = student_crud.get_student(db, score_data.student_id)
    if not owner:
        raise HTTPException(status_code=400, detail="学生不存在")

    created = score_crud.create_score(db, score_data)
    payload = ScoreOut.model_validate(created)
    return success(data=payload, message="录入成功")
# POST /scores/batch: record several subject scores for one existing student.
# The docstring is kept verbatim because FastAPI publishes it as the endpoint
# description in the generated API docs.
@router.post("/scores/batch", summary="批量录入成绩")
def create_scores_batch(batch_data: ScoreBatchCreate, db: Session = Depends(get_db)):
    """批量录入成绩(一个学生多个科目)"""
    # The referenced student must exist; otherwise respond with HTTP 400.
    owner = student_crud.get_student(db, batch_data.student_id)
    if not owner:
        raise HTTPException(status_code=400, detail="学生不存在")

    created_rows = score_crud.create_scores_batch(
        db,
        student_id=batch_data.student_id,
        scores=batch_data.scores,
        exam_date=batch_data.exam_date,
        # The CRUD layer receives the enum member's underlying value.
        exam_type=batch_data.exam_type.value,
    )
    payload = list(map(ScoreOut.model_validate, created_rows))
    return success(data=payload, message="批量录入成功")
# GET /scores: paged score list filtered by student, subject, exam type and
# date range. The docstring is kept verbatim because FastAPI publishes it as
# the endpoint description in the generated API docs.
@router.get("/scores", summary="获取成绩列表")
def get_scores(
    page: int = Query(default=1, ge=1),
    size: int = Query(default=10, ge=1, le=100),
    student_id: int = Query(default=None, description="学生ID"),
    subject: str = Query(default=None, description="科目"),
    exam_type: str = Query(default=None, description="考试类型"),
    start_date: date = Query(default=None, description="开始日期"),
    end_date: date = Query(default=None, description="结束日期"),
    db: Session = Depends(get_db)
):
    """获取成绩列表,支持多条件筛选"""
    # Translate the 1-based page number into a row offset.
    offset = (page - 1) * size
    rows, total = score_crud.get_scores(
        db,
        skip=offset,
        limit=size,
        student_id=student_id,
        subject=subject,
        exam_type=exam_type,
        start_date=start_date,
        end_date=end_date,
    )

    # Build response items that also carry the student's name and number.
    enriched = []
    for row in rows:
        entry = ScoreWithStudent.model_validate(row)
        entry.student_name = row.student.name
        entry.student_no = row.student.student_no
        enriched.append(entry)
    return page_response(enriched, total, page, size)
# GET /scores/{score_id}: score detail with the student's name and number; 404
# when the score does not exist. The docstring is kept verbatim because FastAPI
# publishes it as the endpoint description in the generated API docs.
@router.get("/scores/{score_id}", summary="获取成绩详情")
def get_score(score_id: int, db: Session = Depends(get_db)):
    """获取成绩详情"""
    score_row = score_crud.get_score(db, score_id)
    if not score_row:
        raise HTTPException(status_code=404, detail="成绩不存在")

    detail = ScoreWithStudent.model_validate(score_row)
    # Copy the identifying fields of the related student onto the response.
    detail.student_name = score_row.student.name
    detail.student_no = score_row.student.student_no
    return success(data=detail)
# PUT /scores/{score_id}: update a score; 404 when the score does not exist.
# The docstring is kept verbatim because FastAPI publishes it as the endpoint
# description in the generated API docs.
@router.put("/scores/{score_id}", summary="更新成绩")
def update_score(score_id: int, score_data: ScoreUpdate, db: Session = Depends(get_db)):
    """更新成绩"""
    updated = score_crud.update_score(db, score_id, score_data)
    if updated:
        payload = ScoreOut.model_validate(updated)
        return success(data=payload, message="更新成功")
    raise HTTPException(status_code=404, detail="成绩不存在")
# DELETE /scores/{score_id}: delete a score; 404 when the score does not exist.
# The docstring is kept verbatim because FastAPI publishes it as the endpoint
# description in the generated API docs.
@router.delete("/scores/{score_id}", summary="删除成绩")
def delete_score(score_id: int, db: Session = Depends(get_db)):
    """删除成绩"""
    removed = score_crud.delete_score(db, score_id)
    if removed:
        return success(message="删除成功")
    raise HTTPException(status_code=404, detail="成绩不存在")
8️⃣ 注册路由
更新 app/main.py:
"""
主程序入口
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.config import settings
from app.database import engine, Base
from app.models import Class, Student, Score
# 导入路由
from app.api import classes, students, scores
# Create the database tables for the metadata registered on Base.
# This runs at import time, before the application object is built.
Base.metadata.create_all(bind=engine)
# Create the FastAPI application; title, description and version come from settings.
app = FastAPI(
title=settings.PROJECT_NAME,
description=settings.PROJECT_DESCRIPTION,
version=settings.PROJECT_VERSION,
)
# Configure CORS: every origin, method and header is allowed, with credentials enabled.
# NOTE(review): wildcard origins combined with allow_credentials=True is
# generally discouraged. Confirm against the CORSMiddleware documentation and
# list explicit origins before deploying.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Register the three routers under the shared API prefix, each with its own tag.
app.include_router(classes.router, prefix=settings.API_PREFIX, tags=["班级管理"])
app.include_router(students.router, prefix=settings.API_PREFIX, tags=["学生管理"])
app.include_router(scores.router, prefix=settings.API_PREFIX, tags=["成绩管理"])
# GET /: welcome payload that points at the interactive docs path.
# Documented with a comment rather than a docstring so the generated API
# description of this endpoint stays unchanged.
@app.get("/")
def root():
    welcome = {"message": "欢迎使用学生管理系统", "docs": "/docs"}
    return welcome
# GET /health: fixed payload reporting the service as healthy.
# Documented with a comment rather than a docstring so the generated API
# description of this endpoint stays unchanged.
@app.get("/health")
def health_check():
    status_payload = {"status": "healthy"}
    return status_payload
📝 小结
本章我们完成了:
- ✅ 班级的 CRUD 操作和 API
- ✅ 学生的 CRUD 操作和 API
- ✅ 成绩的 CRUD 操作和 API
- ✅ 路由分组和注册
🏃 下一步
基本的 CRUD 完成了,接下来添加进阶功能!
