# calendar_test/src/main.py
#
# 396 lines
# 14 KiB
# Python

from datetime import datetime, timedelta
from typing import List, Optional
import re
from fastapi import FastAPI, Query, Depends, HTTPException, status
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from sqlmodel import SQLModel, Field
from pydantic import BaseModel, ValidationError, validator
from routes import main_router
from google.auth.transport.requests import Request
from google.oauth2 import service_account
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build, Resource
from babel.dates import format_datetime
# FastAPI application object; the endpoints below are registered on it.
app = FastAPI()
# CORS policy: any origin, method and header is accepted, credentials are not.
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Files from the local "static" directory are served under /static.
app.mount("/static", StaticFiles(directory="static"), name="static")
# Routes declared in the project's `routes` module are attached here.
app.include_router(router=main_router)
# Scope requested for the service-account credentials.
SCOPES = ['https://www.googleapis.com/auth/calendar']
# File handed to from_service_account_file() below.
FILE_PATH = 'token.json'
# Calendar used by the calendar endpoints below.
CALENDAR_ID = '926affbbac4e8e6701060f1fec6189162b0b6d246db13d84682749d647af9a1f@group.calendar.google.com'
# Credentials are loaded once, when this module is imported.
creds = service_account.Credentials.from_service_account_file(filename=FILE_PATH, scopes=SCOPES)
# Event summary / colorId pairs that mark a slot as free or busy.
TITLE_FREE = 'Запись свободна'
COLOR_FREE = '10'
TITLE_BUSY = 'Занято'
COLOR_BUSY = '6'
def get_calendar_service():
    """Yield a Calendar v3 API client; used as a FastAPI dependency.

    The client is built from the module-level ``creds`` and entered as a
    context manager, so it is exited once the consumer of this generator
    has finished with it.
    """
    calendar_client = build('calendar', 'v3', credentials=creds)
    with calendar_client as service:
        print("Service created")
        yield service
class CalendarDate(SQLModel):
    """Start or end moment of a calendar event.

    Field names are camelCase because they are used verbatim as keys of the
    event bodies sent to and received from the Calendar API.
    """

    # Date-time string; create_calendar_slots fills it with datetime.isoformat().
    dateTime: str = Field(alias="dateTime")
    # Time zone name that accompanies dateTime.
    timeZone: str = Field(alias="timeZone")
    # Explicit default: the slot-creation code builds instances without
    # `date`, so the field must stay optional regardless of how the
    # validation library treats a nullable annotation without a default.
    # NOTE(review): all-day events presumably carry only `date` and no
    # `dateTime`; such events would fail validation here - confirm.
    date: str | None = None
class CalendarAttendees(SQLModel):
    """One entry of an event's ``attendees`` list."""

    # Attendee e-mail address.
    email: str
    # Populated from the "responseStatus" key of the incoming payload.
    response_status: str | None = Field(default=None, alias="responseStatus")
class CalendarEvent(SQLModel):
    """Subset of a calendar event used by the endpoints in this file.

    Serves as request body, as response model, and as the parsed form of
    the event dicts returned by the Calendar client.
    """

    # Event id; used as `eventId` when an existing event is updated.
    id: str | None = None
    # Start and end of the event.
    start: CalendarDate
    end: CalendarDate
    # Set to COLOR_FREE / COLOR_BUSY by the slot endpoints.
    colorId: str | None = None
    # Compared against TITLE_FREE / TITLE_BUSY to tell free slots from busy ones.
    summary: str | None = None
    # Free-form text; the mark-busy endpoints store the caller's description here.
    description: str | None = None
    status: str | None = None
    attendees: List[CalendarAttendees] | None = None
def _to_rfc3339(moment: datetime) -> str:
    """Format *moment* as an RFC 3339 timestamp that always carries an offset.

    The Calendar API requires a time zone offset on ``timeMin``/``timeMax``.
    A naive datetime therefore is interpreted as UTC and suffixed with ``Z``;
    an aware datetime is passed through ``isoformat()`` unchanged.
    """
    if moment.tzinfo is None or moment.utcoffset() is None:
        return moment.isoformat() + 'Z'
    return moment.isoformat()


