This commit is contained in:
quaduzi 2023-06-19 09:19:14 +07:00
parent b54c70bc8f
commit 2e72816f78
10 changed files with 470 additions and 119 deletions

Binary file not shown.

13
src/enums.py Normal file
View File

@ -0,0 +1,13 @@
from enum import Enum
class ResponseFormat(str, Enum):
json = "json"
text = "text"
html = "html"
class HeaderAccept(str, Enum):
json = "application/json"
text = "text/plain"
html = "text/html"

View File

@ -1,113 +1,12 @@
from datetime import datetime from fastapi import FastAPI
from typing import List, Union, Annotated
from fastapi import FastAPI, HTTPException, Depends, Request, status, Header from database import create_db_and_tables
from fastapi.responses import PlainTextResponse, HTMLResponse, FileResponse from routes import main_router
from fastapi.templating import Jinja2Templates
from sqlmodel import Session, select
from users.models import Users, UsersRead, UsersCreate, UsersUpdate, ResponseFormat, HeaderAccept
from database import create_db_and_tables, get_session
app = FastAPI() app = FastAPI()
app.include_router(router=main_router)
templates = Jinja2Templates(directory="templates")
@app.on_event("startup") @app.on_event("startup")
def on_startup(): def on_startup():
create_db_and_tables() create_db_and_tables()
@app.get("/users/", response_model=Union[List[UsersRead], str])
def get_users(request: Request, format: ResponseFormat = ResponseFormat.html, session: Session = Depends(get_session)):
results = session.exec(select(Users)).all()
if format == ResponseFormat.json:
return results
elif format == ResponseFormat.text:
return PlainTextResponse(content="\n".join(list(map(str, results))))
else:
return templates.TemplateResponse(
"users.html",
{
"request": request,
"users": list(map(lambda x: x.dict(), results))
}
)
@app.get("/users-by-header/", response_model=Union[List[UsersRead], str])
def get_users_by_header(
accept: Annotated[str, Header()] = f"Use '{HeaderAccept.json.value}' or '{HeaderAccept.text.value}'",
session: Session = Depends(get_session)):
results = session.exec(select(Users)).all()
if accept == HeaderAccept.json:
return results
elif accept == HeaderAccept.text:
return PlainTextResponse(content="\n".join(list(map(str, results))))
else:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Accept header schema not found use '{HeaderAccept.json.value}' or '{HeaderAccept.text.value}'"
)
@app.get("/users/{user_id}", response_model=Union[UsersRead, str])
def get_user(
user_id: int,
request: Request,
session: Session = Depends(get_session),
accept: Annotated[str, Header()] = HeaderAccept.json):
user = session.get(Users, user_id)
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
if HeaderAccept.html in accept:
return templates.TemplateResponse(
"user.html",
{
"request": request,
"user": user.dict()
}
)
else:
return user
@app.post("/users/", response_model=UsersRead, status_code=status.HTTP_201_CREATED)
def create_users(user: UsersCreate, session: Session = Depends(get_session)):
db_user = Users.from_orm(user)
session.add(db_user)
session.commit()
session.refresh(db_user)
return db_user
@app.patch("/users/{user_id}", response_model=UsersRead)
def update_user(user_id: int, user: UsersUpdate, session: Session = Depends(get_session)):
db_user = session.get(Users, user_id)
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
user_data = user.dict(exclude_unset=True)
for key, value in user_data.items():
setattr(db_user, key, value)
session.add(db_user)
session.commit()
session.refresh(db_user)
return db_user
@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(user_id: int, session: Session = Depends(get_session)):
user = session.get(Users, user_id)
if not user:
return {"detail": "User not found"}
session.delete(user)
session.commit()
return {"detail": "User deleted"}
@app.get("/users/report/", response_class=FileResponse)
def generate_users_report():
date = datetime.now()
filename = f"user_report_{date.strftime('%Y-%m-%d')}.pdf"
return FileResponse(path="files/sample.pdf", media_type="application/pdf", filename=filename)

46
src/products/models.py Normal file
View File

