This repository has been archived on 2020-08-14. You can view files and clone it, but cannot push or open issues or pull requests.
statuspage/app/crud.py

137 lines
4.5 KiB
Python

import bcrypt, time
from sqlalchemy.orm import Session, load_only
from app import models, schemas
# Short module-level aliases for the ORM model classes used by the CRUD
# helpers in this file.
User = models.User
Service = models.Service
Metric = models.Metric
def get_user(db: Session, user_id: int):
    """Look up a single user by id.

    Args:
        db: SQLAlchemy session used to run the query.
        user_id: Value matched against ``User.id``.

    Returns:
        The first matching ``User`` row, or ``None`` when nothing matches.
    """
    by_id = db.query(User).filter(User.id == user_id)
    return by_id.first()
def del_user(db: Session, user_id: int):
    """Delete every user row whose id equals ``user_id`` and commit.

    Args:
        db: SQLAlchemy session used to run the delete and the commit.
        user_id: Value matched against ``User.id``.

    Returns:
        Whatever ``db.commit()`` returns.
    """
    doomed = db.query(User).filter(User.id == user_id)
    doomed.delete()
    outcome = db.commit()
    return outcome
def get_user_by_username(db: Session, name: str):
    """Look up a single user by name.

    Args:
        db: SQLAlchemy session used to run the query.
        name: Value matched against ``User.name``.

    Returns:
        The first matching ``User`` row, or ``None`` when nothing matches.
    """
    by_name = db.query(User).filter(User.name == name)
    return by_name.first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
    """Return one page of users.

    Args:
        db: SQLAlchemy session used to run the query.
        skip: Number of rows to skip (passed to ``offset``).
        limit: Maximum number of rows to return.

    Returns:
        A list of ``User`` rows. No explicit ordering is applied.
    """
    page = db.query(User).offset(skip).limit(limit)
    return page.all()
def create_user(db: Session, user: schemas.UserCreate):
    """Insert a new user with a bcrypt-hashed password.

    Args:
        db: SQLAlchemy session used for the insert.
        user: Incoming user data; ``user.password`` is the plaintext password.
            The object is not modified.

    Returns:
        The newly created ``User`` row, refreshed from the database after the
        commit.
    """
    # Hash into a separate dict instead of assigning back onto ``user`` so the
    # caller's schema object is left untouched.
    fields = user.dict()
    fields["password"] = bcrypt.hashpw(user.password.encode(), bcrypt.gensalt())

    db_user = User(**fields)
    db_user.created_unix = time.time()
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user
def update_user(db: Session, user_id: int, user: schemas.UserUpdate):
    """Apply a partial update to the user with id ``user_id`` and commit.

    Only the fields ``password``, ``name``, ``full_name``, ``is_active`` and
    ``is_superuser`` are considered, and only when they were explicitly set on
    ``user`` with a value other than ``None``. A supplied password is
    bcrypt-hashed before it is stored. ``updated_unix`` is refreshed only when
    at least one of those columns is actually written.

    Args:
        db: SQLAlchemy session used for the update and the commit.
        user_id: Value matched against ``User.id``.
        user: Partial user data.

    Returns:
        Whatever ``db.commit()`` returns.
    """
    updatable = ("password", "name", "full_name", "is_active", "is_superuser")
    supplied = user.dict(exclude_unset=True)
    changes = {
        field: supplied[field]
        for field in updatable
        if supplied.get(field) is not None
    }

    if "password" in changes:
        changes["password"] = bcrypt.hashpw(
            changes["password"].encode(), bcrypt.gensalt()
        )

    if changes:
        # Stamp the modification time only when something really changed, and
        # write every column in a single UPDATE statement.
        changes["updated_unix"] = time.time()
        db.query(User).filter(User.id == user_id).update(
            {getattr(User, column): value for column, value in changes.items()}
        )
    return db.commit()
def get_service(db: Session, service_id: int):
    """Look up a single service by id.

    Args:
        db: SQLAlchemy session used to run the query.
        service_id: Value matched against ``Service.id``.

    Returns:
        The first matching ``Service`` row, or ``None`` when nothing matches.
    """
    by_id = db.query(Service).filter(Service.id == service_id)
    return by_id.first()
def get_services(db: Session, skip: int = 0, limit: int = 100):
    """Return one page of services.

    Args:
        db: SQLAlchemy session used to run the query.
        skip: Number of rows to skip (passed to ``offset``).
        limit: Maximum number of rows to return.

    Returns:
        A list of ``Service`` rows. No explicit ordering is applied.
    """
    page = db.query(Service).offset(skip).limit(limit)
    return page.all()
def create_service(db: Session, service: schemas.ServiceCreate):
    """Insert a new service and stamp its creation time.

    Args:
        db: SQLAlchemy session used for the insert.
        service: Incoming service data; its fields are passed to the
            ``Service`` constructor as keyword arguments.

    Returns:
        The newly created ``Service`` row, refreshed from the database after
        the commit.
    """
    payload = service.dict()
    record = Service(**payload)
    record.created_unix = time.time()

    db.add(record)
    db.commit()
    db.refresh(record)
    return record
