131 lines
5.3 KiB
Python
131 lines
5.3 KiB
Python
from __future__ import annotations
|
||
from typing import Optional
|
||
|
||
from flask_sqlalchemy import SQLAlchemy
|
||
from flask_login import UserMixin
|
||
from sqlalchemy import String, Boolean
|
||
from sqlalchemy.orm import Mapped, mapped_column
|
||
from werkzeug.security import check_password_hash as wz_check_password_hash
|
||
import logging
|
||
|
||
# passlib: Argon2id 기본, scrypt/pbkdf2는 검증만 (점진 마이그레이션)
|
||
from passlib.context import CryptContext
|
||
|
||
db = SQLAlchemy()
|
||
|
||
# Argon2id를 기본으로 사용하고, scrypt/pbkdf2_sha256은 검증만 허용(=deprecated)
|
||
# 로그인에 성공하면 자동으로 Argon2id로 재해시 저장합니다.
|
||
pwd_ctx = CryptContext(
|
||
schemes=["argon2", "scrypt", "pbkdf2_sha256"],
|
||
default="argon2",
|
||
deprecated=["scrypt", "pbkdf2_sha256"],
|
||
# Argon2id 파라미터 (기본도 충분하지만 서비스 보안수준에 맞춰 조정 가능)
|
||
# time_cost: 2~4, memory_cost: 64~256 MiB 권장 범위
|
||
argon2__type="ID",
|
||
argon2__time_cost=3,
|
||
argon2__memory_cost=102400, # 100 MiB
|
||
argon2__parallelism=8,
|
||
# PBKDF2 라운드 상향 (레거시 검증용)
|
||
pbkdf2_sha256__rounds=300_000,
|
||
)
|
||
|
||
class User(db.Model, UserMixin):
|
||
__tablename__ = "user"
|
||
|
||
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
|
||
username: Mapped[str] = mapped_column(String(80), unique=True, nullable=False)
|
||
email: Mapped[str] = mapped_column(String(120), unique=True, nullable=False, index=True)
|
||
password: Mapped[str] = mapped_column(String(255), nullable=False)
|
||
is_admin: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
|
||
is_active: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
|
||
|
||
# 가입 승인 관련 필드
|
||
is_approved: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
|
||
approval_token: Mapped[Optional[str]] = mapped_column(String(100), unique=True, nullable=True)
|
||
|
||
# ── 유틸 메서드
|
||
def __repr__(self) -> str: # pragma: no cover
|
||
return f"<User id={self.id} email={self.email} active={self.is_active}>"
|
||
|
||
# 신규 저장/변경 시 항상 Argon2id로 해시
|
||
def set_password(self, password: str) -> None:
|
||
self.password = pwd_ctx.hash(password)
|
||
|
||
# 혼합 검증: scrypt/pbkdf2/argon2 모두 검증 가능
|
||
# 검증 성공 + 레거시 스킴이면 Argon2id로 즉시 재해시 & 커밋
|
||
def check_password(self, password: str) -> bool:
|
||
"""
|
||
- 우선 저장된 해시의 '형식'을 보고 검증기를 선택한다.
|
||
* 'scrypt:' 또는 'pbkdf2:'로 시작하면 → Werkzeug 검증
|
||
* 그 외 → passlib(CryptContext) 검증
|
||
- 검증에 성공했고 현재 해시가 Argon2가 아니거나(passlib가 needs_update True),
|
||
Werkzeug 형식(scrypt/pbkdf2)이라면 즉시 Argon2id로 재해시 저장한다.
|
||
"""
|
||
raw = self.password or ""
|
||
|
||
ok = False
|
||
used_werkzeug = False
|
||
|
||
try:
|
||
# 1) Werkzeug 포맷 감지 (예: 'scrypt:32768:8:1$...', 'pbkdf2:sha256:260000$...')
|
||
if raw.startswith("scrypt:") or raw.startswith("pbkdf2:"):
|
||
used_werkzeug = True
|
||
ok = wz_check_password_hash(raw, password) # hashlib 기반 → 형식 그대로 검증
|
||
else:
|
||
# 2) passlib 포맷(argon2/$scrypt$/pbkdf2_sha256) 시도
|
||
ok = pwd_ctx.verify(password, raw)
|
||
except Exception as e:
|
||
logging.warning("password verify failed: %s", e)
|
||
ok = False
|
||
|
||
if not ok:
|
||
return False
|
||
|
||
# ── 여기까지 왔으면 검증 성공. 필요 시 Argon2id로 즉시 업그레이드.
|
||
try:
|
||
need_upgrade = False
|
||
|
||
if used_werkzeug:
|
||
# Werkzeug 형식(scrypt/pbkdf2)은 우리 정책상 모두 Argon2로 마이그레이션
|
||
need_upgrade = True
|
||
else:
|
||
# passlib가 판단하는 업그레이드 필요 여부(파라미터/알고리즘 기준)
|
||
need_upgrade = pwd_ctx.needs_update(raw)
|
||
|
||
if need_upgrade:
|
||
self.password = pwd_ctx.hash(password) # Argon2id 기본
|
||
db.session.add(self)
|
||
db.session.commit()
|
||
except Exception as e:
|
||
# 업그레이드 실패해도 로그인 자체는 성공시킨다(다음 로그인 때 재시도)
|
||
logging.warning("password rehash (argon2) failed: %s", e)
|
||
db.session.rollback()
|
||
|
||
return True
|
||
|
||
# Flask-Login 호환 (UserMixin 기본 get_id 사용 가능하지만 명시)
|
||
def get_id(self) -> str: # pragma: no cover
|
||
return str(self.id)
|
||
|
||
# ── 조회 헬퍼
|
||
@staticmethod
|
||
def find_by_email(email: Optional[str]) -> Optional["User"]:
|
||
q = (email or "").strip().lower()
|
||
if not q:
|
||
return None
|
||
return User.query.filter_by(email=q).first()
|
||
|
||
@staticmethod
|
||
def find_by_username(username: Optional[str]) -> Optional["User"]:
|
||
q = (username or "").strip()
|
||
if not q:
|
||
return None
|
||
return User.query.filter_by(username=q).first()
|
||
|
||
|
||
# Flask-Login user_loader (SQLAlchemy 2.0 방식)
|
||
def load_user(user_id: str) -> Optional[User]: # pragma: no cover
|
||
try:
|
||
return db.session.get(User, int(user_id))
|
||
except Exception:
|
||
return None |