@ -0,0 +1,46 @@
from sqlmodel import Field, SQLModel
class ProductsBase(SQLModel):
name: str
price: float
image: str
class Products(ProductsBase, table=True):
id: int | None = Field(default=None, primary_key=True)
class ProductsCreate(ProductsBase):
pass
class ProductsRead(ProductsBase):
id: int
class ProductsUpdate(SQLModel):
name: str | None = None
price: float | None = None
image: str | None = None
class CartItemBase(SQLModel):
product_id: int
class CartItemAdd(CartItemBase):
pass
class CartItem(CartItemBase):
count: int = 0
class CartItemRead(CartItem):
pass
class CartItemWithProduct(CartItemRead):
product: ProductsRead | None = None
total_price: float = 0

99
src/products/router.py Normal file
View File

@ -0,0 +1,99 @@
import json
from typing import List, Union, Annotated
from fastapi import Depends, Request, Header, APIRouter, Response, Cookie
from fastapi.templating import Jinja2Templates
from sqlmodel import Session, select
from database import get_session
from enums import HeaderAccept
from products.models import Products, ProductsRead, CartItem, CartItemRead, CartItemAdd, CartItemWithProduct
router = APIRouter(tags=["Products"])
templates = Jinja2Templates(directory="templates")
@router.get("/products/", response_model=Union[List[ProductsRead], str])
def get_products(
request: Request,
session: Session = Depends(get_session),
accept: Annotated[str | None, Header()] = HeaderAccept.json):
results = session.exec(select(Products)).all()
if HeaderAccept.html in accept:
return templates.TemplateResponse(
"products.html",
{
"request": request,
"products": list(map(lambda x: x.dict(), results))
}
)
else:
return results
@router.get("/cart/", response_model=Union[List[CartItemWithProduct], str])
def get_cart(
request: Request,
cart: Annotated[str | None, Cookie()] = None,
accept: Annotated[str | None, Header()] = HeaderAccept.json,
session: Session = Depends(get_session)):
cart_items_json = json.loads(cart) if cart else []
cart_items = list(map(lambda item: CartItemWithProduct(**item), cart_items_json))
cart_items_without_product = []
cart_items_with_product = []
for item in cart_items:
product = session.get(Products, item.product_id)
if product:
item.product = product
item.total_price = round(float(product.price) * int(item.count), 2)
cart_items_with_product.append(item)
else:
cart_items_without_product.append(item)
if HeaderAccept.html in accept:
return templates.TemplateResponse(
"cart.html",
{
"request": request,
"cart": list(map(lambda x: x.dict(), cart_items_with_product))
}
)
else:
return cart_items_with_product
@router.post("/add_cart", response_model=List[CartItemRead])
def add_cart(product: CartItemAdd, response: Response, cart: Annotated[str | None, Cookie()] = None):
cart_items_json = json.loads(cart) if cart else []
cart_items = list(map(lambda item: CartItem(**item), cart_items_json))
item_found = None
for item in cart_items:
if item.product_id == product.product_id:
item_found = item
if item_found:
item_found.count += 1
else:
new_item = CartItem(**product.dict(), count=1)
cart_items.append(new_item)
cart_items_json = list(map(lambda item: item.dict(), cart_items))
response.set_cookie('cart', json.dumps(cart_items_json))
return cart_items
@router.post("/remove_cart", response_model=List[CartItemRead])
def remove_cart(product: CartItemAdd, response: Response, cart: Annotated[str | None, Cookie()] = None):
cart_items_json = json.loads(cart) if cart else []
cart_items = list(map(lambda item: CartItem(**item), cart_items_json))
item_found = None
for item in cart_items:
if item.product_id == product.product_id:
item_found = item
if item_found:
if item_found.count <= 1:
cart_items.remove(item_found)
else:
item_found.count -= 1
cart_items_json = list(map(lambda item: item.dict(), cart_items))
response.set_cookie('cart', json.dumps(cart_items_json))
return cart_items

8
src/routes.py Normal file
View File

