FastAPI 入门教程FastAPI 入门教程
首页
基础教程
实战项目
FastAPI官网
首页
基础教程
实战项目
FastAPI官网
  • 基础教程

    • 📚 基础教程
    • 第0章 - Python 快速入门
    • 第1章 - FastAPI 简介
    • 第2章 - 环境搭建
    • 第3章 - 第一个 API
    • 第4章 - 路径参数
    • 第5章 - 查询参数
    • 第6章 - 请求体
    • 第7章 - 响应模型
    • 第8章 - CRUD 操作
    • 第9章 - 数据库操作

第9章 - 数据库操作

嗨,朋友!我是长安。

这一章我们要正式连接数据库了!说实话,这是整个基础教程的最后一章,也是最重要的一章。掌握了数据库操作,你就真正入门了!我当年学到这里时花了不少时间,所以我会尽量讲得简单易懂。

🎯 本章目标

  • 了解 ORM 的概念
  • 学会使用 SQLAlchemy 连接数据库
  • 掌握数据库模型的定义
  • 实现数据库版的 CRUD

1️⃣ 什么是 ORM?

ORM(Object-Relational Mapping,对象关系映射)是一种技术,让你可以用 Python 对象来操作数据库,而不用写 SQL 语句。

说实话,这是我工作中最喜欢的功能之一,让开发效率提升了好几倍!

传统方式ORM 方式
写 SQL 语句操作 Python 对象
SELECT * FROM usersdb.query(User).all()
INSERT INTO users...db.add(user)

常用的 Python ORM

  • SQLAlchemy - 最流行,功能强大
  • Tortoise ORM - 异步 ORM
  • Peewee - 轻量级 ORM

本教程使用 SQLAlchemy。

2️⃣ 安装依赖

# 安装 SQLAlchemy
pip install sqlalchemy

# 如果使用 SQLite(推荐新手)
# 不需要额外安装

# 如果使用 MySQL
pip install pymysql

# 如果使用 PostgreSQL
pip install psycopg2-binary

3️⃣ 项目结构

project/
├── main.py          # 主程序
├── database.py      # 数据库配置
├── models.py        # 数据库模型
├── schemas.py       # Pydantic 模型
└── crud.py          # CRUD 操作

4️⃣ 配置数据库连接

# database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 数据库 URL
# SQLite(文件数据库,适合学习和小项目)
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"

# MySQL
# SQLALCHEMY_DATABASE_URL = "mysql+pymysql://user:password@localhost/dbname"

# PostgreSQL
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@localhost/dbname"

# 创建数据库引擎
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False}  # SQLite 需要这个参数
)

# 创建会话工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# 创建基类
Base = declarative_base()

# 获取数据库会话的依赖
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

5️⃣ 定义数据库模型

# models.py
from sqlalchemy import Column, Integer, String, Float, Boolean, DateTime, Text
from sqlalchemy.sql import func
from database import Base

class User(Base):
    """用户表"""
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    username = Column(String(50), unique=True, index=True, nullable=False)
    email = Column(String(100), unique=True, index=True, nullable=False)
    hashed_password = Column(String(100), nullable=False)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, server_default=func.now())
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())

class Book(Base):
    """图书表"""
    __tablename__ = "books"
    
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    title = Column(String(100), nullable=False)
    author = Column(String(50), nullable=False)
    price = Column(Float, nullable=False)
    description = Column(Text, nullable=True)
    stock = Column(Integer, default=0)
    is_available = Column(Boolean, default=True)
    created_at = Column(DateTime, server_default=func.now())

常用字段类型

SQLAlchemy 类型Python 类型说明
Integerint整数
String(n)str字符串,n 为最大长度
Textstr长文本
Floatfloat浮点数
Booleanbool布尔值
DateTimedatetime日期时间
Datedate日期

常用字段参数