@app.get('/calendar_events', response_model=List[CalendarEvent], tags=['Calendar'])
def get_calendar_events(
    lower_bound: datetime | None = None,
    upper_bound: datetime | None = None,
    service: Resource = Depends(get_calendar_service),
):
    """Return the events of ``CALENDAR_ID`` ordered by start time.

    Args:
        lower_bound: If given, sent as ``timeMin``.
        upper_bound: If given, sent as ``timeMax``.
        service: Calendar API client (injected by FastAPI, or passed
            explicitly by the sibling endpoints that call this function).

    Returns:
        The ``items`` list of the API response (raw event dicts), or an
        empty list when the response has no ``items`` key.

    NOTE(review): only a single ``list`` response is read; a
    ``nextPageToken`` in the response is ignored.
    """
    # Build the request once; the bounds are the only varying parameters.
    list_kwargs = {
        'calendarId': CALENDAR_ID,
        'orderBy': "startTime",
        'singleEvents': True,
    }
    if lower_bound is not None:
        list_kwargs['timeMin'] = _to_rfc3339(lower_bound)
    if upper_bound is not None:
        list_kwargs['timeMax'] = _to_rfc3339(upper_bound)

    events_result = service.events().list(**list_kwargs).execute()
    return events_result.get('items', [])
@app.post('/calendar_events', tags=['Calendar'])
def create_calendar_event(calendar_event: CalendarEvent, service: Resource = Depends(get_calendar_service)):
    """Insert *calendar_event* into ``CALENDAR_ID`` and return it.

    Only the fields the client actually sent are forwarded
    (``exclude_unset``). The id reported by the insert response is copied
    onto the returned event so the caller can address it afterwards.

    Args:
        calendar_event: Event to create.
        service: Calendar API client (injected by FastAPI).

    Returns:
        The submitted event, with ``id`` taken from the insert response.
    """
    # by_alias=True keeps API key names (e.g. "responseStatus") instead of
    # the Python attribute names.
    body = calendar_event.dict(exclude_unset=True, by_alias=True)
    print(body)
    created = service.events().insert(calendarId=CALENDAR_ID, body=body).execute()
    calendar_event.id = created.get('id', calendar_event.id)
    return calendar_event
@app.post('/calendar_events/create_slots', response_model=List[CalendarEvent], tags=['Calendar'])
def create_calendar_slots(datetime_start: datetime, timezone: str, slot_length_minutes: int, count: int, service: Resource = Depends(get_calendar_service)):
    """Create *count* consecutive free slots starting at *datetime_start*.

    Slot ``i`` starts at ``datetime_start + i * slot_length_minutes`` and
    lasts ``slot_length_minutes``. Every slot is marked free with
    ``TITLE_FREE`` / ``COLOR_FREE``.

    Args:
        datetime_start: Start of the first slot.
        timezone: Value stored as ``timeZone`` of every start and end.
        slot_length_minutes: Length of one slot in minutes.
        count: Number of slots to create.
        service: Calendar API client (injected by FastAPI).

    Returns:
        The slot events that were submitted, in chronological order.
    """
    # The Calendar batch endpoint limits the number of calls per batch
    # request, so inserts are sent in chunks of this size.
    max_batch_calls = 50
    slot_length = timedelta(minutes=slot_length_minutes)

    events = []
    for i in range(count):
        slot_start = datetime_start + slot_length * i
        events.append(CalendarEvent(
            start=CalendarDate(dateTime=slot_start.isoformat(), timeZone=timezone),
            end=CalendarDate(dateTime=(slot_start + slot_length).isoformat(), timeZone=timezone),
            colorId=COLOR_FREE,
            summary=TITLE_FREE,
        ))

    for offset in range(0, len(events), max_batch_calls):
        batch = service.new_batch_http_request()
        for event in events[offset:offset + max_batch_calls]:
            batch.add(service.events().insert(
                calendarId=CALENDAR_ID,
                body=event.dict(exclude_unset=True, by_alias=True),
            ))
        batch.execute()
    return events
@app.get('/calendar_events/free_slots', response_model=List[CalendarEvent], tags=['Calendar'])
def get_free_calendar_slots(
    lower_bound: datetime = None,
    upper_bound: datetime = None,
    service: Resource = Depends(get_calendar_service)
):
    """Return the events within the bounds whose summary equals ``TITLE_FREE``.

    The events are fetched through ``get_calendar_events`` and parsed into
    ``CalendarEvent`` objects before filtering.
    """
    raw_events = get_calendar_events(lower_bound=lower_bound, upper_bound=upper_bound, service=service)
    parsed = [CalendarEvent(**raw) for raw in raw_events]
    return [slot for slot in parsed if slot.summary and slot.summary == TITLE_FREE]
