# vim:fileencoding=utf-8:ft=python:foldmethod=marker # statuspage/app/main.py import asyncio from typing import List, Optional from fastapi import FastAPI, Depends, Security, HTTPException from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from sqlalchemy.orm import Session from app import crud, models, schemas, auth from app import ultrametrics from app.database import SessionLocal, engine models.Base.metadata.create_all(bind=engine) oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/api/v1/auth/get_token') app = FastAPI(title="Statuspage API",description="This documentation describes the Statuspage API.",version="0.0.1") def get_db(): db = SessionLocal() try: yield db finally: db.close() def is_auth(token: str = Security(oauth2_scheme)): if not token: raise HTTPException(status_code=401, detail={"status": "error", "message": "Token not provided "}) return auth.validate(token) def get_usr(user_id: int, db: Session): db=db user_id=user_id db_user = crud.get_user(db=db, user_id=user_id) return db_user def get_srv(service_id: int, db: Session): db=db service_id=service_id db_service = crud.get_service(db=db, service_id=service_id) return db_service @app.get("/api/v1/ping") async def pong(): """Basic sanity check""" return {"ping": "pong!"} @app.get("/api/v1/service") async def read_services(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): """List all services""" return crud.get_services(db, skip=skip, limit=limit) @app.post("/api/v1/service", response_model=schemas.Service) async def create_service(service: schemas.ServiceCreate, db: Session = Depends(get_db), token: str = Security(is_auth)): """Service types: icmp = 0, http = 1""" if service.service_type not in [0, 1]: raise HTTPException(status_code=422, detail="Invalid service type provided") owner = get_usr(service.owner_id, db) if owner is None: raise HTTPException(status_code=422, detail="No such owner_id") return crud.create_service(db=db, service=service) @app.get("/api/v1/service/{service_id}", response_model=schemas.Service) async def read_service(service_id: int, db: Session = Depends(get_db)): db_service = get_srv(service_id, db) if db_service is None: raise HTTPException(status_code=404, detail="Service not found") return db_service @app.patch("/api/v1/service/{service_id}") async def update_service(service_id: int, service: schemas.ServiceUpdate, db: Session = Depends(get_db), token: str = Security(is_auth)): if ((service.name is None) and (service.owner_id is None) and (service.is_private is None) and (service.description is None) and (service.service_type is None) and (service.url is None) and (service.is_active is None)): raise HTTPException(status_code=400, detail="No data provided") db_service = get_srv(service_id, db) if db_service is None: raise HTTPException(status_code=422, detail="Unprocessable entity") crud.update_service(db=db, service_id=service_id, s=service) return {"update":"success"} @app.delete("/api/v1/service/{service_id}") async def delete_service(service_id: int, db: Session = Depends(get_db), token: str = Security(is_auth)): db_service = get_srv(service_id, db) if db_service is None: raise HTTPException(status_code=422, detail="Unprocessable entity") crud.del_service(db=db, service_id=service_id) return {"delete":"success"} @app.post("/api/v1/users", response_model=schemas.User) async def create_user(user: schemas.UserCreate, db: Session = Depends(get_db), token: str = Security(is_auth)): db_user = crud.get_user_by_username(db, name=user.name) if db_user: raise HTTPException(status_code=400, detail="Username already registered") return crud.create_user(db=db, user=user) @app.get("/api/v1/users", response_model=List[schemas.User]) async def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): """List all users""" return crud.get_users(db=db, skip=skip, limit=limit) @app.get("/api/v1/users/{user_id}", response_model=schemas.User) async def read_user(user_id: int, db: Session = Depends(get_db)): db_user = get_usr(user_id, db) if db_user is None: raise HTTPException(status_code=404, detail="User not found") return db_user @app.patch("/api/v1/users/{user_id}") async def update_user(user_id: int, user: schemas.UserUpdate, db: Session = Depends(get_db), token: str = Security(is_auth)): if ((user.name is None) and (user.password is None) and (user.full_name is None) and (user.is_active is None) and (user.is_superuser is None)): raise HTTPException(status_code=400, detail="No data provided") db_user = get_usr(user_id, db) if db_user is None: raise HTTPException(status_code=422, detail="Unprocessable entity") crud.update_user(db=db, user_id=user_id, user=user) return {"update":"success"} @app.delete("/api/v1/users/{user_id}") async def delete_user(user_id: int, db: Session = Depends(get_db), token: str = Security(is_auth)): db_user = get_usr(user_id, db) if db_user is None: raise HTTPException(status_code=422, detail="Unprocessable entity") crud.del_user(db=db, user_id=user_id) return {"delete":"success"} @app.post('/api/v1/auth/get_token',response_model=schemas.Token,response_description="Returns user access token",summary="Authenticate API user",description="Authenticate an API user and return a token for subsequent requests") async def get_token(form_data: OAuth2PasswordRequestForm = Depends()): a = auth.login(form_data.username, form_data.password) if a and a["status"] == "error": raise HTTPException(status_code=400, detail={"status": "error", "mesage": "username/password incorrect"}) return {"access_token": a["token"], "token_type": "bearer"} @app.post('/api/v1/auth/validate') async def validate_token(token: str): a = auth.validate(token=token) if a: return a @app.get("/api/v1/metrics", response_model=List[schemas.Metric]) async def get_metrics(skip: int = 0, limit: int = 90, db: Session = Depends(get_db)): return crud.get_metrics(db=db, skip=skip, limit=limit) @app.get("/api/v1/urls") async def get_urls(skip: int = 0, limit: int = 90, db: Session = Depends(get_db)): return crud.get_urls(db=db, skip=skip, limit=limit)