import asyncio import asyncssh import secrets from typing import Dict, Optional from datetime import datetime from app.config import settings from app.utils.exceptions import InternalServerError class SSHTunnelManager: """SSH 터널 관리""" def __init__(self): self.active_tunnels: Dict[str, 'TunnelSession'] = {} self.used_ports = set() # --------------------------------------------------------- # 포트 관리자 # --------------------------------------------------------- def _get_available_port(self) -> int: """사용 가능한 포트 찾기""" for port in range(settings.TUNNEL_PORT_MIN, settings.TUNNEL_PORT_MAX): if port not in self.used_ports: self.used_ports.add(port) return port raise InternalServerError("사용 가능한 포트가 없습니다") def _release_port(self, port: int): """포트 해제""" self.used_ports.discard(port) # --------------------------------------------------------- # 터널 생성 # --------------------------------------------------------- async def create_tunnel( self, user_id: int, vm_id: int, remote_host: str, remote_port: int = 3389, vm_name: str = "", rdp_username: str = "" ) -> Dict: """ SSH 터널 생성 """ session_id = secrets.token_urlsafe(32) local_port = self._get_available_port() try: ssh_config = { "host": settings.SSH_HOST, "port": settings.SSH_PORT, "username": settings.SSH_USERNAME, "known_hosts": None } if settings.SSH_KEY_PATH: ssh_config["client_keys"] = [settings.SSH_KEY_PATH] elif settings.SSH_PASSWORD: ssh_config["password"] = settings.SSH_PASSWORD else: raise InternalServerError("SSH 인증 정보가 설정되지 않았습니다") # SSH 연결 생성 conn = await asyncssh.connect(**ssh_config) # 로컬 포트 포워딩 (외부 접속 허용) listener = await conn.forward_local_port( '0.0.0.0', # 모든 인터페이스에서 리스닝 (외부 접속 허용) local_port, remote_host, remote_port ) # 세션 저장 tunnel_session = TunnelSession( session_id=session_id, user_id=user_id, vm_id=vm_id, local_port=local_port, remote_host=remote_host, remote_port=remote_port, connection=conn, listener=listener, created_at=datetime.utcnow(), vm_name=vm_name or "", rdp_username=rdp_username or "" ) self.active_tunnels[session_id] = tunnel_session return { "session_id": session_id, "local_port": local_port, "remote_host": remote_host, "remote_port": remote_port, "ssh_host": settings.SSH_HOST, "vm_name": vm_name or "", "rdp_username": rdp_username or "", "created_at": tunnel_session.created_at.isoformat() } except Exception as e: self._release_port(local_port) raise InternalServerError(f"SSH 터널 생성 실패: {str(e)}") # --------------------------------------------------------- # 터널 종료 # --------------------------------------------------------- async def close_tunnel(self, session_id: str) -> bool: tunnel = self.active_tunnels.get(session_id) if not tunnel: return False try: if tunnel.listener: tunnel.listener.close() await tunnel.listener.wait_closed() if tunnel.connection: tunnel.connection.close() await tunnel.connection.wait_closed() self._release_port(tunnel.local_port) del self.active_tunnels[session_id] return True except Exception as e: print(f"터널 종료 오류: {e}") return False # --------------------------------------------------------- # 터널 상태 조회 # --------------------------------------------------------- async def get_tunnel_status(self, session_id: str) -> Optional[Dict]: tunnel = self.active_tunnels.get(session_id) if not tunnel: return None is_active = tunnel.connection is not None and tunnel.listener is not None uptime = (datetime.utcnow() - tunnel.created_at).total_seconds() # 항상 null 반환하지 않도록 빈 문자열 처리 vm_name = tunnel.vm_name or "" rdp_username = tunnel.rdp_username or "" return { "session_id": session_id, "is_active": is_active, "uptime_seconds": int(uptime), "created_at": tunnel.created_at.isoformat(), "vm_id": tunnel.vm_id, "vm_name": vm_name, "rdp_username": rdp_username, "tunnel_info": { "session_id": session_id, "local_port": tunnel.local_port, "remote_host": tunnel.remote_host, "remote_port": tunnel.remote_port, "vm_id": tunnel.vm_id, "vm_name": vm_name, "rdp_username": rdp_username, "is_active": is_active } } # --------------------------------------------------------- # 비활성 터널 정리 # --------------------------------------------------------- async def cleanup_inactive_tunnels(self): to_remove = [] for sid, tunnel in self.active_tunnels.items(): if tunnel.connection is None or tunnel.listener is None: to_remove.append(sid) for sid in to_remove: await self.close_tunnel(sid) # --------------------------------------------------------- # 터널 세션 클래스 # --------------------------------------------------------- class TunnelSession: """터널 세션 정보""" def __init__( self, session_id: str, user_id: int, vm_id: int, local_port: int, remote_host: str, remote_port: int, connection, listener, created_at: datetime, vm_name: str = "", rdp_username: str = "" ): self.session_id = session_id self.user_id = user_id self.vm_id = vm_id self.local_port = local_port self.remote_host = remote_host self.remote_port = remote_port self.connection = connection self.listener = listener self.created_at = created_at # 절대 null 반환 방지 self.vm_name = vm_name or "" self.rdp_username = rdp_username or "" # 싱글톤 인스턴스 ssh_tunnel_manager = SSHTunnelManager()