Initial commit
This commit is contained in:
BIN
backend/models/__pycache__/user.cpython-313.pyc
Normal file
BIN
backend/models/__pycache__/user.cpython-313.pyc
Normal file
Binary file not shown.
127
backend/models/user.py
Normal file
127
backend/models/user.py
Normal file
@@ -0,0 +1,127 @@
|
||||
from __future__ import annotations
|
||||
from typing import Optional
|
||||
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
from flask_login import UserMixin
|
||||
from sqlalchemy import String, Boolean
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
from werkzeug.security import check_password_hash as wz_check_password_hash
|
||||
import logging
|
||||
|
||||
# passlib: Argon2id 기본, scrypt/pbkdf2는 검증만 (점진 마이그레이션)
|
||||
from passlib.context import CryptContext
|
||||
|
||||
db = SQLAlchemy()
|
||||
|
||||
# Argon2id를 기본으로 사용하고, scrypt/pbkdf2_sha256은 검증만 허용(=deprecated)
|
||||
# 로그인에 성공하면 자동으로 Argon2id로 재해시 저장합니다.
|
||||
pwd_ctx = CryptContext(
|
||||
schemes=["argon2", "scrypt", "pbkdf2_sha256"],
|
||||
default="argon2",
|
||||
deprecated=["scrypt", "pbkdf2_sha256"],
|
||||
# Argon2id 파라미터 (기본도 충분하지만 서비스 보안수준에 맞춰 조정 가능)
|
||||
# time_cost: 2~4, memory_cost: 64~256 MiB 권장 범위
|
||||
argon2__type="ID",
|
||||
argon2__time_cost=3,
|
||||
argon2__memory_cost=102400, # 100 MiB
|
||||
argon2__parallelism=8,
|
||||
# PBKDF2 라운드 상향 (레거시 검증용)
|
||||
pbkdf2_sha256__rounds=300_000,
|
||||
)
|
||||
|
||||
class User(db.Model, UserMixin):
|
||||
__tablename__ = "user"
|
||||
|
||||
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
|
||||
username: Mapped[str] = mapped_column(String(80), unique=True, nullable=False)
|
||||
email: Mapped[str] = mapped_column(String(120), unique=True, nullable=False, index=True)
|
||||
password: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
is_admin: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
|
||||
is_active: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
|
||||
|
||||
# ── 유틸 메서드
|
||||
def __repr__(self) -> str: # pragma: no cover
|
||||
return f"<User id={self.id} email={self.email} active={self.is_active}>"
|
||||
|
||||
# 신규 저장/변경 시 항상 Argon2id로 해시
|
||||
def set_password(self, password: str) -> None:
|
||||
self.password = pwd_ctx.hash(password)
|
||||
|
||||
# 혼합 검증: scrypt/pbkdf2/argon2 모두 검증 가능
|
||||
# 검증 성공 + 레거시 스킴이면 Argon2id로 즉시 재해시 & 커밋
|
||||
def check_password(self, password: str) -> bool:
|
||||
"""
|
||||
- 우선 저장된 해시의 '형식'을 보고 검증기를 선택한다.
|
||||
* 'scrypt:' 또는 'pbkdf2:'로 시작하면 → Werkzeug 검증
|
||||
* 그 외 → passlib(CryptContext) 검증
|
||||
- 검증에 성공했고 현재 해시가 Argon2가 아니거나(passlib가 needs_update True),
|
||||
Werkzeug 형식(scrypt/pbkdf2)이라면 즉시 Argon2id로 재해시 저장한다.
|
||||
"""
|
||||
raw = self.password or ""
|
||||
|
||||
ok = False
|
||||
used_werkzeug = False
|
||||
|
||||
try:
|
||||
# 1) Werkzeug 포맷 감지 (예: 'scrypt:32768:8:1$...', 'pbkdf2:sha256:260000$...')
|
||||
if raw.startswith("scrypt:") or raw.startswith("pbkdf2:"):
|
||||
used_werkzeug = True
|
||||
ok = wz_check_password_hash(raw, password) # hashlib 기반 → 형식 그대로 검증
|
||||
else:
|
||||
# 2) passlib 포맷(argon2/$scrypt$/pbkdf2_sha256) 시도
|
||||
ok = pwd_ctx.verify(password, raw)
|
||||
except Exception as e:
|
||||
logging.warning("password verify failed: %s", e)
|
||||
ok = False
|
||||
|
||||
if not ok:
|
||||
return False
|
||||
|
||||
# ── 여기까지 왔으면 검증 성공. 필요 시 Argon2id로 즉시 업그레이드.
|
||||
try:
|
||||
need_upgrade = False
|
||||
|
||||
if used_werkzeug:
|
||||
# Werkzeug 형식(scrypt/pbkdf2)은 우리 정책상 모두 Argon2로 마이그레이션
|
||||
need_upgrade = True
|
||||
else:
|
||||
# passlib가 판단하는 업그레이드 필요 여부(파라미터/알고리즘 기준)
|
||||
need_upgrade = pwd_ctx.needs_update(raw)
|
||||
|
||||
if need_upgrade:
|
||||
self.password = pwd_ctx.hash(password) # Argon2id 기본
|
||||
db.session.add(self)
|
||||
db.session.commit()
|
||||
except Exception as e:
|
||||
# 업그레이드 실패해도 로그인 자체는 성공시킨다(다음 로그인 때 재시도)
|
||||
logging.warning("password rehash (argon2) failed: %s", e)
|
||||
db.session.rollback()
|
||||
|
||||
return True
|
||||
|
||||
# Flask-Login 호환 (UserMixin 기본 get_id 사용 가능하지만 명시)
|
||||
def get_id(self) -> str: # pragma: no cover
|
||||
return str(self.id)
|
||||
|
||||
# ── 조회 헬퍼
|
||||
@staticmethod
|
||||
def find_by_email(email: Optional[str]) -> Optional["User"]:
|
||||
q = (email or "").strip().lower()
|
||||
if not q:
|
||||
return None
|
||||
return User.query.filter_by(email=q).first()
|
||||
|
||||
@staticmethod
|
||||
def find_by_username(username: Optional[str]) -> Optional["User"]:
|
||||
q = (username or "").strip()
|
||||
if not q:
|
||||
return None
|
||||
return User.query.filter_by(username=q).first()
|
||||
|
||||
|
||||
# Flask-Login user_loader (SQLAlchemy 2.0 방식)
|
||||
def load_user(user_id: str) -> Optional[User]: # pragma: no cover
|
||||
try:
|
||||
return db.session.get(User, int(user_id))
|
||||
except Exception:
|
||||
return None
|
||||
Reference in New Issue
Block a user