参数说明
primary_key=True主键
index=True创建索引
unique=True唯一约束
nullable=False不允许为空
default=value默认值
autoincrement=True自增

6️⃣ 定义 Pydantic 模型

# schemas.py
from typing import Optional
from datetime import datetime
from pydantic import BaseModel, Field

# ========== 用户模型 ==========

class UserBase(BaseModel):
    username: str = Field(min_length=3, max_length=50)
    email: str

class UserCreate(UserBase):
    password: str = Field(min_length=6)

class UserUpdate(BaseModel):
    email: Optional[str] = None
    is_active: Optional[bool] = None

class UserOut(UserBase):
    id: int
    is_active: bool
    created_at: datetime
    
    class Config:
        from_attributes = True  # 允许从 ORM 模型转换

# ========== 图书模型 ==========

class BookBase(BaseModel):
    title: str = Field(min_length=1, max_length=100)
    author: str = Field(min_length=1, max_length=50)
    price: float = Field(gt=0)
    description: Optional[str] = None
    stock: int = Field(default=0, ge=0)

class BookCreate(BookBase):
    pass

class BookUpdate(BaseModel):
    title: Optional[str] = None
    author: Optional[str] = None
    price: Optional[float] = None
    description: Optional[str] = None
    stock: Optional[int] = None
    is_available: Optional[bool] = None

class BookOut(BookBase):
    id: int
    is_available: bool
    created_at: datetime
    
    class Config:
        from_attributes = True

7️⃣ 实现 CRUD 操作

# crud.py
from sqlalchemy.orm import Session
from models import User, Book
from schemas import UserCreate, UserUpdate, BookCreate, BookUpdate

# ========== 用户 CRUD ==========

def get_user(db: Session, user_id: int):
    """根据ID获取用户"""
    return db.query(User).filter(User.id == user_id).first()

def get_user_by_username(db: Session, username: str):
    """���据用户名获取用户"""
    return db.query(User).filter(User.username == username).first()

def get_users(db: Session, skip: int = 0, limit: int = 10):
    """获取用户列表"""
    return db.query(User).offset(skip).limit(limit).all()

def create_user(db: Session, user: UserCreate):
    """创建用户"""
    # 实际项目中应该对密码进行哈希处理
    db_user = User(
        username=user.username,
        email=user.email,
        hashed_password=user.password + "_hashed"  # 简化处理
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

def update_user(db: Session, user_id: int, user: UserUpdate):
    """更新用户"""
    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user:
        update_data = user.model_dump(exclude_unset=True)
        for key, value in update_data.items():
            setattr(db_user, key, value)
        db.commit()
        db.refresh(db_user)
    return db_user

def delete_user(db: Session, user_id: int):
    """删除用户"""
    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user:
        db.delete(db_user)
        db.commit()
        return True
    return False

# ========== 图书 CRUD ==========

def get_book(db: Session, book_id: int):
    return db.query(Book).filter(Book.id == book_id).first()

def get_books(db: Session, skip: int = 0, limit: int = 10, keyword: str = None):
    query = db.query(Book)
    if keyword:
        query = query.filter(
            (Book.title.contains(keyword)) | (Book.author.contains(keyword))
        )
    return query.offset(skip).limit(limit).all()

def create_book(db: Session, book: BookCreate):
    db_book = Book(**book.model_dump())
    db.add(db_book)
    db.commit()
    db.refresh(db_book)
    return db_book

def update_book(db: Session, book_id: int, book: BookUpdate):
    db_book = db.query(Book).filter(Book.id == book_id).first()
    if db_book:
        update_data = book.model_dump(exclude_unset=True)
        for key, value in update_data.items():
            setattr(db_book, key, value)
        db.commit()
        db.refresh(db_book)
    return db_book

def delete_book(db: Session, book_id: int):
    db_book = db.query(Book).filter(Book.id == book_id).first()
    if db_book:
        db.delete(db_book)
        db.commit()
        return True
    return False

8️⃣ 创建 API 接口

# main.py
from typing import List
from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.orm import Session

from database import engine, get_db, Base
from models import User, Book
import crud
from schemas import (
    UserCreate, UserUpdate, UserOut,
    BookCreate, BookUpdate, BookOut
)

# 创建数据库表
Base.metadata.create_all(bind=engine)

app = FastAPI(title="数据库操作示例")

# ========== 用户接口 ==========

@app.post("/users", response_model=UserOut, status_code=status.HTTP_201_CREATED, tags=["用户"])
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    """创建用户"""
    # 检查用户名是否已存在
    db_user = crud.get_user_by_username(db, user.username)
    if db_user:
        raise HTTPException(status_code=400, detail="用户名已存在")
    return crud.create_user(db, user)

@app.get("/users", response_model=List[UserOut], tags=["用户"])
def get_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
    """获取用户列表"""
    return crud.get_users(db, skip=skip, limit=limit)

@app.get("/users/{user_id}", response_model=UserOut, tags=["用户"])
def get_user(user_id: int, db: Session = Depends(get_db)):
    """获取用户详情"""
    db_user = crud.get_user(db, user_id)
    if not db_user:
        raise HTTPException(status_code=404, detail="用户不存在")
    return db_user

@app.patch("/users/{user_id}", response_model=UserOut, tags=["用户"])
def update_user(user_id: int, user: UserUpdate, db: Session = Depends(get_db)):
    """更新用户"""
    db_user = crud.update_user(db, user_id, user)
    if not db_user:
        raise HTTPException(status_code=404, detail="用户不存在")
    return db_user

@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT, tags=["用户"])
def delete_user(user_id: int, db: Session = Depends(get_db)):
    """删除用户"""
    if not crud.delete_user(db, user_id):
        raise HTTPException(status_code=404, detail="用户不存在")

