2025-06-10 16:38:27 +03:00

464 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import FastAPI, HTTPException, Depends, APIRouter, Response, Header, File, UploadFile, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, EmailStr
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from urllib.parse import quote, unquote
from core.db import get_db, User
from core.crypt import hash_password, verify_password, gen_jwt, decode_jwt
from core.file import get_upload_dir, save_file, generate_file_url
import os
from pathlib import Path
from datetime import datetime
router = APIRouter()
security = HTTPBearer()
class RegisterRequest(BaseModel):
login: str
password: str
email: EmailStr
last_name: str
first_name: str
middle_name: str | None = None
@router.post("/register")
def register_user(user_data: RegisterRequest, db: Session = Depends(get_db)):
existing_user = db.query(User).filter(User.login == user_data.login).first()
if existing_user:
raise HTTPException(status_code=400, detail="Пользователь с таким логином уже существует.")
existing_email = db.query(User).filter(User.email == user_data.email).first()
if existing_email:
raise HTTPException(status_code=400, detail="Пользователь с такой почтой уже существует.")
hashed_password = hash_password(user_data.password)
new_user = User(
login=user_data.login,
password=hashed_password,
email=user_data.email,
last_name=user_data.last_name,
first_name=user_data.first_name,
middle_name=user_data.middle_name,
access_level=1,
)
try:
db.add(new_user)
db.commit()
db.refresh(new_user)
except IntegrityError:
db.rollback()
raise HTTPException(status_code=500, detail="Ошибка при добавлении пользователя в базу данных.")
return {"message": "Пользователь успешно зарегистрирован."}
class LoginRequest(BaseModel):
login: str
password: str
@router.post("/login")
def login_user(user_data: LoginRequest, db: Session = Depends(get_db), response: Response = None):
user = db.query(User).filter(User.login == user_data.login).first()
if not user:
raise HTTPException(status_code=404, detail="Пользователя с таким логином не существует.")
if not verify_password(user.password, user_data.password):
raise HTTPException(status_code=401, detail="Неверный пароль.")
JWT_token = gen_jwt(user.id, user.access_level)
try:
if response:
cookie_params = {
"httponly": False,
"samesite": "lax",
"max_age": 3600*24*7,
"path": "/",
}
response.set_cookie(key="id", value=str(user.id), **cookie_params)
response.set_cookie(key="login", value=quote(user.login), **cookie_params)
response.set_cookie(key="email", value=quote(user.email), **cookie_params)
full_name = f"{user.last_name} {user.first_name} {user.middle_name or ''}".strip()
response.set_cookie(key="full_name", value=quote(full_name), **cookie_params)
avatar_value = quote(user.avatar) if user.avatar else quote("/default-avatar.png")
response.set_cookie(key="avatar", value=avatar_value, **cookie_params)
response.set_cookie(key="JWT_token", value=JWT_token, **cookie_params)
print(f"JWT токен установлен для пользователя {user.login}: {JWT_token[:10]}...")
except Exception as e:
print(f"Ошибка при установке cookie: {str(e)}")
return {
"message": "Успешный вход.",
"token": JWT_token,
"user_id": user.id,
"login": user.login
}
@router.post("/logout")
def logout_user(response: Response):
response.delete_cookie(key="login")
response.delete_cookie(key="email")
response.delete_cookie(key="full_name")
response.delete_cookie(key="id")
response.delete_cookie(key="avatar")
response.delete_cookie(key="JWT_token")
return {"message": "Вы успешно вышли из системы."}
@router.get("/user/{user_id}")
def get_user_data(user_id: int, db: Session = Depends(get_db)):
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="Пользователь с таким ID не найден.")
return {
"login": user.login,
"avatar": user.avatar,
"first_name": user.first_name,
"last_name": user.last_name,
"middle_name": user.middle_name,
"email": user.email
}
class ChangePasswordRequest(BaseModel):
old_password: str
new_password: str
@router.post("/change_password")
def change_password(
password_data: ChangePasswordRequest,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
):
token = credentials.credentials
decoded_data = decode_jwt(token)
user_id = decoded_data.get("user_id")
if not user_id:
raise HTTPException(status_code=401, detail="Недействительный токен.")
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="Пользователь не найден.")
if not verify_password(user.password, password_data.old_password):
raise HTTPException(status_code=401, detail="Неверный старый пароль.")
user.password = hash_password(password_data.new_password)
try:
db.commit()
except:
db.rollback()
raise HTTPException(status_code=500, detail="Ошибка при обновлении пароля.")
return {"message": "Пароль успешно изменён."}
class ChangeUsernameRequest(BaseModel):
new_login: str
@router.post("/change_username")
def change_username(
login_data: ChangeUsernameRequest,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db),
response: Response = None
):
token = credentials.credentials
decoded_data = decode_jwt(token)
user_id = decoded_data.get("user_id")
if not user_id:
raise HTTPException(status_code=401, detail="Недействительный токен.")
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="Пользователь не найден.")
existing_user = db.query(User).filter(User.login == login_data.new_login).first()
if existing_user:
raise HTTPException(status_code=400, detail="Логин уже используется другим пользователем.")
try:
user.login = login_data.new_login
db.commit()
db.refresh(user)
# Безопасно устанавливаем cookie
try:
if response:
response.set_cookie(
key="login",
value=user.login,
httponly=False
)
except:
# Игнорируем ошибку установки cookie
pass
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail="Ошибка при обновлении логина.")
return {"message": "Логин успешно изменен."}
class ChangeFirstNameRequest(BaseModel):
new_first_name: str
@router.post("/change_name")
def change_first_name(
name_data: ChangeFirstNameRequest,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db),
response: Response = None
):
token = credentials.credentials
decoded_data = decode_jwt(token)
user_id = decoded_data.get("user_id")
if not user_id:
raise HTTPException(status_code=401, detail="Недействительный токен.")
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="Пользователь с таким ID не найден.")
try:
user.first_name = name_data.new_first_name
db.commit()
db.refresh(user)
# Безопасная установка cookie с проверкой response и URL-кодированием
try:
if response:
full_name = f"{user.last_name} {user.first_name} {user.middle_name or ''}".strip()
response.set_cookie(
key="full_name",
value=quote(full_name), # URL-кодирование для поддержки кириллицы
httponly=False,
samesite="lax",
max_age=3600*24*30, # 30 дней
path="/"
)
except Exception as e:
print(f"Ошибка при установке cookie: {str(e)}")
# Продолжаем выполнение даже при ошибке cookie
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"Ошибка при обновлении имени: {str(e)}")
# Возвращаем обновленные данные для использования на клиенте
return {
"message": "Имя успешно изменено.",
"full_name": f"{user.last_name} {user.first_name} {user.middle_name or ''}".strip(),
"updated_at": str(datetime.now())
}
class ChangeLastNameRequest(BaseModel):
new_last_name: str
@router.post("/change_last_name")
def change_last_name(
last_name_data: ChangeLastNameRequest,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db),
response: Response = None
):
token = credentials.credentials
decoded_data = decode_jwt(token)
user_id = decoded_data.get("user_id")
if not user_id:
raise HTTPException(status_code=401, detail="Недействительный токен.")
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="Пользователь с таким ID не найден.")
try:
user.last_name = last_name_data.new_last_name
db.commit()
db.refresh(user)
# Безопасно устанавливаем cookie
try:
if response:
response.set_cookie(
key="full_name",
value=f"{user.last_name} {user.first_name} {user.middle_name or ''}",
httponly=False
)
except:
# Игнорируем ошибку установки cookie
pass
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail="Ошибка при обновлении фамилии.")
return {"message": "Фамилия успешно изменена."}
class ChangeMiddleNameRequest(BaseModel):
new_middle_name: str | None = None
@router.post("/change_middle_name")
def change_middle_name(
middle_name_data: ChangeMiddleNameRequest,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db),
response: Response = None
):
token = credentials.credentials
decoded_data = decode_jwt(token)
user_id = decoded_data.get("user_id")
if not user_id:
raise HTTPException(status_code=401, detail="Недействительный токен.")
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="Пользователь с таким ID не найден.")
try:
user.middle_name = middle_name_data.new_middle_name
db.commit()
db.refresh(user)
# Безопасно устанавливаем cookie
try:
if response:
response.set_cookie(
key="full_name",
value=f"{user.last_name} {user.first_name} {user.middle_name or ''}",
httponly=False
)
except:
# Игнорируем ошибку установки cookie
pass
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail="Ошибка при обновлении отчества.")
return {"message": "Отчество успешно изменено."}
class ChangeEmailRequest(BaseModel):
new_email: EmailStr
@router.post("/change_email")
def change_email(
email_data: ChangeEmailRequest,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db),
response: Response = None
):
token = credentials.credentials
decoded_data = decode_jwt(token)
user_id = decoded_data.get("user_id")
if not user_id:
raise HTTPException(status_code=401, detail="Недействительный токен.")
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="Пользователь с таким ID не найден.")
existing_user = db.query(User).filter(User.email == email_data.new_email).first()
if existing_user:
raise HTTPException(status_code=400, detail="Почта уже используется другим пользователем.")
try:
user.email = email_data.new_email
db.commit()
db.refresh(user)
# Безопасно устанавливаем cookie
try:
if response:
response.set_cookie(
key="email",
value=user.email,
httponly=False
)
except:
# Игнорируем ошибку установки cookie
pass
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail="Ошибка при обновлении почты.")
return {"message": "Почта успешно изменена."}
@router.post("/upload_avatar")
def upload_avatar(
file: UploadFile = File(...),
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db),
request: Request = None,
response: Response = None
):
token = credentials.credentials
decoded_data = decode_jwt(token)
user_id = decoded_data.get("user_id")
if not user_id:
raise HTTPException(status_code=401, detail="Недействительный токен.")
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="Пользователь с таким ID не найден.")
if not file.content_type.startswith("image/"):
raise HTTPException(status_code=400, detail="Файл должен быть изображением.")
try:
upload_dir = get_upload_dir(user_id)
file_path = save_file(file, upload_dir)
file_url = generate_file_url(request, file_path)
user.avatar = str(file_url)
db.commit()
db.refresh(user)
# Безопасно устанавливаем cookie
try:
if response:
response.set_cookie(
key="avatar",
value=file_url,
httponly=False
)
except:
# Игнорируем ошибку установки cookie
pass
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"Ошибка при загрузке файла: {str(e)}")
return {"message": "Аватар успешно загружен.", "file_url": file_url}