새소식

이것저것 개발노트

[FastAPI] [Chapter 12] DB 조작 (CRUDs)

  • -

이전 포스팅에서 DB 접속 준비를 하고, ToDo 앱을 위한 DB 모델을 정의하였다.

이번 포스팅에서는 db의 읽기/쓰기 처리를 구현하고, 이를 api에 연결하여 동작을 확인해본다.

 

Task 리소스를 구성하는 CRUD에서 첫번째 C(Create)에 대해서 설명한다.

 

처음에는 데이터가 존재하지 않으므로 POST /tasks부터 작성한다.

 

CRUDs

라우터는 MVC(Model View Controller)의 컨트롤러에 해당한다. 

컨트롤러는 모델이나 뷰를 연결하기 때문에 비대해지기 쉽다. 

이를 피하기 위해 DB의 CRUD 조작 처리는 api/cruds.task.py에 작성한다. 

 

api/cruds/task.py

from sqlalchemy.orm import Session 

import api.models.task as task_model
import api.schemas.task as task_schema 

def create_task(db: Session, task_create:task_schema.TaskCreate) -> task_model.Task:            # 스키마가 task_create: task_schema.TaskCreate를 인수로 받는다. 
    task = task_model.Task(**task_create.dict())                                                # 이를 DB 모델인 task_model.Task로 변환한다.
    db.add(task)                                                                                # DB에 해당 task를 넣는다 
    db.commit()                                                                                 # DB에 커밋한다
    db.refresh(task)                                                                            # DB에 데이터를 바탕으로 Task 인스턴스인 task를 업데이트한다 (작성된 레코드의 ID를 가져옴)
    return task                                                                                 # 생성한 DB 모델을 반환한다.

 

 

# 스키가 task_create: task_schema.TaskCreate를 인수로 받는다. 
# 이를 DB 모델인 task_model.Task로 변환한다.
# DB에 해당 task를 넣는다 
# DB에 커밋한다
# DB에 데이터를 바탕으로 Task 인스턴스인 task를 업데이트한다 (작성된 레코드의 ID를 가져옴)
# 생성한 DB 모델을 반환한다.

 

라우터

앞에서 소개한 CRUD 정의를 이용하는 라우터는 아래와 같이 다시 작성한다.

 

api/routers/task.py

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from api.db import get_db
import api.cruds.task as task_crud
 

import api.schemas.task as task_schema


router = APIRouter()

@router.get("/tasks", response_model=list[task_schema.Task])
async def list_tasks():
    return [task_schema.Task(id=1, title="첫번째 ToDo 작업")]

@router.post("/tasks", response_model=task_schema.TaskCreateResponse)
async def create_task(task_body: task_schema.TaskCreate, db:Session=Depends(get_db)):
    return task_crud.create_task(db, task_body)

@router.put("/tasks/{task_id}", response_model=task_schema.TaskCreateResponse)
async def update_task(task_id: int, task_body: task_schema.TaskCreate):
    return task_schema.TaskCreateResponse(id=task_id, **task_body.dict())

@router.delete("/tasks/{task_id}")
async def delete_task(task_id: int):
    return

 


Task 리소스를 구성하는 CRUD에서 두번째 R (Read)를 설명한다.

앞에 C(Create)를 통해 Task를 작성할 수 있게 되었으므로, 

이번에는 Task를 리스트로 받아오는 Read 엔드포인트를 생성한다.

 

ToDo 앱에는 Task에 대해 Done 모델이 정의되어 있는데, Read를 통해 각각을 개별적으로 가져오는 것이 번거롭다. 

이들을 join하여 ToDo 작업에 Done 플래그가 부여된 상태의 리스트를 가져오는 엔드포인트를 만들어 본다. 

 

CRUDs

join으로 CRUD 정의가 복잡해진다. 

api/cruds/task.py

from sqlalchemy import select
from sqlalchemy.engine import Result

def get_tasks_with_done(db: Session) -> list[tuple[int, str, bool]]:
    result: Result = db.execute(                                        # 1
        select(                                                         # 3 
            task_model.Task.id,
            task_model.Task.title,
            task_model.Done.id.isnot(None).label("done"),               # 4 
        ).outerjoin(task_model.Done)
    )
    
    return result.all()                                                 # 2

 

1. 이 Result 인스턴스는 아직 DB 요청의 결과 전체를 가지고 있지 않는다.

DB 레코드를 처리할 때 for 반복문 등으로 효율적으로 결과를 가져오기 위해 이터레이터로 정의되어 있다.

 

2. 이번에는 반복문으로 다룰만큼 무거운 처리가 없으므로 result.all() 호출로 모든 DB 레코드를 가져온다. 

 