@app.post('/calendar_events/mark_busy', response_model=CalendarEvent, tags=['Calendar'])
def mark_busy_calendar_slot(
    description: str | None = None,
    lower_bound: datetime = None,
    upper_bound: datetime = None,
    service: Resource = Depends(get_calendar_service)
):
    """Mark the first free slot within the bounds as busy.

    Args:
        description: Optional text stored as the event description.
        lower_bound: Forwarded to ``get_calendar_events``.
        upper_bound: Forwarded to ``get_calendar_events``.
        service: Calendar API client (injected by FastAPI).

    Returns:
        The updated event.

    Raises:
        HTTPException: 404 when no event with summary ``TITLE_FREE`` exists
            within the bounds.
    """
    raw_events = get_calendar_events(lower_bound=lower_bound, upper_bound=upper_bound, service=service)
    parsed = [CalendarEvent(**raw) for raw in raw_events]
    # Events arrive ordered by start time, so the first match is the earliest.
    event = next((slot for slot in parsed if slot.summary == TITLE_FREE), None)
    if event is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Free slot not found")

    event.summary = TITLE_BUSY
    event.colorId = COLOR_BUSY
    if description:
        event.description = description
    # by_alias=True sends attendee status under its API key "responseStatus"
    # instead of the Python attribute name, so it survives the update.
    # NOTE(review): update() presumably replaces the whole event, so fields
    # not modelled by CalendarEvent may be dropped - confirm.
    service.events().update(
        calendarId=CALENDAR_ID,
        eventId=event.id,
        body=event.dict(exclude_unset=True, by_alias=True),
    ).execute()
    return event
@app.get('/calendar_events/{slot_id}', response_model=CalendarEvent, tags=['Calendar'])
def get_slot_by_id(
    slot_id: str,
    service: Resource = Depends(get_calendar_service)
):
    """Fetch the event with id *slot_id* from ``CALENDAR_ID``.

    Returns:
        The event parsed into a ``CalendarEvent``.

    Raises:
        HTTPException: 404 when the lookup raises any exception.
    """
    # NOTE(review): every failure of the lookup is reported as 404, not
    # only a missing event.
    try:
        raw_event = service.events().get(calendarId=CALENDAR_ID, eventId=slot_id).execute()
    except Exception:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Slot not found")
    return CalendarEvent(**raw_event)
@app.post('/calendar_events/{slot_id}/mark_busy', response_model=CalendarEvent, tags=['Calendar'])
def mark_busy_calendar_slot_by_id(
    slot_id: str,
    description: str | None = None,
    service: Resource = Depends(get_calendar_service)
):
    """Mark the slot with id *slot_id* as busy.

    Args:
        slot_id: Id of the event to update.
        description: Optional text stored as the event description.
        service: Calendar API client (injected by FastAPI).

    Returns:
        The updated event.

    Raises:
        HTTPException: 404 (from ``get_slot_by_id``) when the slot cannot
            be fetched; 409 when its summary is not ``TITLE_FREE``.
    """
    event = get_slot_by_id(slot_id=slot_id, service=service)
    if event.summary != TITLE_FREE:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Slot is busy")

    event.summary = TITLE_BUSY
    event.colorId = COLOR_BUSY
    if description:
        event.description = description
    # by_alias=True sends attendee status under its API key "responseStatus"
    # instead of the Python attribute name, so it survives the update.
    service.events().update(
        calendarId=CALENDAR_ID,
        eventId=event.id,
        body=event.dict(exclude_unset=True, by_alias=True),
    ).execute()
    return event
@app.post('/calendar_events/{slot_id}/mark_free', response_model=CalendarEvent, tags=['Calendar'])
def mark_free_calendar_slot_by_id(
    slot_id: str,
    service: Resource = Depends(get_calendar_service)
):
    """Mark the slot with id *slot_id* as free.

    The summary and colour are reset to ``TITLE_FREE`` / ``COLOR_FREE`` and
    the description is overwritten with a single space. The slot's current
    state is not checked first.

    Args:
        slot_id: Id of the event to update.
        service: Calendar API client (injected by FastAPI).

    Returns:
        The updated event.

    Raises:
        HTTPException: 404 (from ``get_slot_by_id``) when the slot cannot
            be fetched.
    """
    event = get_slot_by_id(slot_id=slot_id, service=service)
    event.summary = TITLE_FREE
    event.colorId = COLOR_FREE
    event.description = " "
    # by_alias=True sends attendee status under its API key "responseStatus"
    # instead of the Python attribute name, so it survives the update.
    service.events().update(
        calendarId=CALENDAR_ID,
        eventId=event.id,
        body=event.dict(exclude_unset=True, by_alias=True),
    ).execute()
    return event
