Python SQLAlchemy与Alembic数据库迁移
前言
SQLAlchemy 是 Python 中最流行的 ORM 框架之一,提供了完整的 SQL 能力和企业级持久化模式。Alembic 是 SQLAlchemy 作者开发的数据库迁移工具,轻量级且功能强大,类似于 Django 的 migration 或 Ruby on Rails 的 migrations。
一、SQLAlchemy 基础
1.1 安装
pip install sqlalchemy
# 或安装带异步支持的版本
pip install sqlalchemy[asyncio]
1.2 连接数据库
from sqlalchemy import create_engine
# SQLite
engine = create_engine('sqlite:///example.db')
# MySQL
engine = create_engine('mysql+pymysql://user:password@localhost:3306/dbname')
# PostgreSQL
engine = create_engine('postgresql://user:password@localhost:5432/dbname')
# 连接池配置
engine = create_engine(
'mysql+pymysql://user:password@localhost:3306/dbname',
pool_size=10, # 连接池大小
max_overflow=5, # 超过pool_size后允许的最大连接数
pool_timeout=30, # 获取连接超时时间
pool_recycle=3600, # 连接回收时间
echo=True # 打印SQL日志
)
1.3 定义模型
from sqlalchemy import Column, Integer, String, DateTime, Text, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from datetime import datetime
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, autoincrement=True)
username = Column(String(50), unique=True, nullable=False, comment='用户名')
email = Column(String(100), unique=True, nullable=False, comment='邮箱')
password_hash = Column(String(255), nullable=False)
created_at = Column(DateTime, default=datetime.now, comment='创建时间')
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now, comment='更新时间')
# 关联关系
posts = relationship('Post', back_populates='author')
def __repr__(self):
return f"<User(id={self.id}, username='{self.username}')>"
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String(200), nullable=False)
content = Column(Text)
user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
created_at = Column(DateTime, default=datetime.now)
# 关联关系
author = relationship('User', back_populates='posts')
1.4 Session 操作
from sqlalchemy.orm import sessionmaker, Session
# 创建Session工厂
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
# 使用上下文管理器
from contextlib import contextmanager
@contextmanager
def get_db():
db = SessionLocal()
try:
yield db
db.commit()
except Exception as e:
db.rollback()
raise e
finally:
db.close()
# CRUD 操作示例
def create_user(db: Session, username: str, email: str, password: str):
user = User(username=username, email=email, password_hash=password)
db.add(user)
db.commit()
db.refresh(user)
return user
def get_user_by_id(db: Session, user_id: int):
return db.query(User).filter(User.id == user_id).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
return db.query(User).offset(skip).limit(limit).all()
def update_user(db: Session, user_id: int, **kwargs):
user = db.query(User).filter(User.id == user_id).first()
if user:
for key, value in kwargs.items():
setattr(user, key, value)
db.commit()
db.refresh(user)
return user
def delete_user(db: Session, user_id: int):
user = db.query(User).filter(User.id == user_id).first()
if user:
db.delete(user)
db.commit()
return user
1.5 查询进阶
from sqlalchemy import or_, and_, func, desc
# 条件查询
users = db.query(User).filter(User.username.like('%test%')).all()
# 多条件组合
users = db.query(User).filter(
and_(
User.username.like('%test%'),
or_(User.email.endswith('@gmail.com'), User.email.endswith('@qq.com'))
)
).all()
# 排序和分页
users = db.query(User).order_by(desc(User.created_at)).offset(10).limit(10).all()
# 聚合查询
from sqlalchemy import func
result = db.query(
User.username,
func.count(Post.id).label('post_count')
).join(Post).group_by(User.id).all()
# 关联查询(JOIN)
posts = db.query(Post).join(User).filter(User.username == 'admin').all()
# 使用 outerjoin
posts = db.query(Post).outerjoin(User).all()
# 子查询
subq = db.query(Post.user_id, func.count('*').label('count')).group_by(Post.user_id).subquery()
result = db.query(User, subq.c.count).outerjoin(subq, User.id == subq.c.user_id).all()
二、Alembic 数据库迁移
2.1 安装与初始化
pip install alembic
# 在项目根目录初始化
cd your_project
alembic init alembic
初始化后会生成以下结构:
your_project/
├── alembic/
│ ├── versions/ # 迁移脚本存放目录
│ ├── env.py # Alembic 环境配置
│ ├── script.py.mako # 迁移脚本模板
│ └── README
├── alembic.ini # Alembic 配置文件
└── your_models.py
2.2 配置 alembic.ini
修改 alembic.ini 文件中的数据库连接:
# alembic.ini
[alembic]
# 迁移脚本存放目录
script_location = alembic
# 数据库连接字符串
sqlalchemy.url = mysql+pymysql://user:password@localhost:3306/dbname
# 也可以使用环境变量
# sqlalchemy.url = driver://user:pass@localhost/dbname
[post_write_hooks]
# 迁移脚本生成后执行的钩子(如格式化代码)
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -q
2.3 配置 env.py
修改 alembic/env.py,导入你的模型:
# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# 导入你的 Base 和模型
import sys
sys.path.insert(0, '.')
from models import Base # 替换为你的模型路径
# this is the Alembic Config object
config = context.config
# 设置 target_metadata 为你的 Base.metadata
target_metadata = Base.metadata
# 其他配置保持不变...
2.4 常用迁移命令
# 生成迁移脚本(自动检测模型变化)
alembic revision --autogenerate -m "initial tables"
# 手动创建空白迁移脚本
alembic revision -m "add user table"
# 查看当前数据库版本
alembic current
# 查看迁移历史
alembic history
# 升级到最新版本
alembic upgrade head
# 升级到指定版本
alembic upgrade <revision_id>
# 降级一个版本
alembic downgrade -1
# 降级到指定版本
alembic downgrade <revision_id>
# 降级到初始状态
alembic downgrade base
# 查看将要执行的SQL(不实际执行)
alembic upgrade head --sql
# 标记当前版本(不执行迁移)
alembic stamp head
2.5 迁移脚本示例
自动生成的迁移脚本示例:
# alembic/versions/123456789abc_initial_tables.py
"""initial tables
Revision ID: 123456789abc
Revises:
Create Date: 2024-01-01 10:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '123456789abc'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# 创建 users 表
op.create_table(
'users',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('username', sa.String(length=50), nullable=False),
sa.Column('email', sa.String(length=100), nullable=False),
sa.Column('password_hash', sa.String(length=255), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('username'),
sa.UniqueConstraint('email')
)
# 创建索引
op.create_index(op.f('ix_users_username'), 'users', ['username'], unique=True)
def downgrade():
# 删除索引
op.drop_index(op.f('ix_users_username'), table_name='users')
# 删除表
op.drop_table('users')
2.6 手动迁移操作
# 添加列
def upgrade():
op.add_column('users', sa.Column('phone', sa.String(20), nullable=True))
def downgrade():
op.drop_column('users', 'phone')
# 删除列
def upgrade():
op.drop_column('users', 'phone')
# 修改列
def upgrade():
op.alter_column('users', 'username',
existing_type=sa.String(50),
type_=sa.String(100),
existing_nullable=False
)
# 添加外键
def upgrade():
op.create_foreign_key(
'fk_posts_user_id',
'posts', 'users',
['user_id'], ['id']
)
def downgrade():
op.drop_constraint('fk_posts_user_id', 'posts', type_='foreignkey')
# 执行原生SQL
def upgrade():
op.execute("UPDATE users SET status = 'active' WHERE status IS NULL")
# 批量操作(适合SQLite等不支持某些操作的数据库)
def upgrade():
with op.batch_alter_table('users') as batch_op:
batch_op.add_column(sa.Column('phone', sa.String(20)))
batch_op.alter_column('username', type_=sa.String(100))
三、最佳实践
3.1 项目结构推荐
project/
├── app/
│ ├── __init__.py
│ ├── database.py # 数据库连接和Session
│ ├── models/ # 模型定义
│ │ ├── __init__.py
│ │ ├── base.py # Base定义
│ │ ├── user.py
│ │ └── post.py
│ └── schemas/ # Pydantic 模型
├── alembic/
│ ├── versions/
│ ├── env.py
│ └── ...
├── alembic.ini
└── main.py
3.2 异步支持
# database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base
DATABASE_URL = "mysql+aiomysql://user:password@localhost/dbname"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
Base = declarative_base()
# 异步操作示例
async def get_users_async():
async with AsyncSessionLocal() as session:
result = await session.execute(select(User))
return result.scalars().all()
3.3 迁移注意事项
- 生产环境谨慎使用 autogenerate
- 自动生成可能遗漏某些变更或产生意外的修改
- 建议人工审核每个迁移脚本
- 测试迁移的上下兼容性
- 确保 downgrade 能正确回滚
- 在测试环境先验证迁移
- 大数据表迁移策略
- 添加列时使用
server_default避免锁表 - 考虑分批更新数据
- 添加列时使用
# 安全添加非空列(大数据表)
def upgrade():
op.add_column('users', sa.Column('status', sa.String(20), server_default='active'))
# 后续再分批更新后移除默认值
四、常见问题
4.1 迁移版本冲突
当多人开发时可能出现版本冲突:
# 合并多个头版本
alembic merge -m "merge branches" head1 head2
4.2 模型导入问题
确保 env.py 中正确导入所有模型:
# 方式1:显式导入每个模型
from models import Base, User, Post
# 方式2:在 Base 导入后,导入所有模型模块
from models.base import Base
import models.user # noqa
import models.post # noqa
4.3 SQLite 外键约束
SQLite 默认不启用外键约束,需要在 env.py 中配置:
def run_migrations_online():
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix='sqlalchemy.',
poolclass=pool.NullPool
)
with connectable.connect() as connection:
# 启用外键约束(SQLite)
connection.execute(sa.text('PRAGMA foreign_keys=ON'))
context.configure(
connection=connection,
target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()