본문 바로가기
Programming/Python_Web

FastAPI - MySQL CRUD

by Wilkyway 2021. 9. 10.
반응형

 

1. 폴더구조

2. 라이브러리 설치

pip install fastapi uvicorn
pip install mysql sqlalchemy
pip install starlette

 

3. 코드

<db.py>

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session

# Connection settings for the MySQL server this example talks to.
user_name = "root"
user_pwd = ""
db_host = "127.0.0.1"
db_name = "project01"

# SQLAlchemy connection URL. The text encoding is selected by the
# ``charset`` query parameter, so the engine needs no separate setting.
DATABASE = f"mysql://{user_name}:{user_pwd}@{db_host}/{db_name}?charset=utf8"

# ``encoding="utf-8"`` is intentionally NOT passed here: the parameter was
# deprecated in SQLAlchemy 1.4 and removed in 2.0, where supplying it makes
# create_engine() raise TypeError. ``echo=True`` logs every emitted statement.
ENGINE = create_engine(
    DATABASE,
    echo=True,
)

# Session registry bound to ENGINE. Commits and flushes are explicit
# (autocommit/autoflush disabled); callers obtain the current session by
# using ``session`` directly.
session = scoped_session(
    sessionmaker(
        autocommit=False,
        autoflush=False,
        bind=ENGINE,
    )
)

# Declarative base class for ORM models; ``Base.query`` gives every model a
# class-level query attribute backed by the session registry above.
Base = declarative_base()
Base.query = session.query_property()

<model.py>

from sqlalchemy import Column, Integer, String
from pydantic import BaseModel
from db import Base
from db import ENGINE

class UserTable(Base):
    """ORM model mapped to the ``user`` table via the shared declarative Base."""
    __tablename__ = 'user'
    # Integer primary key, declared as auto-incrementing.
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Name stored as String(50); declared NOT NULL.
    name = Column(String(50), nullable=False)
    # Integer age; no nullable flag is given for this column.
    age = Column(Integer)

# Pydantic model with the same three fields as UserTable (id, name, age).
# All three fields are declared without defaults.
class User(BaseModel):
    id: int
    name: str
    age: int

 

<main.py>

from fastapi import FastAPI
from typing import List
from starlette.middleware.cors import CORSMiddleware

from db import session
from model import UserTable, User

# Application object that the route decorators below register against.
app = FastAPI()

# CORS policy: any origin, any method and any header are accepted, and
# credentials are allowed.
app.add_middleware(
    CORSMiddleware,
    **{
        "allow_origins": ["*"],
        "allow_credentials": True,
        "allow_methods": ["*"],
        "allow_headers": ["*"],
    },
)

@app.get("/users")
async def read_users():
    """Return every row of the ``user`` table.

    Returns:
        A list of ``UserTable`` objects; empty when the table has no rows.
    """
    try:
        return session.query(UserTable).all()
    finally:
        # ``session`` is a shared scoped_session registry. Without remove()
        # the same Session, and the transaction it opened, would be reused
        # by every later request. remove() closes it and releases the
        # connection; already-loaded rows stay readable afterwards.
        session.remove()


@app.get("/users/{user_id}")
async def read_user(user_id: int):
    """Return the user whose primary key equals ``user_id``.

    Args:
        user_id: Primary key taken from the URL path.

    Returns:
        The matching ``UserTable`` object.

    Raises:
        HTTPException: 404 when no row has that id (previously the endpoint
            answered 200 with a ``null`` body).
    """
    # Imported here because the module's import block only brings in FastAPI.
    from fastapi import HTTPException

    try:
        user = session.query(UserTable).filter(UserTable.id == user_id).first()
    finally:
        # Release the shared scoped session so it is not carried over into
        # the next request.
        session.remove()

    if user is None:
        raise HTTPException(status_code=404, detail=f"user {user_id} not found")
    return user


@app.post("/user")
async def create_users(name: str, age: int):
    """Insert one user row and confirm with a short message.

    Args:
        name: Name of the new user.
        age: Age of the new user.

    Returns:
        The string ``"<name> created..."``.
    """
    try:
        session.add(UserTable(name=name, age=age))
        session.commit()
    finally:
        # Always release the shared scoped session. If the commit failed,
        # closing the session also discards the failed transaction, so later
        # requests do not inherit a Session that still needs a rollback.
        session.remove()

    return f"{name} created..."


@app.put("/users")
async def update_user(users: List[User]):
    """Overwrite name and age of each listed user, matched by id.

    All changes are committed together, so a failure part-way through leaves
    no partially applied batch behind.

    Args:
        users: Users to update; each ``id`` selects the row to modify.

    Returns:
        The string ``"<name of the first user> updated..."``.

    Raises:
        HTTPException: 400 when ``users`` is empty, 404 when one of the ids
            does not exist. Both cases previously surfaced as a 500
            (IndexError / AttributeError on ``None``).
    """
    # Imported here because the module's import block only brings in FastAPI.
    from fastapi import HTTPException

    if not users:
        raise HTTPException(status_code=400, detail="no users supplied")

    try:
        for item in users:
            row = session.query(UserTable).filter(UserTable.id == item.id).first()
            if row is None:
                raise HTTPException(
                    status_code=404, detail=f"user {item.id} not found"
                )
            row.name = item.name
            row.age = item.age
        # Single commit after the loop: either every row is updated or none.
        session.commit()
    finally:
        # Release the shared scoped session; this also discards any pending,
        # uncommitted changes when an error interrupted the loop.
        session.remove()

    return f"{users[0].name} updated..."


@app.delete("/user")
async def delete_users(user_id: int):
    """Delete the user with the given id and return the remaining users.

    Args:
        user_id: Primary key of the row to delete. Deleting an id that does
            not exist is not treated as an error.

    Returns:
        The result of ``read_users()``, i.e. the users left after deletion.
    """
    try:
        session.query(UserTable).filter(UserTable.id == user_id).delete()
        session.commit()
    finally:
        # Release the shared scoped session so it is not carried over into
        # the next request.
        session.remove()

    # The original returned the ``read_users`` function object itself; the
    # coroutine has to be called and awaited to obtain the user list.
    return await read_users()
반응형

댓글