# ========== 图书接口 ==========

@app.post("/books", response_model=BookOut, status_code=status.HTTP_201_CREATED, tags=["图书"])
def create_book(book: BookCreate, db: Session = Depends(get_db)):
    """创建图书"""
    return crud.create_book(db, book)

@app.get("/books", response_model=List[BookOut], tags=["图书"])
def get_books(
    skip: int = 0,
    limit: int = 10,
    keyword: str = None,
    db: Session = Depends(get_db)
):
    """获取图书列表"""
    return crud.get_books(db, skip=skip, limit=limit, keyword=keyword)

@app.get("/books/{book_id}", response_model=BookOut, tags=["图书"])
def get_book(book_id: int, db: Session = Depends(get_db)):
    """获取图书详情"""
    db_book = crud.get_book(db, book_id)
    if not db_book:
        raise HTTPException(status_code=404, detail="图书不存在")
    return db_book

@app.patch("/books/{book_id}", response_model=BookOut, tags=["图书"])
def update_book(book_id: int, book: BookUpdate, db: Session = Depends(get_db)):
    """更新图书"""
    db_book = crud.update_book(db, book_id, book)
    if not db_book:
        raise HTTPException(status_code=404, detail="图书不存在")
    return db_book

@app.delete("/books/{book_id}", status_code=status.HTTP_204_NO_CONTENT, tags=["图书"])
def delete_book(book_id: int, db: Session = Depends(get_db)):
    """删除图书"""
    if not crud.delete_book(db, book_id):
        raise HTTPException(status_code=404, detail="图书不存在")

9️⃣ 依赖注入详解

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

@app.get("/users")
def get_users(db: Session = Depends(get_db)):
    # db 会自动注入
    return db.query(User).all()

Depends(get_db) 的作用:

  1. 每次请求时创建一个数据库会话
  2. 请求处理完成后自动关闭会话
  3. 确保数据库连接被正确管理

🔟 常用查询操作

from sqlalchemy.orm import Session
from sqlalchemy import and_, or_, desc, asc

# 查询所有
users = db.query(User).all()