3. select()로 필요한 필드를 지정하고, .outerjoin()으로 메인 DB 모델에 조인할 모델을 지정한다. 

또한 dones 테이블은 Tasks 테이블과 동일한 ID를 가지며, ToDo 작업이 완료되었을 때만 레코드가 존재한다고 설명했다. 

 

4. task_model.Done.id.isnot(None).label("done")에 의해 Done.id가 존재하면 done=True로,

존재하지 않으며 done=False로

join한 레코드를 반환한다. 

 

라우터

api/routers/task.py

@router.get("/tasks", response_model=list[task_schema.Task])
async def list_tasks(db:Session=Depends(get_db)):
    return task_crud.get_tasks_with_done(db)

 

 

동작확인

Create를 실행한 횟수만큼 ToDo 작업이 생성되고 있으며, 전체가 리스트로 반환된다.

또한 Tasks 테이블의 내용 뿐만 아니라 각 ToDo 작업의 완료 플래그인 done이 부여되어 있는 것을 알 수 있다.

아직 done 리소스의 엔드포인트를 정의하지 않았기 때문에 현재로서는 모두 false이다. 

 

GET /tasks 동작확인


Task 리소스를 구성하는 CRUD에서 세번째 U(Update)를 설명한다. 

 

Update도 Create와 거의 비슷하지만, 존재하는 Task에 대한 요청인지 여부를 확인하며,

존재하면 업데이트하고, 존재하지 않으면 404 오류를 반환하는 API를 만든다. 

def get_task(db:Session, task_id:int) -> task_model.Task | None:
    result: Result = db.execute(
        select(task_model.Task).filter(task_model.Task.id == task_id)
    )
    return result.scalars().first()

def update_task(
    db:Session, task_create:task_schema.TaskCreate, original:task_model.Task
) -> task_model.Task:
    original.title = task_create.title
    db.add(original)
    db.commit()
    db.refresh(original)
    return original

 

get_task() 함수에서는 .filter() 메서드를 사용하여 SELECT~WHERE의 SQL 쿼리에 의해 대상을 좁혀주고 있다. 

또한 Result는 select()에서 지정한 요소가 하나라도 튜플로 반환되므로, 튜플이 아닌 값으로 가져오는 과정이 별도로 필요하다. 

scalars() 메서드를 이용하면 결과의 각 행에서 가져올 요소를 1개로 좁혀서 값을 가져올 수 있다. 

 

update_task() 함수는 create_task() 함수와 거의 비슷하게 생겼다. original로 DB 모델을 받아 내용을 업데이트하여 반환하는 것이 유일한 차이점이다. 

 

라우터

앞에서 작성한 CRUD 정의를 이용하는 라우터를 아래와 같이 구현한다.

api/routers/task.py

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from api.db import get_db
import api.cruds.task as task_crud
 

import api.schemas.task as task_schema


router = APIRouter()

@router.get("/tasks", response_model=list[task_schema.Task])
async def list_tasks(db:Session=Depends(get_db)):
    return task_crud.get_tasks_with_done(db)

@router.post("/tasks", response_model=task_schema.TaskCreateResponse)
async def create_task(task_body: task_schema.TaskCreate, db:Session=Depends(get_db)):
    return task_crud.create_task(db, task_body)

@router.put("/tasks/{task_id}", response_model=task_schema.TaskCreateResponse)
async def update_task(
    task_id: int, task_body: task_schema.TaskCreate, db:Session=Depends(get_db)
    ):
    task = task_crud.get_task(db, task_id=task_id)
    if task is None:
        raise HTTPException(status_code=404, detail="Task not found")
    
    return task_crud.update_task(db, task_body, original=task)

@router.delete("/tasks/{task_id}")
async def delete_task(task_id: int):
    return

 

 

여기서 HTTPException은 임의의 HTTP 상태 코드를 인수로 받을 수 있는 Exception 클래스이다. 이번에는 404 Not Found를 지정하여 raise한다.

raise 문은 파이썬에서 에외를 명시적으로 발생시키는 데 사용된다.

예외는 프로그램 실행 중에 오류나 예상치 못한 상황을 나타내며, raise 문을 통해 개발자가 직접 원하는 시점에 예외를 발생시켜 예외 처리를 수행할 수 있다.

 

동작 확인

PUT /tasks/{task_id}

에서 task_id=1의 제목(title)을 변경해보자. 

 

task_id가 1인 title을 살짝 변경해주고 Execute 버튼을 눌러준다.

이제 GET /tasks로 작업 업데이트하고 확인을 해본다. 

 


 

Task 리소스를 구성하는 CRUD에서 네번째 D (Delete)를 설명한다. 

 

CRUDs

Delete의 인터페이스도 Update와 거의 비슷하다. 