@app.post('/calendar_events/mark_free', response_model=CalendarEvent, tags=['Calendar'])
def mark_free_calendar_slot(
    lower_bound: datetime = None,
    upper_bound: datetime = None,
    service: Resource = Depends(get_calendar_service)
):
    """Mark the first busy slot within the bounds as free.

    Args:
        lower_bound: Forwarded to ``get_calendar_events``.
        upper_bound: Forwarded to ``get_calendar_events``.
        service: Calendar API client (injected by FastAPI).

    Returns:
        The updated event.

    Raises:
        HTTPException: 404 when no event with summary ``TITLE_BUSY`` exists
            within the bounds.
    """
    raw_events = get_calendar_events(lower_bound=lower_bound, upper_bound=upper_bound, service=service)
    parsed = [CalendarEvent(**raw) for raw in raw_events]
    # Events arrive ordered by start time, so the first match is the earliest.
    event = next((slot for slot in parsed if slot.summary == TITLE_BUSY), None)
    if event is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Busy slot not found")

    event.summary = TITLE_FREE
    event.colorId = COLOR_FREE
    # Clear the booking text, as the other mark-free endpoints do; otherwise
    # the freed slot would keep the previous booking's description.
    event.description = ''
    # by_alias=True sends attendee status under its API key "responseStatus"
    # instead of the Python attribute name, so it survives the update.
    service.events().update(
        calendarId=CALENDAR_ID,
        eventId=event.id,
        body=event.dict(exclude_unset=True, by_alias=True),
    ).execute()
    return event
@app.post('/calendar_events/mark_free/batch', response_model=List[CalendarEvent], tags=['Calendar'])
def mark_free_calendar_slots(
    lower_bound: datetime = None,
    upper_bound: datetime = None,
    service: Resource = Depends(get_calendar_service)
):
    """Mark every busy slot within the bounds as free.

    Each matching event gets ``TITLE_FREE`` / ``COLOR_FREE`` and an empty
    description; the updates are submitted as batch requests.

    Args:
        lower_bound: Forwarded to ``get_calendar_events``.
        upper_bound: Forwarded to ``get_calendar_events``.
        service: Calendar API client (injected by FastAPI).

    Returns:
        The events that were submitted for update.

    Raises:
        HTTPException: 404 when no event with summary ``TITLE_BUSY`` exists
            within the bounds.
    """
    # The Calendar batch endpoint limits the number of calls per batch
    # request, so updates are sent in chunks of this size.
    max_batch_calls = 50

    raw_events = get_calendar_events(lower_bound=lower_bound, upper_bound=upper_bound, service=service)
    parsed = [CalendarEvent(**raw) for raw in raw_events]
    busy_slots = [slot for slot in parsed if slot.summary == TITLE_BUSY]
    if not busy_slots:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Busy slot not found")

    for event in busy_slots:
        event.summary = TITLE_FREE
        event.colorId = COLOR_FREE
        event.description = ''

    for offset in range(0, len(busy_slots), max_batch_calls):
        batch = service.new_batch_http_request()
        for event in busy_slots[offset:offset + max_batch_calls]:
            # by_alias=True keeps API key names such as "responseStatus".
            batch.add(service.events().update(
                calendarId=CALENDAR_ID,
                eventId=event.id,
                body=event.dict(exclude_unset=True, by_alias=True),
            ))
        batch.execute()
    return busy_slots
class FormatDateRequest(BaseModel):
    """Request body of ``/utils/date_format``: one date in textual form.

    Accepted forms are ``YYYY-MM-DD HH:MM:SS``, ``DD.MM.YYYY HH:MM:SS`` and
    anything ``datetime.fromisoformat`` understands.
    """

    date: datetime = Field(
        description='Дата формата ISO, "YYYY-MM-DD HH:MM:SS" или "DD.MM.YYYY HH:MM:SS"'
    )

    class Config:
        schema_extra = {
            'examples': [
                {
                    'date': "2023-07-10T15:33:08+07:00"
                },
                {
                    'date': "2023-07-10 15:33:08"
                },
                {
                    'date': "10.07.2023 15:33:08"
                }
            ]
        }

    @validator('date', pre=True)
    def validate_date(cls, val):
        """Parse the raw ``date`` value into a ``datetime``.

        Raises:
            ValueError: ``'Wrong date format'`` when the value is not a
                string or matches none of the accepted forms.
        """
        try:
            if re.match(r'^(\d{4}-\d{2}-\d{2}) (\d{2}:\d{2}:\d{2})$', val):
                return datetime.strptime(val, "%Y-%m-%d %H:%M:%S")
            # Dots are escaped so only a literal "." separates day/month/year.
            if re.match(r'^(\d{2}\.\d{2}\.\d{4}) (\d{2}:\d{2}:\d{2})$', val):
                return datetime.strptime(val, "%d.%m.%Y %H:%M:%S")
            return datetime.fromisoformat(val)
        except (TypeError, ValueError) as exc:
            # TypeError: non-string input; ValueError: unparsable string.
            raise ValueError('Wrong date format') from exc