# 查询第一个
user = db.query(User).first()

# 根据条件查询
user = db.query(User).filter(User.id == 1).first()
user = db.query(User).filter(User.username == "zhangsan").first()

# 多条件查询(AND)
users = db.query(User).filter(
    User.is_active == True,
    User.age >= 18
).all()

# 或者使用 and_
users = db.query(User).filter(
    and_(User.is_active == True, User.age >= 18)
).all()

# OR 查询
users = db.query(User).filter(
    or_(User.username == "zhangsan", User.username == "lisi")
).all()

# 模糊查询
users = db.query(User).filter(User.username.like("%zhang%")).all()
users = db.query(User).filter(User.username.contains("zhang")).all()

# 排序
users = db.query(User).order_by(User.created_at.desc()).all()
users = db.query(User).order_by(desc(User.created_at)).all()

# 分页
users = db.query(User).offset(0).limit(10).all()

# 计数
count = db.query(User).count()

# IN 查询
users = db.query(User).filter(User.id.in_([1, 2, 3])).all()

# 不等于
users = db.query(User).filter(User.status != "deleted").all()

# 范围查询
users = db.query(User).filter(User.age.between(18, 30)).all()

# 空值查询
users = db.query(User).filter(User.email.is_(None)).all()
users = db.query(User).filter(User.email.isnot(None)).all()

📝 小结

本章我们学习了:

  1. ✅ ORM 的概念和优势
  2. ✅ SQLAlchemy 的安装和配置
  3. ✅ 数据库模型的定义
  4. ✅ Pydantic 模型与数据库模型的区别
  5. ✅ 实现数据库版的 CRUD
  6. ✅ 依赖注入获取数据库会话
  7. ✅ 常用的查询操作

🎉 基础教程完成!

恭喜你完成了 FastAPI 基础教程的学习!现在你已经掌握了:

  • FastAPI 的基本使用
  • 路径参数和查询参数
  • 请求体和响应模型
  • CRUD 操作
  • 数据库操作

接下来,让我们通过一个完整的实战项目来巩固所学知识!

👉 实战项目 - 学生管理系统

💪 练习题

  1. 创建一个商品表,包含:名称、价格、库存、分类、是否上架

  2. 实现商品的 CRUD 接口

  3. 添加按分类查询、按价格范围查询的功能

参考答案
# models.py
class Product(Base):
    __tablename__ = "products"
    
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(100), nullable=False)
    price = Column(Float, nullable=False)
    stock = Column(Integer, default=0)
    category = Column(String(50))
    is_on_sale = Column(Boolean, default=True)
    created_at = Column(DateTime, server_default=func.now())

# schemas.py
class ProductCreate(BaseModel):
    name: str
    price: float = Field(gt=0)
    stock: int = Field(ge=0, default=0)
    category: str
    is_on_sale: bool = True

class ProductOut(ProductCreate):
    id: int
    created_at: datetime
    
    class Config:
        from_attributes = True

# crud.py
def get_products(
    db: Session,
    skip: int = 0,
    limit: int = 10,
    category: str = None,
    min_price: float = None,
    max_price: float = None
):
    query = db.query(Product)
    if category:
        query = query.filter(Product.category == category)
    if min_price is not None:
        query = query.filter(Product.price >= min_price)
    if max_price is not None:
        query = query.filter(Product.price <= max_price)
    return query.offset(skip).limit(limit).all()

# main.py
@app.get("/products", response_model=List[ProductOut])
def get_products(
    skip: int = 0,
    limit: int = 10,
    category: str = None,
    min_price: float = None,
    max_price: float = None,
    db: Session = Depends(get_db)
):
    return crud.get_products(
        db, skip=skip, limit=limit,
        category=category,
        min_price=min_price,
        max_price=max_price
    )
最近更新: 2025/12/26 11:25
Contributors: 王长安
Prev
第8章 - CRUD 操作