Skip to main content

왜 SQLAlchemy 2.0인가?

PostgreSQL + SQLAlchemy 2.0 실전 패턴

SQLAlchemy 1.x 시절, ORM을 쓰면서도 비동기 처리는 항상 골칫거리였습니다. FastAPI가 asyncio 기반으로 동작하는데 DB 쿼리만 동기식으로 블로킹되는 상황은 특히 고트래픽 서비스에서 치명적입니다. SQLAlchemy 2.0은 이 문제를 정면으로 해결했습니다. 새로운 AsyncSession, 통합된 2.0 쿼리 스타일, 더 명확해진 ORM 패턴이 FastAPI와 궁합이 완벽합니다.

이 글에서는 실제 프로덕션 프로젝트에서 적용한 SQLAlchemy 2.0 패턴을 정리합니다. 비동기 세션 팩토리 구성, Alembic 마이그레이션 전략, JSONB 컬럼 활용, 그리고 항상 개발자를 괴롭히는 N+1 문제 해결까지 코드와 함께 살펴봅니다.

비동기 세션 팩토리 구성

SQLAlchemy 2.0에서 비동기 DB 연결의 핵심은 create_async_engineAsyncSession입니다. 기존 1.x 방식과 달리 엔진 생성부터 쿼리 실행까지 전 과정이 비동기로 처리됩니다.

# database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase

DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydb"

engine = create_async_engine(
    DATABASE_URL,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,  # 연결 상태 자동 체크
    echo=False,
)

AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # commit 후 객체 만료 방지
)

class Base(DeclarativeBase):
    pass

# FastAPI 의존성 주입용
async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

expire_on_commit=False 설정은 FastAPI에서 매우 중요합니다. 기본값인 True로 두면 commit() 이후 ORM 객체의 속성에 접근할 때 추가 쿼리가 발생합니다. 비동기 컨텍스트에서는 이게 예상치 못한 에러를 만들어냅니다.

2.0 스타일 쿼리 작성법

SQLAlchemy 2.0은 select() 함수 기반의 새로운 쿼리 스타일을 도입했습니다. 1.x의 session.query(Model) 방식은 레거시로 분류됩니다. 새 스타일은 더 명시적이고 타입 힌팅 지원도 우수합니다.

# crud.py
from sqlalchemy import select, update, delete, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from .models import User, Post

async def get_user_with_posts(db: AsyncSession, user_id: int) -> User | None:
    # selectinload로 관계형 데이터 한 번에 로드 (N+1 방지)
    stmt = (
        select(User)
        .where(User.id == user_id)
        .where(User.is_active == True)
        .options(selectinload(User.posts))
    )
    result = await db.execute(stmt)
    return result.scalar_one_or_none()

async def get_users_paginated(
    db: AsyncSession,
    skip: int = 0,
    limit: int = 20,
    search: str | None = None,
) -> tuple[list[User], int]:
    base_query = select(User).where(User.is_active == True)
    
    if search:
        base_query = base_query.where(
            User.name.ilike(f"%{search}%")
        )
    
    # 전체 카운트 (페이지네이션용)
    count_stmt = select(func.count()).select_from(base_query.subquery())
    total = await db.scalar(count_stmt)
    
    # 실제 데이터
    data_stmt = base_query.offset(skip).limit(limit).order_by(User.created_at.desc())
    result = await db.execute(data_stmt)
    users = list(result.scalars().all())
    
    return users, total or 0

Alembic 마이그레이션 전략

Alembic은 SQLAlchemy의 공식 마이그레이션 도구입니다. 프로덕션 환경에서 DB 스키마를 안전하게 변경하려면 Alembic 없이는 매우 위험합니다. 비동기 엔진을 사용하는 경우 alembic.inienv.py 설정을 별도로 처리해야 합니다.

# alembic/env.py (비동기 버전)
import asyncio
from logging.config import fileConfig
from sqlalchemy.ext.asyncio import async_engine_from_config
from sqlalchemy import pool
from alembic import context

from app.database import Base
from app.models import *  # noqa: F401, F403 - 모든 모델 임포트 필요

config = context.config
fileConfig(config.config_file_name)
target_metadata = Base.metadata

def run_migrations_offline() -> None:
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url, target_metadata=target_metadata, literal_binds=True)
    with context.begin_transaction():
        context.run_migrations()

def do_run_migrations(connection):
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()

async def run_async_migrations() -> None:
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()

def run_migrations_online() -> None:
    asyncio.run(run_async_migrations())

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

마이그레이션 파일 생성은 alembic revision --autogenerate -m "add_user_profile_table" 명령으로 자동화할 수 있습니다. 단, --autogenerate가 감지하지 못하는 변경사항(인덱스 이름 변경, 외래키 제약조건 등)은 수동으로 추가해야 합니다. 마이그레이션 파일은 항상 코드 리뷰를 거친 뒤 적용하는 것을 권장합니다.