@ -0,0 +1,8 @@
from fastapi import APIRouter
from users.router import router as users_router
from products.router import router as products_router
main_router = APIRouter()
main_router.include_router(router=users_router)
main_router.include_router(router=products_router)

106
src/templates/cart.html Normal file
View File

@ -0,0 +1,106 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Users</title>
<style>
*, ::after, ::before {
box-sizing: border-box;
}
body{
font-family: Arial, Helvetica, sans-serif;
}
img {
object-fit: cover;
width: 120px;
height: 120px;
}
table{
border-collapse: collapse;
}
.table {
width: 100%;
margin-bottom: 1rem;
color: #212529;
}
.table thead th {
vertical-align: bottom;
border-bottom: 2px solid #dee2e6;
text-align: left;
}
.table td, .table th {
padding: 0.75rem;
vertical-align: top;
border-top: 1px solid #dee2e6;
}
</style>
</head>
<body>
<script defer>
function addToCart(productID) {
var xhr = new XMLHttpRequest();
xhr.open('POST', '/add_cart/')
xhr.setRequestHeader("Content-Type", "application/json");
xhr.send(JSON.stringify({"product_id": productID}));
xhr.onreadystatechange = function () {
if (xhr.readyState == XMLHttpRequest.DONE) {
location.replace("/cart/");
}
}
return false;
}
function removeFromCart(productID) {
var xhr = new XMLHttpRequest();
xhr.open('POST', '/remove_cart/')
xhr.setRequestHeader("Content-Type", "application/json");
xhr.send(JSON.stringify({"product_id": productID}));
xhr.onreadystatechange = function () {
if (xhr.readyState == XMLHttpRequest.DONE) {
location.replace("/cart/");
}
}
return false;
}
</script>
<a href="/products/">GO TO PRODUCTS</a>
<table class="table">
<thead>
<tr>
<th>Product ID</th>
<th>Name</th>
<th>Price</th>
<th>Count</th>
<th>Image</th>
<th></th>
</tr>
</thead>
<tbody>
{% for item in cart %}
<tr>
<td><a href="/products/{{item.product.id}}">{{item.product.id}}</a></td>
<td>{{item.product.name}}</td>
<td>{{item.total_price}}</td>
<td>{{item.count}}</td>
<td><img src="{{ item.product.image }}" alt="No image"></td>
<td>
<div>
<input type = "button" value = "Add" onclick="addToCart({{ item.product_id }})" />
<input type = "button" value = "Remove" onclick="removeFromCart({{ item.product_id }})" />
</div>
</td>
</tr>
{% endfor %}
</tbody>
</table>
</body>
</html>

View File

@ -0,0 +1,85 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Users</title>
<style>
*, ::after, ::before {
box-sizing: border-box;
}
body{
font-family: Arial, Helvetica, sans-serif;
}
img {
object-fit: cover;
width: 120px;
height: 120px;
}
table{
border-collapse: collapse;
}
.table {
width: 100%;
margin-bottom: 1rem;
color: #212529;
}
.table thead th {
vertical-align: bottom;
border-bottom: 2px solid #dee2e6;
text-align: left;
}
.table td, .table th {
padding: 0.75rem;
vertical-align: top;
border-top: 1px solid #dee2e6;
}
</style>
</head>
<body>
<script defer>
function addToCart(productID) {
var xhr = new XMLHttpRequest();
xhr.open('POST', '/add_cart/')
xhr.setRequestHeader("Content-Type", "application/json");
xhr.send(JSON.stringify({"product_id": productID}));
xhr.onreadystatechange = function () {
if (xhr.readyState == XMLHttpRequest.DONE) {
location.replace("/products/");
}
}
return false;
}
</script>
<a href="/cart/">GO TO CART</a>
<table class="table">
<thead>
<tr>
<th>ID</th>
<th>Name</th>
<th>Price</th>
<th>Image</th>
<th></th>
</tr>
</thead>
<tbody>
{% for product in products %}
<tr>
<td><a href="/products/{{product.id}}">{{product.id}}</a></td>
<td>{{product.name}}</td>
<td>{{product.price}}</td>
<td><img src="{{ product.image }}" alt="No image"></td>
<td><input type = "button" value = "Add" onclick="addToCart({{ product.id }});" /></td>
</tr>
{% endfor %}
</tbody>
</table>
</body>
</html>

