73 lines
2.4 KiB
Python
73 lines
2.4 KiB
Python
import hashlib
|
||
from core.db import User, SessionLocal
|
||
from sqlalchemy.orm import Session
|
||
from jwt import encode, decode, ExpiredSignatureError, InvalidTokenError
|
||
from fastapi import HTTPException
|
||
from datetime import datetime, timedelta
|
||
|
||
SECRET_KEY = "SosttetHuhhhy"
|
||
ALGORITM = "HS256"
|
||
|
||
"""
|
||
Функция для генерации JWT токена.
|
||
При вводе аргументов user_id и access_level, функция возвращает JWT токен.
|
||
"""
|
||
def gen_jwt(user_id: int, access_level: int) -> str:
|
||
payload = {
|
||
"user_id": user_id,
|
||
"access_level": access_level,
|
||
"exp": datetime.utcnow() + timedelta(days=1)
|
||
}
|
||
token = encode(payload, SECRET_KEY, algorithm=ALGORITM)
|
||
return token
|
||
|
||
"""
|
||
Функция для расшифровки JWT токена.
|
||
Принимает токен и возвращает словарь с user_id и access_level.
|
||
"""
|
||
|
||
def decode_jwt(token: str) -> dict:
|
||
try:
|
||
payload = decode(token, SECRET_KEY, algorithms=[ALGORITM])
|
||
return {
|
||
"user_id": payload.get("user_id"),
|
||
"access_level": payload.get("access_level")
|
||
}
|
||
except ExpiredSignatureError:
|
||
raise HTTPException(status_code=401, detail="Срок действия токена истёк.")
|
||
except InvalidTokenError:
|
||
raise HTTPException(status_code=401, detail="Неверный токен.")
|
||
|
||
"""
|
||
Функция для хеширования пароля с использованием SHA-256.
|
||
При вводе аргумента password, функция возвращает его хеш.
|
||
"""
|
||
|
||
def hash_password(password: str) -> str:
|
||
sha256_hash = hashlib.sha256()
|
||
sha256_hash.update(password.encode('utf-8'))
|
||
return sha256_hash.hexdigest()
|
||
|
||
"""
|
||
Функция для проверки пароля.
|
||
При вводе аргументов stored_password и provided_password, функция
|
||
возвращает True, если хеши совпадают, и False в противном случае.
|
||
"""
|
||
|
||
def verify_password(stored_password: str, provided_password: str) -> bool:
|
||
return stored_password == hash_password(provided_password)
|
||
|
||
|
||
"""
|
||
Функция для определения уровня доступа пользователя.
|
||
"""
|
||
|
||
def is_admin(user_id: int, db: Session) -> bool:
|
||
user = db.query(User).filter(User.id == user_id).first()
|
||
if user.access_level ==1:
|
||
return False
|
||
elif user.access_level == 2:
|
||
return True
|
||
|
||
|