Scaffold a SQLAlchemy 2.0+ project with Python, declarative models (mapped_column), Alembic migrations, async sessions, repository pattern, relationships, and engine configuration.
84
81%
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Risky
Do not use without reviewing
Scaffold a SQLAlchemy 2.0+ project with Python, declarative models (mapped_column), Alembic migrations, async sessions, repository pattern, relationships, and engine configuration.
pip install sqlalchemy[asyncio] asyncpg alembic pydantic-settings
# Initialize Alembic
alembic init alembic
# For async support, edit alembic/env.py to use an async engine (see patterns below)

# Initialize Alembic (first time only)
alembic init alembic
# Generate migration from model changes
alembic revision --autogenerate -m "initial"
# Apply all pending migrations
alembic upgrade head
# Downgrade one step
alembic downgrade -1

app/
__init__.py
config.py # Settings from environment variables
database.py # Engine, session factory, Base class
models/
__init__.py # Import all models for Alembic auto-detection
user.py
post.py
tag.py
repositories/
__init__.py
user_repository.py
post_repository.py
alembic/
env.py # Alembic configuration (async-aware)
versions/ # Migration files
alembic.ini                # Alembic config file

Key practices:
- Use mapped_column(), Mapped[] type annotations, and a DeclarativeBase subclass for models.
- Use create_async_engine and async_sessionmaker for async engine/session management.
- Keep models under app/models/. Import all models in app/models/__init__.py so Alembic detects them.
- Do not rely on Base.metadata.create_all() in production — use Alembic migrations.
- Use pydantic-settings for typed configuration from environment variables.

# app/config.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
    """Typed application settings, loaded from the environment and `.env`."""

    # NOTE(review): this default URL embeds local-dev credentials only;
    # real deployments must supply DATABASE_URL via the environment.
    database_url: str = "postgresql+asyncpg://app:secret@localhost:5432/myapp"
    database_echo: bool = False
    environment: str = "development"

    model_config = {"env_prefix": "", "env_file": ".env"}
settings = Settings()

# app/database.py
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase
from app.config import settings
# Single async engine shared by the whole app: pooled connections, with
# pre-ping so stale connections are discarded before use.
engine = create_async_engine(
    settings.database_url,
    echo=settings.database_echo,  # log emitted SQL when DATABASE_ECHO=true
    pool_size=10,
    max_overflow=20,  # up to 30 total connections under burst load
    pool_pre_ping=True,  # validate each connection checked out of the pool
)
# Session factory. expire_on_commit=False keeps ORM objects usable after
# commit without triggering implicit (lazy) refresh I/O — important with
# async sessions, where lazy loads outside the session raise.
async_session_factory = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
class Base(DeclarativeBase):
    """Declarative base shared by every ORM model; its metadata drives Alembic."""
async def get_session() -> AsyncSession:
"""Dependency for FastAPI or standalone usage."""
async with async_session_factory() as session:
yield sessionapp/models/user.py)from __future__ import annotations
import enum
from datetime import datetime
from sqlalchemy import String, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
class UserRole(str, enum.Enum):
    """Role stored on a user row; the str mixin keeps values DB/JSON friendly."""

    USER = "user"
    ADMIN = "admin"
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
name: Mapped[str] = mapped_column(String(255))
password: Mapped[str] = mapped_column(String(255))
role: Mapped[UserRole] = mapped_column(default=UserRole.USER)
created_at: Mapped[datetime] = mapped_column(server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(
server_default=func.now(), onupdate=func.now()
)
posts: Mapped[list[Post]] = relationship(back_populates="author", cascade="all, delete-orphan")
profile: Mapped[Profile | None] = relationship(back_populates="user", cascade="all, delete-orphan")
def __repr__(self) -> str:
return f"<User(id={self.id}, email={self.email!r})>"app/models/post.py)from __future__ import annotations
import enum
from datetime import datetime
from sqlalchemy import ForeignKey, String, Text, Table, Column, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
# Association table for the Post <-> Tag many-to-many relationship.
# A plain Table (not a mapped class) because it carries no extra columns;
# ondelete="CASCADE" lets the database clean up link rows automatically.
posts_tags = Table(
    "posts_tags",
    Base.metadata,
    Column("post_id", ForeignKey("posts.id", ondelete="CASCADE"), primary_key=True),
    Column("tag_id", ForeignKey("tags.id", ondelete="CASCADE"), primary_key=True),
)
class PostStatus(str, enum.Enum):
    """Lifecycle state of a post; the str mixin keeps values DB/JSON friendly."""

    DRAFT = "draft"
    PUBLISHED = "published"
    ARCHIVED = "archived"
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(500))
content: Mapped[str | None] = mapped_column(Text, nullable=True)
status: Mapped[PostStatus] = mapped_column(default=PostStatus.DRAFT, index=True)
author_id: Mapped[int] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"), index=True)
created_at: Mapped[datetime] = mapped_column(server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(
server_default=func.now(), onupdate=func.now()
)
author: Mapped[User] = relationship(back_populates="posts")
tags: Mapped[list[Tag]] = relationship(secondary=posts_tags, back_populates="posts")app/models/tag.py)from __future__ import annotations
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
from app.models.post import posts_tags
class Tag(Base):
    """Label attachable to many posts (many-to-many via posts_tags)."""

    __tablename__ = "tags"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100), unique=True)

    # "Post" resolves through the declarative registry at mapper-config time.
    posts: Mapped[list[Post]] = relationship(secondary=posts_tags, back_populates="tags")