def del_service(db: Session, service_id: int):
    """Delete every service row whose id equals ``service_id`` and commit.

    Args:
        db: SQLAlchemy session used to run the delete and the commit.
        service_id: Value matched against ``Service.id``.

    Returns:
        Whatever ``db.commit()`` returns.
    """
    doomed = db.query(Service).filter(Service.id == service_id)
    doomed.delete()
    outcome = db.commit()
    return outcome
def update_service(db: Session, service_id: int, s: schemas.ServiceUpdate):
    """Apply a partial update to the service with id ``service_id`` and commit.

    Only the fields ``name``, ``owner_id``, ``is_private``, ``description``,
    ``service_type``, ``url`` and ``is_active`` are considered, and only when
    they were explicitly set on ``s`` with a value other than ``None``.
    ``updated_unix`` is refreshed only when at least one of those columns is
    actually written.

    Args:
        db: SQLAlchemy session used for the update and the commit.
        service_id: Value matched against ``Service.id``.
        s: Partial service data.

    Returns:
        Whatever ``db.commit()`` returns.
    """
    updatable = (
        "name",
        "owner_id",
        "is_private",
        "description",
        "service_type",
        "url",
        "is_active",
    )
    supplied = s.dict(exclude_unset=True)
    changes = {
        field: supplied[field]
        for field in updatable
        if supplied.get(field) is not None
    }

    if changes:
        # Stamp the modification time only when something really changed, and
        # write every column in a single UPDATE statement.
        changes["updated_unix"] = time.time()
        db.query(Service).filter(Service.id == service_id).update(
            {getattr(Service, column): value for column, value in changes.items()}
        )
    return db.commit()
def login_info(db: Session, name: str, password: str):
    """Check a username/password pair against the stored bcrypt hash.

    Args:
        db: SQLAlchemy session used to look the user up.
        name: Value matched against ``User.name``.
        password: Plaintext password to verify.

    Returns:
        The matching ``User`` row when the password verifies. ``None`` when
        the user does not exist, the password does not match, or any error
        occurs during lookup or verification.
    """
    try:
        usr = db.query(User).filter(User.name == name).first()
        if usr is None:
            # Unknown user: reject explicitly instead of relying on an
            # AttributeError being swallowed by the handler below.
            print("[x] no match")
            return None
        # checkpw performs the hash-and-compare itself, avoiding a plain
        # ``==`` comparison of hash bytes.
        if not bcrypt.checkpw(password.encode(), usr.password):
            print("[x] no match")
            return None
        print("[*] match")
        return usr
    except Exception as e:
        # Deliberately broad: this function has always reported failures as
        # ``None`` rather than raising, and callers may depend on that.
        print('exception', e)
        return None
def get_urls(db: Session):
    """Return every service whose ``is_active`` flag is true.

    Despite the name, complete ``Service`` rows are returned, not only their
    ``url`` column.

    Args:
        db: SQLAlchemy session used to run the query.

    Returns:
        A list of ``Service`` rows.
    """
    # ``== True`` is intentional: it builds the SQL comparison expression.
    active_only = Service.is_active == True  # noqa: E712
    return db.query(Service).filter(active_only).all()
def get_metrics(db: Session, skip: int = 0, limit: int = 90):
    """Return one page of metrics.

    Args:
        db: SQLAlchemy session used to run the query.
        skip: Number of rows to skip (passed to ``offset``).
        limit: Maximum number of rows to return.

    Returns:
        A list of ``Metric`` rows. No explicit ordering is applied.
    """
    # The model alias in this module is ``Metric``; the previous ``Metrics``
    # was an undefined name.
    return db.query(Metric).offset(skip).limit(limit).all()
def create_metric(db: Session, me: schemas.MetricCreate):
    """Insert a new metric, deriving ``status`` from the HTTP response code.

    ``status`` is set to 2 for codes >= 500, 1 for codes >= 400 and 0 for
    codes >= 200. Codes below 200 leave ``status`` at whatever the
    constructor produced.
    NOTE(review): presumably 0 = ok, 1 = client error, 2 = server error;
    confirm against the consumers of ``Metric.status``.

    Args:
        db: SQLAlchemy session used for the insert.
        me: Incoming metric data; must expose ``http_response_code``.

    Returns:
        The newly created ``Metric`` row, refreshed from the database after
        the commit (matching ``create_user`` and ``create_service``).
    """
    # Build a Metric row; the previous code instantiated ``User`` by mistake.
    db_metric = Metric(**me.dict())

    code = me.http_response_code
    if code >= 500:
        db_metric.status = 2
    elif code >= 400:
        db_metric.status = 1
    elif code >= 200:
        db_metric.status = 0

    db.add(db_metric)
    db.commit()
    db.refresh(db_metric)
    return db_metric