diff --git a/app/api/ssh_credentials.py b/app/api/ssh_credentials.py index 89ba80a..472657c 100644 --- a/app/api/ssh_credentials.py +++ b/app/api/ssh_credentials.py @@ -4,6 +4,7 @@ from datetime import datetime, timedelta from app.api.auth import get_current_user from app.schemas.auth import CurrentUser from app.services.temp_ssh_password_service import temp_ssh_password_manager +from app.config import settings import os router = APIRouter() @@ -21,15 +22,38 @@ async def get_ssh_credentials(current_user: CurrentUser = Depends(get_current_us Returns: SSH 연결 정보 및 임시 비밀번호 """ + # 1. 정적 자격증명 확인 (개발 환경 또는 정적 비밀번호 사용 시) + if settings.SSH_PASSWORD: + ssh_host = settings.SSH_HOST or "api.mouse84.com" + ssh_port = settings.SSH_PORT + ssh_username = settings.SSH_USERNAME or current_user.username + + # 만료 시간 (24시간) + expires_at = datetime.utcnow() + timedelta(hours=24) + + return { + "ssh_host": ssh_host, + "ssh_port": ssh_port, + "ssh_username": ssh_username, + "ssh_password": settings.SSH_PASSWORD, + "expires_at": expires_at.isoformat(), + "expires_in_seconds": 86400 + } + + # 2. 임시 비밀번호 생성 (기본 동작) + + # .env 설정을 우선 사용 (username이 지정된 경우 해당 계정으로 임시 비밀번호 생성) + target_username = settings.SSH_USERNAME or current_user.username + # 임시 비밀번호 생성 (1시간 유효) temp_password = temp_ssh_password_manager.generate_password( - username=current_user.username, + username=target_username, validity_hours=1 ) - # SSH 서버 정보 (외부 접속용) - ssh_host = os.getenv("SSH_HOST", "api.mouse84.com") # 외부 DDNS - ssh_port = int(os.getenv("SSH_PORT", "54054")) # 외부 포트 (내부 22로 포워딩) + # SSH 서버 정보 (설정값 우선) + ssh_host = settings.SSH_HOST or "api.mouse84.com" + ssh_port = settings.SSH_PORT or 54054 # 만료 시간 계산 expires_at = datetime.utcnow() + timedelta(hours=1) @@ -37,7 +61,7 @@ async def get_ssh_credentials(current_user: CurrentUser = Depends(get_current_us return { "ssh_host": ssh_host, "ssh_port": ssh_port, - "ssh_username": current_user.username, + "ssh_username": target_username, "ssh_password": temp_password, "expires_at": expires_at.isoformat(), "expires_in_seconds": 3600 diff --git a/app/services/temp_ssh_password_service.py b/app/services/temp_ssh_password_service.py index 489701b..b156b37 100644 --- a/app/services/temp_ssh_password_service.py +++ b/app/services/temp_ssh_password_service.py @@ -1,14 +1,14 @@ import secrets import hashlib -from typing import Dict, Optional +from typing import Dict, List, Optional from datetime import datetime, timedelta class TempSshPasswordManager: """임시 SSH 비밀번호 관리""" def __init__(self): - # 메모리 기반 저장소 (프로덕션에서는 Redis 사용 권장) - self._passwords: Dict[str, dict] = {} + # 메모리 기반 저장소 (username -> list of {hash, expires_at}) + self._passwords: Dict[str, List[dict]] = {} def generate_password(self, username: str, validity_hours: int = 1) -> str: """ @@ -21,6 +21,9 @@ class TempSshPasswordManager: Returns: 임시 비밀번호 """ + # 메모리 정리 (생성 시마다 만료된 것 정리) + self.cleanup_expired() + # 안전한 랜덤 비밀번호 생성 (32자) temp_password = secrets.token_urlsafe(32) @@ -30,49 +33,70 @@ class TempSshPasswordManager: # 만료 시간 계산 expires_at = datetime.utcnow() + timedelta(hours=validity_hours) - # 저장 - self._passwords[username] = { + # 새 토큰 정보 + token_data = { "password_hash": password_hash, "expires_at": expires_at, "created_at": datetime.utcnow() } + + # 해당 사용자에 토큰 추가 (리스트 초기화) + if username not in self._passwords: + self._passwords[username] = [] + + self._passwords[username].append(token_data) return temp_password def verify_password(self, username: str, password: str) -> bool: """ - 비밀번호 검증 - - Args: - username: 사용자명 - password: 검증할 비밀번호 - - Returns: - 유효 여부 + 비밀번호 검증 (다중 토큰 지원) """ if username not in self._passwords: return False - stored = self._passwords[username] + input_hash = hashlib.sha256(password.encode()).hexdigest() + now = datetime.utcnow() - # 만료 확인 - if datetime.utcnow() > stored["expires_at"]: - del self._passwords[username] - return False + # 유효한 토큰 중 하나라도 일치하면 성공 + # (리스트 복사본으로 순회하지 않고, 인덱스로 접근하거나 필터링) + valid_tokens = [] + is_valid = False - # 비밀번호 확인 - password_hash = hashlib.sha256(password.encode()).hexdigest() - return password_hash == stored["password_hash"] + for token in self._passwords[username]: + # 만료된 토큰은 제외 (Clean up on read) + if now > token["expires_at"]: + continue + + valid_tokens.append(token) + + if token["password_hash"] == input_hash: + is_valid = True + + # 리스트 업데이트 (만료된 것 제거됨) + self._passwords[username] = valid_tokens + + return is_valid def cleanup_expired(self): """만료된 비밀번호 정리""" now = datetime.utcnow() - expired = [ - username for username, data in self._passwords.items() - if now > data["expires_at"] - ] - for username in expired: - del self._passwords[username] + users_to_check = list(self._passwords.keys()) + + for username in users_to_check: + # 유효한 토큰만 필터링 + self._passwords[username] = [ + token for token in self._passwords[username] + if now <= token["expires_at"] + ] + + # 토큰이 하나도 없으면 사용자 키 삭제 + if not self._passwords[username]: + del self._passwords[username] + + def get_active_count(self) -> int: + """현재 활성화된 토큰 수 (디버깅용)""" + return sum(len(tokens) for tokens in self._passwords.values()) # 싱글톤 인스턴스 temp_ssh_password_manager = TempSshPasswordManager()