# --- app/models/__init__.py ---
# Import all models so Alembic autogenerate detects them
from app.models.user import User, UserRole # noqa: F401
from app.models.post import Post, PostStatus # noqa: F401
from app.models.tag import Tag  # noqa: F401

# app/repositories/user_repository.py
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.user import User, UserRole
async def create_user(session: AsyncSession, *, email: str, name: str, password: str) -> User:
    """Insert a new user and flush so the generated primary key is populated.

    Commit is intentionally left to the caller (unit-of-work pattern).
    """
    # NOTE(review): `password` is stored verbatim — the caller must supply
    # a hash, never plaintext.
    new_user = User(email=email, name=name, password=password)
    session.add(new_user)
    await session.flush()  # assigns new_user.id without committing
    return new_user
async def get_user_by_email(session: AsyncSession, email: str) -> User | None:
    """Look up a user by unique email, eagerly loading the profile; None if absent."""
    query = (
        select(User)
        .where(User.email == email)
        .options(selectinload(User.profile))
    )
    return (await session.execute(query)).scalar_one_or_none()
async def get_user_with_posts(session: AsyncSession, user_id: int) -> User | None:
    """Fetch a user by primary key with posts eagerly loaded; None if absent."""
    query = select(User).where(User.id == user_id).options(selectinload(User.posts))
    result = await session.execute(query)
    return result.scalar_one_or_none()
async def list_users(
    session: AsyncSession,
    *,
    page: int = 1,
    limit: int = 20,
    role: UserRole | None = None,
) -> tuple[list[User], int]:
    """Return one page of users (newest first) plus the total match count.

    Args:
        page: 1-based page number.
        limit: page size.
        role: optional role filter; None means all roles.
    """
    base = select(User)
    count_stmt = select(func.count()).select_from(User)
    # BUG FIX: compare against None explicitly — the previous truthiness
    # test (`if role:`) would silently drop the filter for any falsy enum
    # member, and conflates "no filter" with "falsy filter value".
    if role is not None:
        base = base.where(User.role == role)
        count_stmt = count_stmt.where(User.role == role)

    stmt = base.order_by(User.created_at.desc()).offset((page - 1) * limit).limit(limit)
    users = list((await session.execute(stmt)).scalars().all())
    total = (await session.execute(count_stmt)).scalar_one()
    return users, total

# --- alembic/env.py (key parts) ---
import asyncio
from logging.config import fileConfig
from alembic import context
from sqlalchemy.ext.asyncio import create_async_engine
from app.config import settings
from app.database import Base
from app.models import * # noqa: F401, F403 — ensures models are registered
config = context.config

# Wire Python logging to the handlers declared in alembic.ini, if present.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Metadata that `alembic revision --autogenerate` diffs the database against.
target_metadata = Base.metadata
def run_migrations_offline() -> None:
    """Emit migration SQL as text, without a live database connection."""
    context.configure(
        url=settings.database_url,
        target_metadata=target_metadata,
        literal_binds=True,  # inline parameters so the emitted SQL runs as-is
    )
    with context.begin_transaction():
        context.run_migrations()
def do_run_migrations(connection):
    """Run migrations on a sync connection (invoked via AsyncConnection.run_sync)."""
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()
async def run_migrations_online() -> None:
    """Create an async engine and run migrations through the sync adapter."""
    migration_engine = create_async_engine(settings.database_url)
    async with migration_engine.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await migration_engine.dispose()
# Alembic imports this module directly; dispatch on the requested mode.
if context.is_offline_mode():
    run_migrations_offline()
else:
    asyncio.run(run_migrations_online())

# Generate migration from model changes
alembic revision --autogenerate -m "description of changes"
# Apply all pending migrations
alembic upgrade head
# Rollback one migration
alembic downgrade -1
# Show current migration version
alembic current
# Show migration history
alembic history
# Create empty migration (for custom SQL)
alembic revision -m "add custom index"
# Stamp database as current (skip running migrations)
alembic stamp head

Integration notes:
- FastAPI: use Depends(get_session) to inject AsyncSession into route handlers. Commit in the route or use a middleware.
- Testing: override get_session via the FastAPI app.dependency_overrides. Roll back transactions after each test with session.rollback().
- Docker: use the docker-compose-generator skill for the database service. Run `alembic upgrade head` in the entrypoint.
- Configuration: set sqlalchemy.url in alembic.ini, or override it in env.py from environment variables (recommended).

181fcbc
If you maintain this skill, you can claim it as your own. Once claimed, you can manage eval scenarios, bundle related skills, attach documentation or rules, and ensure cross-agent compatibility.