JSONB 컬럼 활용하기

PostgreSQL의 JSONB는 관계형 스키마의 엄격함과 NoSQL의 유연함을 동시에 제공합니다. SQLAlchemy에서 JSONB를 활용하면 메타데이터, 설정 값, 반정형 데이터를 별도 테이블 없이 효율적으로 저장할 수 있습니다.

# models.py - JSONB 컬럼 활용
from sqlalchemy import Column, Integer, String, Boolean, DateTime
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.sql import func
from datetime import datetime
from app.database import Base

class UserProfile(Base):
    __tablename__ = "user_profiles"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    user_id: Mapped[int] = mapped_column(Integer, index=True)
    extra_data: Mapped[dict] = mapped_column(JSONB, default=dict, nullable=False)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )

# JSONB 필드 쿼리
async def get_users_by_preference(db: AsyncSession, key: str, value: str):
    # JSONB 내부 값으로 필터링
    stmt = select(UserProfile).where(
        UserProfile.extra_data[key].astext == value
    )
    result = await db.execute(stmt)
    return list(result.scalars().all())

# JSONB 업데이트 (부분 업데이트)
async def update_user_extra(db: AsyncSession, user_id: int, updates: dict):
    stmt = (
        update(UserProfile)
        .where(UserProfile.user_id == user_id)
        .values(extra_data=UserProfile.extra_data.concat(updates))
        .returning(UserProfile)
    )
    result = await db.execute(stmt)
    return result.scalar_one_or_none()

N+1 문제 해결: selectinload vs joinedload

N+1 문제는 ORM을 사용하는 모든 프로젝트에서 언젠가 반드시 마주치게 되는 성능 함정입니다. 부모 데이터 1개 조회 후 자식 데이터를 N번 반복 조회하는 패턴으로, 데이터가 늘어날수록 쿼리 수가 폭발적으로 증가합니다.

SQLAlchemy 2.0에서는 selectinloadjoinedload 두 가지 주요 전략으로 이를 해결합니다.

  • selectinload: 별도 SELECT IN 쿼리로 관계 데이터 로드. 컬렉션(1:N) 관계에 권장. 쿼리 수 = 2회(항상)
  • joinedload: JOIN으로 한 번에 로드. N:1, 1:1 관계에 적합. 중복 행 발생 가능성 주의
# N+1 문제 해결 예시
from sqlalchemy.orm import selectinload, joinedload, contains_eager

# 방법 1: selectinload (1:N 관계에 적합)
async def get_posts_with_comments(db: AsyncSession, post_ids: list[int]):
    stmt = (
        select(Post)
        .where(Post.id.in_(post_ids))
        .options(
            selectinload(Post.comments),           # 댓글 목록
            selectinload(Post.tags),               # 태그 목록
            joinedload(Post.author),               # 작성자 (N:1)
        )
    )
    result = await db.execute(stmt)
    return list(result.unique().scalars().all())

# 방법 2: 명시적 JOIN + contains_eager (복잡한 필터링 시)
async def get_active_posts_with_active_author(db: AsyncSession):
    stmt = (
        select(Post)
        .join(Post.author)
        .where(Post.is_published == True)
        .where(User.is_active == True)
        .options(contains_eager(Post.author))
        .order_by(Post.created_at.desc())
        .limit(20)
    )
    result = await db.execute(stmt)
    return list(result.unique().scalars().all())

실전 팁: 쿼리 디버깅과 성능 모니터링

프로덕션에서 느린 쿼리를 찾아내려면 SQLAlchemy의 이벤트 리스너나 PostgreSQL의 pg_stat_statements를 활용하세요. 개발 환경에서는 echo=True로 엔진을 생성하면 실행되는 SQL을 모두 로그로 볼 수 있지만, 프로덕션에서는 절대 켜두면 안 됩니다.

Alembic 마이그레이션은 CI/CD 파이프라인에서 배포 전에 자동 실행하도록 구성하는 것이 좋습니다. alembic upgrade head 명령을 서버 시작 전에 실행하되, 마이그레이션 실패 시 배포를 중단하는 로직을 반드시 포함해야 합니다. 특히 컬럼 삭제 등 비가역적 변경은 별도 배포 사이클에서 단계적으로 진행하는 것을 권장합니다.

코드벤터는 FastAPI, SvelteKit 등 모던 기술 스택을 기반으로 다양한 웹 서비스를 설계하고 구축합니다. 글로벌 협력 네트워크를 통해 축적된 프로덕션 경험을 바탕으로, 성능과 유지보수성을 모두 갖춘 시스템을 만들어갑니다. 더 많은 실전 기술 노하우는 codeventer.com 블로그에서 계속 공유합니다.

댓글 남기기