FastAPI 實戰秘籍:從零構建高性能 API -數據庫篇
前面我們講了Fastapi日志、配置管理,在web開發中主要是針對數據庫的增刪改查,今天主要講一下通過sqlalchemy連接數據庫,數據庫會話,數據庫模型的創建和自動建表等。

準備工作
對象關系映射操作庫:
pip install sqlalchemy數據庫連接配置,在config.yaml中增加如下如下配置:
# config.yaml
database:
url: "mysql+pymysql://root:123456@127.0.0.1:3306/zadmin"
async_url: "mysql+asyncmy://root:123456@127.0.0.1:3306/zadmin"
pool_size: 20
echo_sql: false # 是否打印SQL日志需要安裝pymysql和asyncmy庫,處理同步及異步請求:
pip install pymysql asyncmy在前面講解配置中我們設置了配置加載及定義:
# config.py
class DatabaseConfig(BaseModel):
url: str
async_url: str
pool_size: int = 10
echo_sql: bool = False
class Settings(BaseSettings):
...
database: DatabaseConfig
...數據庫連接
在core下創建database.py文件。使用sqlalchemy連接數據庫,數據源的定義如下:
database_url dialect+driver://username:password@host:port/database
# 數據庫連接配置
# MySQL示例: mysql+pymysql://username:password@localhost/dbname
# PostgreSQL示例: postgresql+psycopg2://username:password@localhost/dbname(1) 創建數據庫引擎(Engine)
# core/database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import create_async_engine
from config.config import get_settings
settings = get_settings()
# 異步引擎
async_engine = create_async_engine(
url = settings.database.async_url,
echo=settings.database.echo_sql,
pool_size=settings.database.pool_size
)
# 同步引擎
engine = create_engine(
url = settings.database.url,
echo=settings.database.echo_sql,
pool_size=settings.database.pool_size
)(2) 創建會話工廠(Session Factory)
「會話工廠」是一個用于創建會話實例的工廠函數或類。它預先配置了會話的各種參數,但不會立即創建數據庫連接。
# core/database.py
# 同步會話工廠
session_local = sessionmaker(
autocommit=False,
autoflush=False,
bind=engine)
# 異步會話工廠
session_factory = async_sessionmaker(
autocommit=False,
autoflush=False,
bind=async_engine,
expire_on_commit=True,
class_=AsyncSession
)(3) 創建會話(Session)
「會話」是SQLAlchemy ORM的核心,它代表了一個與數據庫的"對話",負責:
- 管理對象狀態
- 執行數據庫操作
- 處理事務
- 維護對象標識映射
# 異步會話
async def get_async_db():
async with session_factory() as session:
async with session.begin():
yield session
# 同步會話
def get_db():
with session_local() as session:
yield session不使用上下文管理器,也可以通過下面方式定義
def get_db():
db = session_local()
try:
yield db
finally:
db.close()基類定義及自動建表
# 定義基類,所有其它數據實體都繼承于它
Base = declarative_base()
# 如下示例:用戶表模型,除了__*__屬性,其它通過mapper或Column定義的都對應表的字段
# class User(Base):
# __tablename__ = 'users'
# __table_args__ = ({'comment': '用戶表'})
# id: Mapped[int] = mapped_column(Integer, primary_key=True, comment='主鍵ID')
# telephone: Mapped[str] = mapped_column(String(11), unique=True,comment="手機號碼")自動建表:
def create_all_tables():
Base.metadata.create_all(bind=engine)
# 由于main.py中的函數代碼都是同步的,所以下面的函數基本上用到。
async def async_create_all_tables():
async with async_engine.begin() as conn:
# 使用 run_sync 在異步上下文中執行同步操作
await conn.run_sync(Base.metadata.create_all)在main.py中使用:
from core import register_exception, database
def create_app():
"""啟動項目"""
...
database.create_all_tables()高級應用
由于我們創建的數據表大部分u都有created_at、id、updated_at字段,我們可以自定義基類:
# 定義一個公共模型
class BaseModel(Base):
"""
公共 ORM 模型,基表
"""
__abstract__ = True
id: Mapped[int] = mapped_column(Integer, primary_key=True, comment='主鍵ID')
created_at: Mapped[datetime] = mapped_column(
DateTime,
default=func.now(),
comment='創建時間')
updated_at: Mapped[datetime] = mapped_column(
DateTime,
default=func.now(),
onupdate=func.now(),
comment='更新時間'
)
deleted_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=True,
comment='刪除時間')這樣上面的User模型就可以簡化為下面的方式,去掉了ID主鍵的定義:
# class User(BaseModel):
# __tablename__ = 'users'
# __table_args__ = ({'comment': '用戶表'})
# telephone: Mapped[str] = mapped_column(String(11), unique=True,comment="手機號碼")演示
在models下創建文件user.py,定義模型內容如下:
# models/user.py
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column
from core.database import BaseModel, TableName
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto')
@TableName("users")
class User(BaseModel):
telephone: Mapped[str] = mapped_column(String(11), index=True, unique=True, comment="手機號碼")
password: Mapped[str] = mapped_column(String(128), comment="密碼")
username: Mapped[str] = mapped_column(String(50), index=True, nullable=False, comment="姓名")
@staticmethod
def get_password_hash(password: str) -> str:
"""
生成哈希密碼
:param password: 原始密碼
:return: 哈希密碼
"""
return pwd_context.hash(password)
@staticmethod
def verify_password(password: str, hashed_password: str) -> bool:
"""
驗證原始密碼是否與哈希密碼一致
:param password: 原始密碼
:param hashed_password: 哈希密碼
:return:
"""
return pwd_context.verify(password, hashed_password)(1) 定義路由,實現增刪改查操作
打開數據庫連接工具比如navicat,然后創建數據庫zadmin 啟動服務。
(2) 異步操作
@app.get("/curd/async")
asyncdef async_curd(
db: AsyncSession = Depends(get_async_db),
):
"""創建用戶"""
hashed_password = User.get_password_hash("123456")
user = User(telephone="13800000090", password=hashed_password, username="admin")
db.add(user)
await db.commit()
await db.refresh(user)
print("add user {} {} {}".format(user.username, user.telephone, user.id))
"""根據ID獲取用戶"""
result = await db.execute(select(User).where(User.id == 3))
user = result.scalar()
print(" scalar_one_or_none user {} {} {}".format(user.username, user.telephone, user.id))
"""獲取所有用戶(分頁)"""
result = await db.execute(select(User).offset(0).limit(10))
print(result.scalars().all())
"""更新用戶信息"""
stmt = update(User).where(User.id == 1).values({"telephone":"13522023423"})
result = await db.execute(stmt)
print(result)
result = await db.execute(select(User).where(User.telephone == "13522023423"))
user = result.scalar_one_or_none()
print(user)
# 刪除操作
stmt = delete(User).where(User.id == 3)
result = await db.execute(stmt)
await db.commit()同步操作:
@app.get("/curd/sync")
asyncdef sync_curd(
db: Session = Depends(get_db),
):
"""同步創建用戶"""
hashed_password = User.get_password_hash("123456")
user = User(telephone="13800000090", password=hashed_password, username="admin")
db.add(user)
db.commit()
db.refresh(user)
print("add user {} {} {}".format(user.username, user.telephone, user.id))
"""單行查詢"""
user = db.query(User).filter(User.id == 3).first()
print("user by id {} {} {}".format(user.username, user.telephone, user.id))
"""更新操作"""
user = db.query(User).filter(User.id == 3).first()
user.telephone = "13522023423"
db.query(User).filter(User.id == 3).update({"telephone":"13522023423"})
db.commit()
db.refresh(user)
user = db.query(User).filter(User.id == 1).first()
db.delete(user)
db.commit()





























