This repository has been archived on 2020-08-14. You can view files and clone it, but cannot push or open issues or pull requests.
statuspage/app/crud.py
surtur 927bdd03ce
feat: added JWT-based authentication; introduced config file
* authorization [WIP]
* refactored several sections
* used dict(exclude_unset=True)
* global values are now sourced from a config file (statuspagerc)
2020-08-11 11:48:46 +02:00

112 lines
3.7 KiB
Python

import bcrypt, time
from sqlalchemy.orm import Session
from app import models, schemas
# Short aliases for the ORM model classes that the query helpers in this
# module filter on and instantiate.
User=models.User
Service=models.Service
def get_user(db: Session, user_id: int):
    """Look up a single user by id.

    Args:
        db: Session used to run the query.
        user_id: Value compared against ``User.id``.

    Returns:
        The first row matching the filter, as produced by ``Query.first()``.
    """
    matching = db.query(User).filter(User.id == user_id)
    return matching.first()
def del_user(db: Session, user_id: int):
    """Delete the user rows whose id equals ``user_id`` and commit.

    Args:
        db: Session used to run the delete and the commit.
        user_id: Value compared against ``User.id``.

    Returns:
        Whatever ``db.commit()`` returns.
    """
    doomed = db.query(User).filter(User.id == user_id)
    doomed.delete()
    outcome = db.commit()
    return outcome
def get_user_by_username(db: Session, name: str):
    """Look up a single user by name.

    Args:
        db: Session used to run the query.
        name: Value compared against ``User.name``.

    Returns:
        The first row matching the filter, as produced by ``Query.first()``.
    """
    by_name = db.query(User).filter(User.name == name)
    return by_name.first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
    """Return one page of users.

    Args:
        db: Session used to run the query.
        skip: Passed to ``Query.offset``.
        limit: Passed to ``Query.limit``.

    Returns:
        The list produced by ``Query.all()`` for the requested window.
    """
    window = db.query(User).offset(skip).limit(limit)
    return window.all()
def create_user(db: Session, user: schemas.UserCreate):
    """Insert a new user with a bcrypt-hashed password.

    The incoming schema object is left untouched: the hash is written into a
    local copy of its fields, so the caller's ``user.password`` still holds
    the value it was given rather than being replaced by the hash.

    Args:
        db: Session used to add, commit and refresh the new row.
        user: Schema object providing the new user's fields; ``password`` is
            read as text and encoded before hashing.

    Returns:
        The ``User`` instance after it has been committed and refreshed.
    """
    fields = user.dict()
    # Hash into the local dict instead of assigning back onto ``user``.
    fields["password"] = bcrypt.hashpw(user.password.encode(), bcrypt.gensalt())

    db_user = User(**fields)
    db_user.created_unix = time.time()

    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user
def update_user(db: Session, user_id: int, user: schemas.UserUpdate):
    """Apply the fields set on ``user`` to the user row with id ``user_id``.

    Only ``password``, ``name``, ``full_name`` and ``is_active`` are
    considered. A field is written when it was explicitly set on the schema
    object and its value is not ``None``. A new password is bcrypt-hashed
    before being stored. All changes go out in a single UPDATE, and
    ``updated_unix`` is stamped only when at least one field changes.

    Args:
        db: Session used to run the update and the commit.
        user_id: Value compared against ``User.id``.
        user: Schema object carrying the new values.

    Returns:
        Whatever ``db.commit()`` returns.
    """
    supplied = user.dict(exclude_unset=True)

    changes = {}
    for field in ("password", "name", "full_name", "is_active"):
        value = supplied.get(field)
        if value is None:
            # Unset or explicitly None: leave the stored value alone.
            continue
        if field == "password":
            value = bcrypt.hashpw(value.encode(), bcrypt.gensalt())
        changes[getattr(User, field)] = value

    if changes:
        # Stamp the modification time only when something is being written.
        changes[User.updated_unix] = time.time()
        db.query(User).filter(User.id == user_id).update(changes)

    return db.commit()
