43 lines
1.2 KiB
Python
43 lines
1.2 KiB
Python
|
import bcrypt
|
||
|
import jwt
|
||
|
from fastapi import HTTPException
|
||
|
from datetime import datetime, timedelta
|
||
|
from app import crud
|
||
|
from app.database import SessionLocal
|
||
|
from app.settings import globals as settings
|
||
|
|
||
|
|
||
|
|
||
|
# Secret used as the key for both jwt.encode (login) and jwt.decode (validate).
KEY = settings.JWT_SECRET
|
||
|
# Signing algorithm name handed to jwt.encode when a token is issued.
ALGORITHM = settings.JWT_ALGORITHM
|
||
|
# Token lifetime in seconds; added to the issue time via timedelta(seconds=...).
EXPIRY = settings.JWT_EXPIRY
|
||
|
|
||
|
|
||
|
def login(username: str, password: str):
    """Authenticate a user and issue a signed JWT access token.

    The credentials are checked through ``crud.login_info``. When a user is
    found, a token is encoded with the claims ``sub`` (the username), ``iat``
    (issue time), ``exp`` (``EXPIRY`` seconds after issue) and ``des``
    (``'access_token'``), signed with ``KEY`` using ``ALGORITHM``.

    Args:
        username: Name forwarded to ``crud.login_info`` and stored as ``sub``.
        password: Password forwarded unchanged to ``crud.login_info``.

    Returns:
        ``{"status": "success", "token": <str>}`` when the lookup returns a
        user; otherwise ``{"status": "error", "message": ...}`` (the legacy
        misspelled ``"mesage"`` key is still included for existing clients).
    """
    db = SessionLocal()
    try:
        usr = crud.login_info(db=db, name=username, password=password)
    finally:
        # Always hand the session back, even if the lookup raises; the
        # original never closed it. Assumes SessionLocal produces an object
        # with close() (SQLAlchemy session factory) -- confirm in app.database.
        db.close()

    if usr is None:
        error_text = "username/password incorrect"
        return {
            "status": "error",
            "message": error_text,
            # Original key was misspelled; kept so current callers do not break.
            "mesage": error_text,
        }

    # Take one timestamp so 'iat' and 'exp' are derived from the same instant.
    issued_at = datetime.utcnow()
    token = jwt.encode(
        {
            'sub': username,
            'iat': issued_at,
            'exp': issued_at + timedelta(seconds=EXPIRY),
            'des': 'access_token',
        },
        KEY,
        ALGORITHM,
    )
    # PyJWT < 2.0 returns bytes, >= 2.0 returns str; normalise to str so the
    # unconditional .decode() no longer fails on newer releases.
    if isinstance(token, bytes):
        token = token.decode('utf-8')
    return {"status": "success", "token": token}
|
||
|
|
||
|
def validate(token):
    """Decode and verify a JWT, translating failures into HTTP errors.

    Args:
        token: Encoded JWT as produced by ``login``.

    Returns:
        The decoded claims returned by ``jwt.decode``.

    Raises:
        HTTPException: 401 with "Token expired" when the token's ``exp`` has
            passed; 401 with "Invalid token" when the token does not have
            enough segments; 400 with the underlying error text for any
            other decoding failure.
    """
    try:
        # Pin the accepted algorithm: PyJWT >= 2.0 requires the argument, and
        # on older versions omitting it lets the token header pick the
        # algorithm.
        return jwt.decode(token, KEY, algorithms=[ALGORITHM])
    except jwt.ExpiredSignatureError as exc:
        # Match on the exception type rather than on its message text.
        raise HTTPException(
            status_code=401,
            detail={"status": "error", "message": "Token expired"},
        ) from exc
    except Exception as exc:
        reason = str(exc)
        if "Not enough segments" in reason:
            raise HTTPException(
                status_code=401,
                detail={"status": "error", "message": "Invalid token"},
            ) from exc
        # Remaining failures keep the original 400 response and message shape.
        raise HTTPException(
            status_code=400,
            detail={"status": "error", "message": "Exception: " + reason},
        ) from exc
|
||
|
|