第9章 - 数据库操作
嗨,朋友!我是长安。
这一章我们要正式连接数据库了!说实话,这是整个基础教程的最后一章,也是最重要的一章。掌握了数据库操作,你就真正入门了!我当年学到这里时花了不少时间,所以我会尽量讲得简单易懂。
🎯 本章目标
- 了解 ORM 的概念
- 学会使用 SQLAlchemy 连接数据库
- 掌握数据库模型的定义
- 实现数据库版的 CRUD
1️⃣ 什么是 ORM?
ORM(Object-Relational Mapping,对象关系映射)是一种技术,让你可以用 Python 对象来操作数据库,而不用写 SQL 语句。
说实话,这是我工作中最喜欢的功能之一,让开发效率提升了好几倍!
| 传统方式 | ORM 方式 |
|---|---|
| 写 SQL 语句 | 操作 Python 对象 |
SELECT * FROM users | db.query(User).all() |
INSERT INTO users... | db.add(user) |
常用的 Python ORM
- SQLAlchemy - 最流行,功能强大
- Tortoise ORM - 异步 ORM
- Peewee - 轻量级 ORM
本教程使用 SQLAlchemy。
2️⃣ 安装依赖
# 安装 SQLAlchemy
pip install sqlalchemy
# 如果使用 SQLite(推荐新手)
# 不需要额外安装
# 如果使用 MySQL
pip install pymysql
# 如果使用 PostgreSQL
pip install psycopg2-binary
3️⃣ 项目结构
project/
├── main.py # 主程序
├── database.py # 数据库配置
├── models.py # 数据库模型
├── schemas.py # Pydantic 模型
└── crud.py # CRUD 操作
4️⃣ 配置数据库连接
# database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 数据库 URL
# SQLite(文件数据库,适合学习和小项目)
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
# MySQL
# SQLALCHEMY_DATABASE_URL = "mysql+pymysql://user:password@localhost/dbname"
# PostgreSQL
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@localhost/dbname"
# 创建数据库引擎
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False} # SQLite 需要这个参数
)
# 创建会话工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 创建基类
Base = declarative_base()
# 获取数据库会话的依赖
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
5️⃣ 定义数据库模型
# models.py
from sqlalchemy import Column, Integer, String, Float, Boolean, DateTime, Text
from sqlalchemy.sql import func
from database import Base
class User(Base):
"""用户表"""
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
username = Column(String(50), unique=True, index=True, nullable=False)
email = Column(String(100), unique=True, index=True, nullable=False)
hashed_password = Column(String(100), nullable=False)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, server_default=func.now())
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())
class Book(Base):
"""图书表"""
__tablename__ = "books"
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
title = Column(String(100), nullable=False)
author = Column(String(50), nullable=False)
price = Column(Float, nullable=False)
description = Column(Text, nullable=True)
stock = Column(Integer, default=0)
is_available = Column(Boolean, default=True)
created_at = Column(DateTime, server_default=func.now())
常用字段类型
| SQLAlchemy 类型 | Python 类型 | 说明 |
|---|---|---|
| Integer | int | 整数 |
| String(n) | str | 字符串,n 为最大长度 |
| Text | str | 长文本 |
| Float | float | 浮点数 |
| Boolean | bool | 布尔值 |
| DateTime | datetime | 日期时间 |
| Date | date | 日期 |
常用字段参数
| 参数 | 说明 |
|---|---|
primary_key=True | 主键 |
index=True | 创建索引 |
unique=True | 唯一约束 |
nullable=False | 不允许为空 |
default=value | 默认值 |
autoincrement=True | 自增 |
6️⃣ 定义 Pydantic 模型
# schemas.py
from typing import Optional
from datetime import datetime
from pydantic import BaseModel, Field
# ========== 用户模型 ==========
class UserBase(BaseModel):
username: str = Field(min_length=3, max_length=50)
email: str
class UserCreate(UserBase):
password: str = Field(min_length=6)
class UserUpdate(BaseModel):
email: Optional[str] = None
is_active: Optional[bool] = None
class UserOut(UserBase):
id: int
is_active: bool
created_at: datetime
class Config:
from_attributes = True # 允许从 ORM 模型转换
# ========== 图书模型 ==========
class BookBase(BaseModel):
title: str = Field(min_length=1, max_length=100)
author: str = Field(min_length=1, max_length=50)
price: float = Field(gt=0)
description: Optional[str] = None
stock: int = Field(default=0, ge=0)
class BookCreate(BookBase):
pass
class BookUpdate(BaseModel):
title: Optional[str] = None
author: Optional[str] = None
price: Optional[float] = None
description: Optional[str] = None
stock: Optional[int] = None
is_available: Optional[bool] = None
class BookOut(BookBase):
id: int
is_available: bool
created_at: datetime
class Config:
from_attributes = True
7️⃣ 实现 CRUD 操作
# crud.py
from sqlalchemy.orm import Session
from models import User, Book
from schemas import UserCreate, UserUpdate, BookCreate, BookUpdate
# ========== 用户 CRUD ==========
def get_user(db: Session, user_id: int):
"""根据ID获取用户"""
return db.query(User).filter(User.id == user_id).first()
def get_user_by_username(db: Session, username: str):
"""���据用户名获取用户"""
return db.query(User).filter(User.username == username).first()
def get_users(db: Session, skip: int = 0, limit: int = 10):
"""获取用户列表"""
return db.query(User).offset(skip).limit(limit).all()
def create_user(db: Session, user: UserCreate):
"""创建用户"""
# 实际项目中应该对密码进行哈希处理
db_user = User(
username=user.username,
email=user.email,
hashed_password=user.password + "_hashed" # 简化处理
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def update_user(db: Session, user_id: int, user: UserUpdate):
"""更新用户"""
db_user = db.query(User).filter(User.id == user_id).first()
if db_user:
update_data = user.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(db_user, key, value)
db.commit()
db.refresh(db_user)
return db_user
def delete_user(db: Session, user_id: int):
"""删除用户"""
db_user = db.query(User).filter(User.id == user_id).first()
if db_user:
db.delete(db_user)
db.commit()
return True
return False
# ========== 图书 CRUD ==========
def get_book(db: Session, book_id: int):
return db.query(Book).filter(Book.id == book_id).first()
def get_books(db: Session, skip: int = 0, limit: int = 10, keyword: str = None):
query = db.query(Book)
if keyword:
query = query.filter(
(Book.title.contains(keyword)) | (Book.author.contains(keyword))
)
return query.offset(skip).limit(limit).all()
def create_book(db: Session, book: BookCreate):
db_book = Book(**book.model_dump())
db.add(db_book)
db.commit()
db.refresh(db_book)
return db_book
def update_book(db: Session, book_id: int, book: BookUpdate):
db_book = db.query(Book).filter(Book.id == book_id).first()
if db_book:
update_data = book.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(db_book, key, value)
db.commit()
db.refresh(db_book)
return db_book
def delete_book(db: Session, book_id: int):
db_book = db.query(Book).filter(Book.id == book_id).first()
if db_book:
db.delete(db_book)
db.commit()
return True
return False
8️⃣ 创建 API 接口
# main.py
from typing import List
from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.orm import Session
from database import engine, get_db, Base
from models import User, Book
import crud
from schemas import (
UserCreate, UserUpdate, UserOut,
BookCreate, BookUpdate, BookOut
)
# 创建数据库表
Base.metadata.create_all(bind=engine)
app = FastAPI(title="数据库操作示例")
# ========== 用户接口 ==========
@app.post("/users", response_model=UserOut, status_code=status.HTTP_201_CREATED, tags=["用户"])
def create_user(user: UserCreate, db: Session = Depends(get_db)):
"""创建用户"""
# 检查用户名是否已存在
db_user = crud.get_user_by_username(db, user.username)
if db_user:
raise HTTPException(status_code=400, detail="用户名已存在")
return crud.create_user(db, user)
@app.get("/users", response_model=List[UserOut], tags=["用户"])
def get_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
"""获取用户列表"""
return crud.get_users(db, skip=skip, limit=limit)
@app.get("/users/{user_id}", response_model=UserOut, tags=["用户"])
def get_user(user_id: int, db: Session = Depends(get_db)):
"""获取用户详情"""
db_user = crud.get_user(db, user_id)
if not db_user:
raise HTTPException(status_code=404, detail="用户不存在")
return db_user
@app.patch("/users/{user_id}", response_model=UserOut, tags=["用户"])
def update_user(user_id: int, user: UserUpdate, db: Session = Depends(get_db)):
"""更新用户"""
db_user = crud.update_user(db, user_id, user)
if not db_user:
raise HTTPException(status_code=404, detail="用户不存在")
return db_user
@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT, tags=["用户"])
def delete_user(user_id: int, db: Session = Depends(get_db)):
"""删除用户"""
if not crud.delete_user(db, user_id):
raise HTTPException(status_code=404, detail="用户不存在")
# ========== 图书接口 ==========
@app.post("/books", response_model=BookOut, status_code=status.HTTP_201_CREATED, tags=["图书"])
def create_book(book: BookCreate, db: Session = Depends(get_db)):
"""创建图书"""
return crud.create_book(db, book)
@app.get("/books", response_model=List[BookOut], tags=["图书"])
def get_books(
skip: int = 0,
limit: int = 10,
keyword: str = None,
db: Session = Depends(get_db)
):
"""获取图书列表"""
return crud.get_books(db, skip=skip, limit=limit, keyword=keyword)
@app.get("/books/{book_id}", response_model=BookOut, tags=["图书"])
def get_book(book_id: int, db: Session = Depends(get_db)):
"""获取图书详情"""
db_book = crud.get_book(db, book_id)
if not db_book:
raise HTTPException(status_code=404, detail="图书不存在")
return db_book
@app.patch("/books/{book_id}", response_model=BookOut, tags=["图书"])
def update_book(book_id: int, book: BookUpdate, db: Session = Depends(get_db)):
"""更新图书"""
db_book = crud.update_book(db, book_id, book)
if not db_book:
raise HTTPException(status_code=404, detail="图书不存在")
return db_book
@app.delete("/books/{book_id}", status_code=status.HTTP_204_NO_CONTENT, tags=["图书"])
def delete_book(book_id: int, db: Session = Depends(get_db)):
"""删除图书"""
if not crud.delete_book(db, book_id):
raise HTTPException(status_code=404, detail="图书不存在")
9️⃣ 依赖注入详解
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.get("/users")
def get_users(db: Session = Depends(get_db)):
# db 会自动注入
return db.query(User).all()
Depends(get_db) 的作用:
- 每次请求时创建一个数据库会话
- 请求处理完成后自动关闭会话
- 确保数据库连接被正确管理
🔟 常用查询操作
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_, desc, asc
# 查询所有
users = db.query(User).all()
# 查询第一个
user = db.query(User).first()
# 根据条件查询
user = db.query(User).filter(User.id == 1).first()
user = db.query(User).filter(User.username == "zhangsan").first()
# 多条件查询(AND)
users = db.query(User).filter(
User.is_active == True,
User.age >= 18
).all()
# 或者使用 and_
users = db.query(User).filter(
and_(User.is_active == True, User.age >= 18)
).all()
# OR 查询
users = db.query(User).filter(
or_(User.username == "zhangsan", User.username == "lisi")
).all()
# 模糊查询
users = db.query(User).filter(User.username.like("%zhang%")).all()
users = db.query(User).filter(User.username.contains("zhang")).all()
# 排序
users = db.query(User).order_by(User.created_at.desc()).all()
users = db.query(User).order_by(desc(User.created_at)).all()
# 分页
users = db.query(User).offset(0).limit(10).all()
# 计数
count = db.query(User).count()
# IN 查询
users = db.query(User).filter(User.id.in_([1, 2, 3])).all()
# 不等于
users = db.query(User).filter(User.status != "deleted").all()
# 范围查询
users = db.query(User).filter(User.age.between(18, 30)).all()
# 空值查询
users = db.query(User).filter(User.email.is_(None)).all()
users = db.query(User).filter(User.email.isnot(None)).all()
📝 小结
本章我们学习了:
- ✅ ORM 的概念和优势
- ✅ SQLAlchemy 的安装和配置
- ✅ 数据库模型的定义
- ✅ Pydantic 模型与数据库模型的区别
- ✅ 实现数据库版的 CRUD
- ✅ 依赖注入获取数据库会话
- ✅ 常用的查询操作
🎉 基础教程完成!
恭喜你完成了 FastAPI 基础教程的学习!现在你已经掌握了:
- FastAPI 的基本使用
- 路径参数和查询参数
- 请求体和响应模型
- CRUD 操作
- 数据库操作
接下来,让我们通过一个完整的实战项目来巩固所学知识!
💪 练习题
创建一个商品表,包含:名称、价格、库存、分类、是否上架
实现商品的 CRUD 接口
添加按分类查询、按价格范围查询的功能
参考答案
# models.py
class Product(Base):
__tablename__ = "products"
id = Column(Integer, primary_key=True, index=True)
name = Column(String(100), nullable=False)
price = Column(Float, nullable=False)
stock = Column(Integer, default=0)
category = Column(String(50))
is_on_sale = Column(Boolean, default=True)
created_at = Column(DateTime, server_default=func.now())
# schemas.py
class ProductCreate(BaseModel):
name: str
price: float = Field(gt=0)
stock: int = Field(ge=0, default=0)
category: str
is_on_sale: bool = True
class ProductOut(ProductCreate):
id: int
created_at: datetime
class Config:
from_attributes = True
# crud.py
def get_products(
db: Session,
skip: int = 0,
limit: int = 10,
category: str = None,
min_price: float = None,
max_price: float = None
):
query = db.query(Product)
if category:
query = query.filter(Product.category == category)
if min_price is not None:
query = query.filter(Product.price >= min_price)
if max_price is not None:
query = query.filter(Product.price <= max_price)
return query.offset(skip).limit(limit).all()
# main.py
@app.get("/products", response_model=List[ProductOut])
def get_products(
skip: int = 0,
limit: int = 10,
category: str = None,
min_price: float = None,
max_price: float = None,
db: Session = Depends(get_db)
):
return crud.get_products(
db, skip=skip, limit=limit,
category=category,
min_price=min_price,
max_price=max_price
)