class FormatDateResponse(BaseModel):
    """Response of ``/utils/date_format``: one date rendered in several ways.

    Every field carries an ``example`` value for the generated schema.
    """

    # The date as a datetime, plus "YYYY-MM-DD HH:MM:SS" and
    # "DD.MM.YYYY HH:MM:SS" renderings of it.
    iso_format: datetime = Field(schema_extra={'example': '2023-07-10T15:33:08+03:00'})
    ymd_format: str = Field(schema_extra={'example': '2023-07-10 15:33:08'})
    dmy_format: str = Field(schema_extra={'example': '10.07.2023 15:33:08'})
    # Numeric timestamp of the date.
    timestamp: int = Field(schema_extra={'example': 1688992388})
    # Same two renderings, derived from the timestamp.
    ymd_format_utc: str = Field(schema_extra={'example': '2023-07-10 12:33:08'})
    dmy_format_utc: str = Field(schema_extra={'example': '10.07.2023 12:33:08'})
    # Weekday name, full and abbreviated (ru_RU locale in format_date).
    weekday: str = Field(schema_extra={'example': 'понедельник'})
    weekday_short: str = Field(schema_extra={'example': 'пн'})
    # Individual components of the date as zero-padded strings.
    year: str = Field(schema_extra={'example': '2023'})
    month: str = Field(schema_extra={'example': '07'})
    day_of_month: str = Field(schema_extra={'example': '10'})
    hour: str = Field(schema_extra={'example': '15'})
    minute: str = Field(schema_extra={'example': '33'})
    second: str = Field(schema_extra={'example': '08'})
    # Time zone name of the date; format_date falls back to 'UTC' when empty.
    timezone: str = Field(schema_extra={'example': 'UTC+03:00'})
    # Whole days between the current date and the given date, and the word
    # for that distance; format_date sets a word only for -2..2.
    now_delta_days: int = Field(schema_extra={'example': 0})
    now_delta_word: str | None = Field(schema_extra={'example': 'сегодня'})
@app.post('/utils/date_format', response_model=FormatDateResponse, tags=['Utils'])
async def format_date(input_date: FormatDateRequest):
    """Render the submitted date in the formats of ``FormatDateResponse``.

    Args:
        input_date: Request body holding the already parsed ``date``.

    Returns:
        A ``FormatDateResponse``. The ``*_utc`` fields are rendered in UTC;
        the remaining fields use the date exactly as submitted. The day
        delta compares the submitted calendar date with the server's
        current local date.
    """
    # Local import: the module imports only `datetime` and `timedelta`, and
    # the name `timezone` is used for other things elsewhere in this file.
    from datetime import timezone as dt_timezone

    moment = input_date.date
    timestamp = moment.timestamp()
    tz_name = moment.strftime('%Z')
    # An explicit tz is required here: fromtimestamp() without one yields the
    # server's local time, not UTC.
    utc_moment = datetime.fromtimestamp(timestamp, tz=dt_timezone.utc)

    # Compare calendar dates only, ignoring the time of day.
    delta_days = (moment.date() - datetime.now().date()).days
    delta_words = {
        0: 'сегодня',
        1: 'завтра',
        2: 'послезавтра',
        -1: 'вчера',
        -2: 'позавчера',
    }

    return FormatDateResponse(
        iso_format=moment.isoformat(sep='T', timespec='seconds'),
        ymd_format=moment.strftime("%Y-%m-%d %H:%M:%S"),
        ymd_format_utc=utc_moment.strftime("%Y-%m-%d %H:%M:%S"),
        dmy_format=moment.strftime('%d.%m.%Y %H:%M:%S'),
        dmy_format_utc=utc_moment.strftime('%d.%m.%Y %H:%M:%S'),
        # The field is an int; drop the fractional seconds explicitly.
        timestamp=int(timestamp),
        weekday=format_datetime(moment, 'EEEE', locale='ru_RU'),
        weekday_short=format_datetime(moment, 'E', locale='ru_RU'),
        year=moment.strftime('%Y'),
        month=moment.strftime('%m'),
        day_of_month=moment.strftime('%d'),
        hour=moment.strftime('%H'),
        minute=moment.strftime('%M'),
        second=moment.strftime('%S'),
        # strftime('%Z') is empty for a date without time zone information.
        timezone=tz_name if tz_name else 'UTC',
        now_delta_days=delta_days,
        now_delta_word=delta_words.get(delta_days)
    )