calendar_test/src/main.py

114 lines
3.9 KiB
Python

from datetime import datetime
from typing import List, Union, Annotated
from fastapi import FastAPI, HTTPException, Depends, Request, status, Header
from fastapi.responses import PlainTextResponse, HTMLResponse, FileResponse
from fastapi.templating import Jinja2Templates
from sqlmodel import Session, select
from users.models import Users, UsersRead, UsersCreate, UsersUpdate, ResponseFormat, HeaderAccept
from database import create_db_and_tables, get_session
app = FastAPI()
templates = Jinja2Templates(directory="templates")
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.get("/users/", response_model=Union[List[UsersRead], str])
def get_users(request: Request, format: ResponseFormat = ResponseFormat.html, session: Session = Depends(get_session)):
results = session.exec(select(Users)).all()
if format == ResponseFormat.json:
return results
elif format == ResponseFormat.text:
return PlainTextResponse(content="\n".join(list(map(str, results))))
else:
return templates.TemplateResponse(
"users.html",
{
"request": request,
"users": list(map(lambda x: x.dict(), results))
}
)
@app.get("/users-by-header/", response_model=Union[List[UsersRead], str])
def get_users_by_header(
accept: Annotated[str, Header()] = f"Use '{HeaderAccept.json.value}' or '{HeaderAccept.text.value}'",
session: Session = Depends(get_session)):
results = session.exec(select(Users)).all()
if accept == HeaderAccept.json:
return results
elif accept == HeaderAccept.text:
return PlainTextResponse(content="\n".join(list(map(str, results))))
else:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Accept header schema not found use '{HeaderAccept.json.value}' or '{HeaderAccept.text.value}'"
)
@app.get("/users/{user_id}", response_model=Union[UsersRead, str])
def get_user(
user_id: int,
request: Request,
session: Session = Depends(get_session),
accept: Annotated[str, Header()] = HeaderAccept.json):
user = session.get(Users, user_id)
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
if HeaderAccept.html in accept:
return templates.TemplateResponse(
"user.html",
{
"request": request,
"user": user.dict()
}
)
else:
return user
@app.post("/users/", response_model=UsersRead, status_code=status.HTTP_201_CREATED)
def create_users(user: UsersCreate, session: Session = Depends(get_session)):
db_user = Users.from_orm(user)
session.add(db_user)
session.commit()
session.refresh(db_user)
return db_user
@app.patch("/users/{user_id}", response_model=UsersRead)
def update_user(user_id: int, user: UsersUpdate, session: Session = Depends(get_session)):
db_user = session.get(Users, user_id)
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
user_data = user.dict(exclude_unset=True)
for key, value in user_data.items():
setattr(db_user, key, value)
session.add(db_user)
session.commit()
session.refresh(db_user)
return db_user
@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(user_id: int, session: Session = Depends(get_session)):
user = session.get(Users, user_id)
if not user:
return {"detail": "User not found"}
session.delete(user)
session.commit()
return {"detail": "User deleted"}
@app.get("/users/report/", response_class=FileResponse)
def generate_users_report():
date = datetime.now()
filename = f"user_report_{date.strftime('%Y-%m-%d')}.pdf"
return FileResponse(path="files/sample.pdf", media_type="application/pdf", filename=filename)