def get_service(db: Session, service_id: int):
    """Look up a single service by id.

    Args:
        db: Session used to run the query.
        service_id: Value compared against ``Service.id``.

    Returns:
        The first row matching the filter, as produced by ``Query.first()``.
    """
    matching = db.query(Service).filter(Service.id == service_id)
    return matching.first()
def get_services(db: Session, skip: int = 0, limit: int = 100):
    """Return one page of services.

    Args:
        db: Session used to run the query.
        skip: Passed to ``Query.offset``.
        limit: Passed to ``Query.limit``.

    Returns:
        The list produced by ``Query.all()`` for the requested window.
    """
    window = db.query(Service).offset(skip).limit(limit)
    return window.all()
def create_service(db: Session, service: schemas.ServiceCreate):
    """Insert a new service built from the fields of ``service``.

    Args:
        db: Session used to add, commit and refresh the new row.
        service: Schema object whose ``dict()`` supplies the constructor
            keyword arguments for ``Service``.

    Returns:
        The ``Service`` instance after it has been committed and refreshed.
    """
    payload = service.dict()
    new_service = Service(**payload)
    # The creation time is stamped here, after construction.
    new_service.created_unix = time.time()

    db.add(new_service)
    db.commit()
    db.refresh(new_service)
    return new_service
def del_service(db: Session, service_id: int):
    """Delete the service rows whose id equals ``service_id`` and commit.

    Args:
        db: Session used to run the delete and the commit.
        service_id: Value compared against ``Service.id``.

    Returns:
        Whatever ``db.commit()`` returns.
    """
    doomed = db.query(Service).filter(Service.id == service_id)
    doomed.delete()
    outcome = db.commit()
    return outcome
def update_service(db: Session, service_id: int, s: schemas.ServiceUpdate):
    """Apply the fields set on ``s`` to the service row with id ``service_id``.

    Only ``name``, ``owner_id``, ``is_private``, ``description``,
    ``service_type``, ``url`` and ``is_active`` are considered. A field is
    written when it was explicitly set on the schema object and its value is
    not ``None``. All changes go out in a single UPDATE, and ``updated_unix``
    is stamped only when at least one field changes.

    Args:
        db: Session used to run the update and the commit.
        service_id: Value compared against ``Service.id``.
        s: Schema object carrying the new values.

    Returns:
        Whatever ``db.commit()`` returns.
    """
    supplied = s.dict(exclude_unset=True)
    updatable = (
        "name",
        "owner_id",
        "is_private",
        "description",
        "service_type",
        "url",
        "is_active",
    )

    # Unset or explicitly-None fields are skipped, leaving stored values alone.
    changes = {
        getattr(Service, field): supplied[field]
        for field in updatable
        if supplied.get(field) is not None
    }

    if changes:
        # Stamp the modification time only when something is being written.
        changes[Service.updated_unix] = time.time()
        db.query(Service).filter(Service.id == service_id).update(changes)

    return db.commit()
def login_info(db: Session, name: str, password: str):
    """Check a name/password pair against the stored bcrypt hash.

    Args:
        db: Session used to look the user up.
        name: Value compared against ``User.name``.
        password: Plain-text password to verify.

    Returns:
        The matching ``User`` when the password verifies. ``None`` when no
        user has that name, when the password does not match, or when the
        lookup or verification raises.
    """
    try:
        usr = db.query(User).filter(User.name == name).first()
        if usr is None:
            # Unknown user: report a plain mismatch instead of relying on an
            # AttributeError from ``None.password`` being swallowed below.
            print("[x] no match")
            return None

        stored_hash = usr.password
        if isinstance(stored_hash, str):
            # bcrypt only accepts bytes; tolerate a column that hands the
            # hash back as text.
            stored_hash = stored_hash.encode()

        # checkpw re-hashes with the stored salt and compares the result in
        # constant time, unlike comparing hashpw() output with ``==``.
        if not bcrypt.checkpw(password.encode(), stored_hash):
            print("[x] no match")
            return None
        print("[*] match")
    except Exception as e:
        # Best-effort boundary: any failure is treated as a failed login.
        print('exception', e)
        return None
    return usr