diff --git a/src/database.sqlite b/src/database.sqlite index feaa63c..5b048c5 100644 Binary files a/src/database.sqlite and b/src/database.sqlite differ diff --git a/src/enums.py b/src/enums.py new file mode 100644 index 0000000..146a843 --- /dev/null +++ b/src/enums.py @@ -0,0 +1,13 @@ +from enum import Enum + + +class ResponseFormat(str, Enum): + json = "json" + text = "text" + html = "html" + + +class HeaderAccept(str, Enum): + json = "application/json" + text = "text/plain" + html = "text/html" diff --git a/src/main.py b/src/main.py index cf83fe4..7cd32d6 100644 --- a/src/main.py +++ b/src/main.py @@ -1,113 +1,12 @@ -from datetime import datetime -from typing import List, Union, Annotated +from fastapi import FastAPI -from fastapi import FastAPI, HTTPException, Depends, Request, status, Header -from fastapi.responses import PlainTextResponse, HTMLResponse, FileResponse -from fastapi.templating import Jinja2Templates -from sqlmodel import Session, select - -from users.models import Users, UsersRead, UsersCreate, UsersUpdate, ResponseFormat, HeaderAccept -from database import create_db_and_tables, get_session +from database import create_db_and_tables +from routes import main_router app = FastAPI() - -templates = Jinja2Templates(directory="templates") +app.include_router(router=main_router) @app.on_event("startup") def on_startup(): create_db_and_tables() - - -@app.get("/users/", response_model=Union[List[UsersRead], str]) -def get_users(request: Request, format: ResponseFormat = ResponseFormat.html, session: Session = Depends(get_session)): - results = session.exec(select(Users)).all() - if format == ResponseFormat.json: - return results - elif format == ResponseFormat.text: - return PlainTextResponse(content="\n".join(list(map(str, results)))) - else: - return templates.TemplateResponse( - "users.html", - { - "request": request, - "users": list(map(lambda x: x.dict(), results)) - } - ) - - -@app.get("/users-by-header/", response_model=Union[List[UsersRead], str]) -def get_users_by_header( - accept: Annotated[str, Header()] = f"Use '{HeaderAccept.json.value}' or '{HeaderAccept.text.value}'", - session: Session = Depends(get_session)): - results = session.exec(select(Users)).all() - if accept == HeaderAccept.json: - return results - elif accept == HeaderAccept.text: - return PlainTextResponse(content="\n".join(list(map(str, results)))) - else: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Accept header schema not found use '{HeaderAccept.json.value}' or '{HeaderAccept.text.value}'" - ) - - -@app.get("/users/{user_id}", response_model=Union[UsersRead, str]) -def get_user( - user_id: int, - request: Request, - session: Session = Depends(get_session), - accept: Annotated[str, Header()] = HeaderAccept.json): - user = session.get(Users, user_id) - if not user: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found") - if HeaderAccept.html in accept: - return templates.TemplateResponse( - "user.html", - { - "request": request, - "user": user.dict() - } - ) - else: - return user - - -@app.post("/users/", response_model=UsersRead, status_code=status.HTTP_201_CREATED) -def create_users(user: UsersCreate, session: Session = Depends(get_session)): - db_user = Users.from_orm(user) - session.add(db_user) - session.commit() - session.refresh(db_user) - return db_user - - -@app.patch("/users/{user_id}", response_model=UsersRead) -def update_user(user_id: int, user: UsersUpdate, session: Session = Depends(get_session)): - db_user = session.get(Users, user_id) - if not user: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found") - user_data = user.dict(exclude_unset=True) - for key, value in user_data.items(): - setattr(db_user, key, value) - session.add(db_user) - session.commit() - session.refresh(db_user) - return db_user - - -@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT) -def delete_user(user_id: int, session: Session = Depends(get_session)): - user = session.get(Users, user_id) - if not user: - return {"detail": "User not found"} - session.delete(user) - session.commit() - return {"detail": "User deleted"} - - -@app.get("/users/report/", response_class=FileResponse) -def generate_users_report(): - date = datetime.now() - filename = f"user_report_{date.strftime('%Y-%m-%d')}.pdf" - return FileResponse(path="files/sample.pdf", media_type="application/pdf", filename=filename) diff --git a/src/products/models.py b/src/products/models.py new file mode 100644 index 0000000..88eaa3c --- /dev/null +++ b/src/products/models.py @@ -0,0 +1,46 @@ +from sqlmodel import Field, SQLModel + + +class ProductsBase(SQLModel): + name: str + price: float + image: str + + +class Products(ProductsBase, table=True): + id: int | None = Field(default=None, primary_key=True) + + +class ProductsCreate(ProductsBase): + pass + + +class ProductsRead(ProductsBase): + id: int + + +class ProductsUpdate(SQLModel): + name: str | None = None + price: float | None = None + image: str | None = None + + +class CartItemBase(SQLModel): + product_id: int + + +class CartItemAdd(CartItemBase): + pass + + +class CartItem(CartItemBase): + count: int = 0 + + +class CartItemRead(CartItem): + pass + + +class CartItemWithProduct(CartItemRead): + product: ProductsRead | None = None + total_price: float = 0 diff --git a/src/products/router.py b/src/products/router.py new file mode 100644 index 0000000..606b5c1 --- /dev/null +++ b/src/products/router.py @@ -0,0 +1,99 @@ +import json +from typing import List, Union, Annotated + +from fastapi import Depends, Request, Header, APIRouter, Response, Cookie +from fastapi.templating import Jinja2Templates +from sqlmodel import Session, select + +from database import get_session +from enums import HeaderAccept +from products.models import Products, ProductsRead, CartItem, CartItemRead, CartItemAdd, CartItemWithProduct + +router = APIRouter(tags=["Products"]) + +templates = Jinja2Templates(directory="templates") + + +@router.get("/products/", response_model=Union[List[ProductsRead], str]) +def get_products( + request: Request, + session: Session = Depends(get_session), + accept: Annotated[str | None, Header()] = HeaderAccept.json): + results = session.exec(select(Products)).all() + if HeaderAccept.html in accept: + return templates.TemplateResponse( + "products.html", + { + "request": request, + "products": list(map(lambda x: x.dict(), results)) + } + ) + else: + return results + + +@router.get("/cart/", response_model=Union[List[CartItemWithProduct], str]) +def get_cart( + request: Request, + cart: Annotated[str | None, Cookie()] = None, + accept: Annotated[str | None, Header()] = HeaderAccept.json, + session: Session = Depends(get_session)): + cart_items_json = json.loads(cart) if cart else [] + cart_items = list(map(lambda item: CartItemWithProduct(**item), cart_items_json)) + cart_items_without_product = [] + cart_items_with_product = [] + for item in cart_items: + product = session.get(Products, item.product_id) + if product: + item.product = product + item.total_price = round(float(product.price) * int(item.count), 2) + cart_items_with_product.append(item) + else: + cart_items_without_product.append(item) + + if HeaderAccept.html in accept: + return templates.TemplateResponse( + "cart.html", + { + "request": request, + "cart": list(map(lambda x: x.dict(), cart_items_with_product)) + } + ) + else: + return cart_items_with_product + + +@router.post("/add_cart", response_model=List[CartItemRead]) +def add_cart(product: CartItemAdd, response: Response, cart: Annotated[str | None, Cookie()] = None): + cart_items_json = json.loads(cart) if cart else [] + cart_items = list(map(lambda item: CartItem(**item), cart_items_json)) + item_found = None + for item in cart_items: + if item.product_id == product.product_id: + item_found = item + if item_found: + item_found.count += 1 + else: + new_item = CartItem(**product.dict(), count=1) + cart_items.append(new_item) + cart_items_json = list(map(lambda item: item.dict(), cart_items)) + response.set_cookie('cart', json.dumps(cart_items_json)) + return cart_items + + +@router.post("/remove_cart", response_model=List[CartItemRead]) +def remove_cart(product: CartItemAdd, response: Response, cart: Annotated[str | None, Cookie()] = None): + cart_items_json = json.loads(cart) if cart else [] + cart_items = list(map(lambda item: CartItem(**item), cart_items_json)) + item_found = None + for item in cart_items: + if item.product_id == product.product_id: + item_found = item + if item_found: + if item_found.count <= 1: + cart_items.remove(item_found) + else: + item_found.count -= 1 + cart_items_json = list(map(lambda item: item.dict(), cart_items)) + response.set_cookie('cart', json.dumps(cart_items_json)) + return cart_items diff --git a/src/routes.py b/src/routes.py new file mode 100644 index 0000000..b773b9b --- /dev/null +++ b/src/routes.py @@ -0,0 +1,8 @@ +from fastapi import APIRouter + +from users.router import router as users_router +from products.router import router as products_router + +main_router = APIRouter() +main_router.include_router(router=users_router) +main_router.include_router(router=products_router) diff --git a/src/templates/cart.html b/src/templates/cart.html new file mode 100644 index 0000000..75adaeb --- /dev/null +++ b/src/templates/cart.html @@ -0,0 +1,106 @@ + + + + + + + Users + + + + + + GO TO PRODUCTS + + + + + + + + + + + + + {% for item in cart %} + + + + + + + + + {% endfor %} + +
Product IDNamePriceCountImage
{{item.product.id}}{{item.product.name}}{{item.total_price}}{{item.count}}No image +
+ + +
+
+ + diff --git a/src/templates/products.html b/src/templates/products.html new file mode 100644 index 0000000..b21861e --- /dev/null +++ b/src/templates/products.html @@ -0,0 +1,85 @@ + + + + + + + Users + + + + + + GO TO CART + + + + + + + + + + + + + {% for product in products %} + + + + + + + + {% endfor %} + +
IDNamePriceImage
{{product.id}}{{product.name}}{{product.price}}No image
+ + diff --git a/src/users/models.py b/src/users/models.py index 323bfe4..0802773 100644 --- a/src/users/models.py +++ b/src/users/models.py @@ -1,20 +1,6 @@ -from enum import Enum - from sqlmodel import Field, SQLModel -class ResponseFormat(str, Enum): - json = "json" - text = "text" - html = "html" - - -class HeaderAccept(str, Enum): - json = "application/json" - text = "text/plain" - html = "text/html" - - class UsersBase(SQLModel): first_name: str last_name: str diff --git a/src/users/router.py b/src/users/router.py new file mode 100644 index 0000000..4e87a40 --- /dev/null +++ b/src/users/router.py @@ -0,0 +1,109 @@ +from datetime import datetime +from typing import List, Union, Annotated + +from fastapi import HTTPException, Depends, Request, status, Header, APIRouter +from fastapi.responses import PlainTextResponse, FileResponse +from fastapi.templating import Jinja2Templates +from sqlmodel import Session, select + +from database import get_session +from enums import ResponseFormat, HeaderAccept +from users.models import Users, UsersRead, UsersCreate, UsersUpdate + +router = APIRouter(tags=["Users"]) + +templates = Jinja2Templates(directory="templates") + + +@router.get("/users/", response_model=Union[List[UsersRead], str]) +def get_users(request: Request, format: ResponseFormat = ResponseFormat.html, session: Session = Depends(get_session)): + results = session.exec(select(Users)).all() + if format == ResponseFormat.json: + return results + elif format == ResponseFormat.text: + return PlainTextResponse(content="\n".join(list(map(str, results)))) + else: + return templates.TemplateResponse( + "users.html", + { + "request": request, + "users": list(map(lambda x: x.dict(), results)) + } + ) + + +@router.get("/users-by-header/", response_model=Union[List[UsersRead], str]) +def get_users_by_header( + accept: Annotated[str, Header()] = f"Use '{HeaderAccept.json.value}' or '{HeaderAccept.text.value}'", + session: Session = Depends(get_session)): + results = session.exec(select(Users)).all() + if accept == HeaderAccept.json: + return results + elif accept == HeaderAccept.text: + return PlainTextResponse(content="\n".join(list(map(str, results)))) + else: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Accept header schema not found use '{HeaderAccept.json.value}' or '{HeaderAccept.text.value}'" + ) + + +@router.get("/users/{user_id}", response_model=Union[UsersRead, str]) +def get_user( + user_id: int, + request: Request, + session: Session = Depends(get_session), + accept: Annotated[str, Header()] = HeaderAccept.json): + user = session.get(Users, user_id) + if not user: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found") + if HeaderAccept.html in accept: + return templates.TemplateResponse( + "user.html", + { + "request": request, + "user": user.dict() + } + ) + else: + return user + + +@router.post("/users/", response_model=UsersRead, status_code=status.HTTP_201_CREATED) +def create_users(user: UsersCreate, session: Session = Depends(get_session)): + db_user = Users.from_orm(user) + session.add(db_user) + session.commit() + session.refresh(db_user) + return db_user + + +@router.patch("/users/{user_id}", response_model=UsersRead) +def update_user(user_id: int, user: UsersUpdate, session: Session = Depends(get_session)): + db_user = session.get(Users, user_id) + if not user: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found") + user_data = user.dict(exclude_unset=True) + for key, value in user_data.items(): + setattr(db_user, key, value) + session.add(db_user) + session.commit() + session.refresh(db_user) + return db_user + + +@router.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT) +def delete_user(user_id: int, session: Session = Depends(get_session)): + user = session.get(Users, user_id) + if not user: + return {"detail": "User not found"} + session.delete(user) + session.commit() + return {"detail": "User deleted"} + + +@router.get("/users/report/", response_class=FileResponse) +def generate_users_report(): + date = datetime.now() + filename = f"user_report_{date.strftime('%Y-%m-%d')}.pdf" + return FileResponse(path="files/sample.pdf", media_type="application/pdf", filename=filename)