103 lines
3.4 KiB
Python
103 lines
3.4 KiB
Python
import secrets
|
|
import hashlib
|
|
from typing import Dict, List, Optional
|
|
from datetime import datetime, timedelta
|
|
|
|
class TempSshPasswordManager:
|
|
"""임시 SSH 비밀번호 관리"""
|
|
|
|
def __init__(self):
|
|
# 메모리 기반 저장소 (username -> list of {hash, expires_at})
|
|
self._passwords: Dict[str, List[dict]] = {}
|
|
|
|
def generate_password(self, username: str, validity_hours: int = 1) -> str:
|
|
"""
|
|
임시 SSH 비밀번호 생성
|
|
|
|
Args:
|
|
username: 사용자명
|
|
validity_hours: 유효 시간 (기본 1시간)
|
|
|
|
Returns:
|
|
임시 비밀번호
|
|
"""
|
|
# 메모리 정리 (생성 시마다 만료된 것 정리)
|
|
self.cleanup_expired()
|
|
|
|
# 안전한 랜덤 비밀번호 생성 (32자)
|
|
temp_password = secrets.token_urlsafe(32)
|
|
|
|
# 해시 저장 (실제 비밀번호는 저장하지 않음)
|
|
password_hash = hashlib.sha256(temp_password.encode()).hexdigest()
|
|
|
|
# 만료 시간 계산
|
|
expires_at = datetime.utcnow() + timedelta(hours=validity_hours)
|
|
|
|
# 새 토큰 정보
|
|
token_data = {
|
|
"password_hash": password_hash,
|
|
"expires_at": expires_at,
|
|
"created_at": datetime.utcnow()
|
|
}
|
|
|
|
# 해당 사용자에 토큰 추가 (리스트 초기화)
|
|
if username not in self._passwords:
|
|
self._passwords[username] = []
|
|
|
|
self._passwords[username].append(token_data)
|
|
|
|
return temp_password
|
|
|
|
def verify_password(self, username: str, password: str) -> bool:
|
|
"""
|
|
비밀번호 검증 (다중 토큰 지원)
|
|
"""
|
|
if username not in self._passwords:
|
|
return False
|
|
|
|
input_hash = hashlib.sha256(password.encode()).hexdigest()
|
|
now = datetime.utcnow()
|
|
|
|
# 유효한 토큰 중 하나라도 일치하면 성공
|
|
# (리스트 복사본으로 순회하지 않고, 인덱스로 접근하거나 필터링)
|
|
valid_tokens = []
|
|
is_valid = False
|
|
|
|
for token in self._passwords[username]:
|
|
# 만료된 토큰은 제외 (Clean up on read)
|
|
if now > token["expires_at"]:
|
|
continue
|
|
|
|
valid_tokens.append(token)
|
|
|
|
if token["password_hash"] == input_hash:
|
|
is_valid = True
|
|
|
|
# 리스트 업데이트 (만료된 것 제거됨)
|
|
self._passwords[username] = valid_tokens
|
|
|
|
return is_valid
|
|
|
|
def cleanup_expired(self):
|
|
"""만료된 비밀번호 정리"""
|
|
now = datetime.utcnow()
|
|
users_to_check = list(self._passwords.keys())
|
|
|
|
for username in users_to_check:
|
|
# 유효한 토큰만 필터링
|
|
self._passwords[username] = [
|
|
token for token in self._passwords[username]
|
|
if now <= token["expires_at"]
|
|
]
|
|
|
|
# 토큰이 하나도 없으면 사용자 키 삭제
|
|
if not self._passwords[username]:
|
|
del self._passwords[username]
|
|
|
|
def get_active_count(self) -> int:
|
|
"""현재 활성화된 토큰 수 (디버깅용)"""
|
|
return sum(len(tokens) for tokens in self._passwords.values())
|
|
|
|
# 싱글톤 인스턴스
|
|
temp_ssh_password_manager = TempSshPasswordManager()
|