View File

@ -1,20 +1,6 @@
from enum import Enum
from sqlmodel import Field, SQLModel from sqlmodel import Field, SQLModel
class ResponseFormat(str, Enum):
json = "json"
text = "text"
html = "html"
class HeaderAccept(str, Enum):
json = "application/json"
text = "text/plain"
html = "text/html"
class UsersBase(SQLModel): class UsersBase(SQLModel):
first_name: str first_name: str
last_name: str last_name: str

109
src/users/router.py Normal file
View File

@ -0,0 +1,109 @@
from datetime import datetime
from typing import List, Union, Annotated
from fastapi import HTTPException, Depends, Request, status, Header, APIRouter
from fastapi.responses import PlainTextResponse, FileResponse
from fastapi.templating import Jinja2Templates
from sqlmodel import Session, select
from database import get_session
from enums import ResponseFormat, HeaderAccept
from users.models import Users, UsersRead, UsersCreate, UsersUpdate
router = APIRouter(tags=["Users"])
templates = Jinja2Templates(directory="templates")
@router.get("/users/", response_model=Union[List[UsersRead], str])
def get_users(request: Request, format: ResponseFormat = ResponseFormat.html, session: Session = Depends(get_session)):
results = session.exec(select(Users)).all()
if format == ResponseFormat.json:
return results
elif format == ResponseFormat.text:
return PlainTextResponse(content="\n".join(list(map(str, results))))
else:
return templates.TemplateResponse(
"users.html",
{
"request": request,
"users": list(map(lambda x: x.dict(), results))
}
)
@router.get("/users-by-header/", response_model=Union[List[UsersRead], str])
def get_users_by_header(
accept: Annotated[str, Header()] = f"Use '{HeaderAccept.json.value}' or '{HeaderAccept.text.value}'",
session: Session = Depends(get_session)):
results = session.exec(select(Users)).all()
if accept == HeaderAccept.json:
return results
elif accept == HeaderAccept.text:
return PlainTextResponse(content="\n".join(list(map(str, results))))
else:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Accept header schema not found use '{HeaderAccept.json.value}' or '{HeaderAccept.text.value}'"
)
@router.get("/users/{user_id}", response_model=Union[UsersRead, str])
def get_user(
user_id: int,
request: Request,
session: Session = Depends(get_session),
accept: Annotated[str, Header()] = HeaderAccept.json):
user = session.get(Users, user_id)
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
if HeaderAccept.html in accept:
return templates.TemplateResponse(
"user.html",
{
"request": request,
"user": user.dict()
}
)
else:
return user
@router.post("/users/", response_model=UsersRead, status_code=status.HTTP_201_CREATED)
def create_users(user: UsersCreate, session: Session = Depends(get_session)):
db_user = Users.from_orm(user)
session.add(db_user)
session.commit()
session.refresh(db_user)
return db_user
@router.patch("/users/{user_id}", response_model=UsersRead)
def update_user(user_id: int, user: UsersUpdate, session: Session = Depends(get_session)):
db_user = session.get(Users, user_id)
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
user_data = user.dict(exclude_unset=True)
for key, value in user_data.items():
setattr(db_user, key, value)
session.add(db_user)
session.commit()
session.refresh(db_user)
return db_user
@router.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(user_id: int, session: Session = Depends(get_session)):
user = session.get(Users, user_id)
if not user:
return {"detail": "User not found"}
session.delete(user)
session.commit()
return {"detail": "User deleted"}
@router.get("/users/report/", response_class=FileResponse)
def generate_users_report():
date = datetime.now()
filename = f"user_report_{date.strftime('%Y-%m-%d')}.pdf"
return FileResponse(path="files/sample.pdf", media_type="application/pdf", filename=filename)