아래 api/cruds/task.py에 delete_task 함수를 추가해준다. 

api/cruds/task.py

def delete_task(db:Session, original:task_model.Task) -> None:
    db.delete(original)
    db.commit()

 

라우터

앞에서 작성한 CRUD 정의를 이용하는 라우터는 아래와 같이 구현한다.

api/routers/task.py

@router.delete("/tasks/{task_id}", response_model=None)
async def delete_task(task_id: int, db:Session=Depends(get_db)):
    task = task_crud.get_task(db, task_id=task_id)
    if task is None:
        raise HTTPException(status_code=404, detail="Task not found")
    return task_crud.delete_task(db, original=task)

 

동작확인 

task_id=2를 삭제해본다. 

 

확인해 보면 id 2가 지워졌다.

 


 

Task 리소스와 마찬가지로 Done 리소스도 정의해 본다.

CRUD와 라우터를 함께 살펴보자. 

 

api/cruds/done.py 

import api.models.task as task_model

from sqlalchemy import select
from sqlalchemy.engine import Result
from sqlalchemy.orm import Session 

def get_done(db:Session, task_id:int) -> task_model.Done | None:
    result: Result = db.execute(
        select(task_model.Done).filter(task_model.Done.id == task_id)
    )
    return result.scalars().first()

def create_done(db:Session, task_id: int) -> task_model.Done:            
    done = task_model.Done(id=task_id)                                               
    db.add(task)                                                                               
    db.commit()                                                                              
    db.refresh(done)                                                                       
    return done  

def delete_done(db:Session, original: task_model.Done) -> None:                                                       
    db.delete(original)                                                                               
    db.commit()

 

api/routers/done.py 

from fastapi import APIRouter, HTTPException, Depends
from sqlalchemy.orm import Session

import api.schemas.done as done_schema
import api.cruds.done as done_crud
from api.db import get_db

router = APIRouter()

@router.put("/tasks/{task_id}/done", response_model=done_schema.DoneResponse)
async def mark_task_as_done(task_id: int, db:Session=Depends(get_db)):
    done = done_crud.get_done(db, task_id=task_id)
    if done is not None:
        rasie HTTPException(status_code=400, detail="Done already exists")
    return done_crud.create_done(db, task_id)

@router.delete("/tasks/{task_id}/done", response_model=None)
async def unmark_task_as_done(task_id: int, db:Session=Depends(get_db)):
    done = done_crud.get_done(db, task_id=task_id)
    if done is not None:
        rasie HTTPException(status_code=404, detail="Done not found")
    return done_crud.delete_done(db, original=done)

 

여기에 응답스키마 DoneResponse가 필요하므로 

api/schemas.done.py도 함께 작성해준다. 

api/schemas.done.py

from pydantic import BaseModel 

class DoneResponse(BaseModel):
    id: int
    
    class Config:
        orm_mode = True

 

조건에 따라 다음 동작을 수행한다.

- 완료 플래그가 설정되어 있지 않은 경우

  - PUT: 완료 플래그가 설정됨 

  - DELETE: 플래그가 없으므로 404 오류를 반환 

- 완료 플래그가 설정되어 있는 경우

   - PUT: 이미 플래그가 설정되어 있으므로 400 오류를 반환 

   - DELETE: 완료 플래그를 지움 

 

동작 확인 

Done 리소스의 Update 인터페이스에서 존재하는 작업 (Task)의 task_id를 입력해 실행한 뒤, Task 리소스의 Read 인터페이스를 실행하면 done 플래그가 변경된 것을 확인할 수 있다. 

 

PUT /tasks/{task_id}/done에서 

task_id가 1을 Done으로 하면 

 

 

GET /tasks에서 확인해보면 id가 1이고 done은 true가 된다. 


 

ToDo 앱을 동작하는데 필요한 파일을 모두 정의하였다. 최종적으로 아래와 같은 파일 구성이 되어야 한다. 

 

app
├── __init__.py
├── db.py
├── main.py
├── migrate_db.py
├── cruds
│   │── __init__.py
│   │── done.py
│   └── task.py
├── models
│   │── __init__.py
│   └── task.py
├── routers
│   │── __init__.py
│   │── done.py
│   └── task.py
└── schemas
    │── __init__.py
    │── done.py
    └── task.py

 

 

Swagger UI에서 모든 동작을 확인할 수 있다.

이대로도 문제없이 동작하지만, 다음 장에서는 FastAPI를 좀 더 빠르게 만들기 위해 지금까지 작성한 처리 코드를 비동기화 한다. 

또한 Swagger UI에서 동작을 확인하지 않아도 소스 코드 변경시 버그를 조기에 발견할 수 있도록 유닛 테스트를 작성한다. 

 

 

 

 

Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.