diff --git a/app/api/auth.py b/app/api/auth.py index 8abdfc5..9360552 100644 --- a/app/api/auth.py +++ b/app/api/auth.py @@ -1,4 +1,4 @@ -from fastapi import APIRouter, Depends, HTTPException, status +from fastapi import APIRouter, Depends, HTTPException, status, Query from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.orm import Session from app.database import get_db @@ -41,6 +41,28 @@ async def get_current_user( is_active=user.is_active ) +async def get_current_user_ws(token: str = Query(...), db: Session = Depends(get_db)) -> CurrentUser: + """The WebSocket authentication helper""" + payload = decode_token(token) + if not payload: + raise AuthenticationError("Invalid token") + + user_id = payload.get("sub") + if not user_id: + raise AuthenticationError("Invalid token payload") + + user = auth_service.get_user_by_id(db, int(user_id)) + if not user or not user.is_active: + raise AuthenticationError("User not found or inactive") + + return CurrentUser( + id=user.id, + username=user.username, + email=user.email, + role=user.role, + is_active=user.is_active + ) + @router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED) async def register(user_data: UserRegister, db: Session = Depends(get_db)): """ diff --git a/app/api/vms.py b/app/api/vms.py index 50f75b7..dc16ee4 100644 --- a/app/api/vms.py +++ b/app/api/vms.py @@ -38,24 +38,29 @@ async def get_my_vms( vm_id = vm["vmid"] access = access_map.get(vm_id) - # VMAccess가 있으면 해당 정보 사용, 없으면 기본값 + # 권한이 없는 VM은 목록에서 제외 + if not access: + continue + + # VMAccess가 있으면 해당 정보 사용 vm_info = VMInfo( vm_id=vm_id, - node=vm["node"], + node=vm.get("node"), # cluster/resources returns 'node' + type=vm.get("type", "qemu"), name=vm.get("name", "Unknown"), status=vm.get("status", "unknown"), - ip_address=access.static_ip if access else None, # Static IP 자동 설정 - cpus=vm.get("cpus", 0), + ip_address=access.static_ip, # Static IP + cpus=vm.get("maxcpu", 0), # cluster/resources uses maxcpu memory=vm.get("maxmem", 0) // (1024 * 1024), # bytes to MB memory_usage=vm.get("mem", 0) // (1024 * 1024) if vm.get("mem") else None, cpu_usage=vm.get("cpu", 0), - can_start=True, - can_stop=True, - can_reboot=True, - can_connect=True, - rdp_username=access.rdp_username if access else None, # RDP 사용자명 - rdp_password=access.rdp_password if access else None, # RDP 비밀번호 - rdp_port=access.rdp_port if access else 3389 # RDP 포트 + can_start=access.can_start, + can_stop=access.can_stop, + can_reboot=access.can_reboot, + can_connect=access.can_connect, + rdp_username=access.rdp_username, + rdp_password=access.rdp_password, + rdp_port=access.rdp_port or 3389 ) vm_list.append(vm_info) @@ -65,21 +70,22 @@ async def get_my_vms( async def get_vm_detail( vm_id: int, node: str, + type: str = "qemu", current_user: CurrentUser = Depends(get_current_user) ): """ VM 상세 정보 조회 """ # VM 상태 조회 - status = await proxmox_service.get_vm_status(node, vm_id) + status = await proxmox_service.get_vm_status(node, vm_id, type) if not status: raise NotFoundError(f"VM {vm_id}를 찾을 수 없습니다") - # IP 조회 제거 - 연결에 필요하지 않음 return VMDetail( vm_id=vm_id, node=node, + type=type, name=status.get("name", "Unknown"), status=status.get("status", "unknown"), ip_address=None, # IP 조회 안 함 @@ -96,10 +102,11 @@ async def get_vm_detail( async def start_vm( vm_id: int, node: str, + type: str = "qemu", current_user: CurrentUser = Depends(get_current_user) ): """VM 시작""" - success = await proxmox_service.start_vm(node, vm_id) + success = await proxmox_service.start_vm(node, vm_id, type) return VMControlResponse( success=success, @@ -112,10 +119,11 @@ async def start_vm( async def stop_vm( vm_id: int, node: str, + type: str = "qemu", current_user: CurrentUser = Depends(get_current_user) ): """VM 종료""" - success = await proxmox_service.stop_vm(node, vm_id) + success = await proxmox_service.stop_vm(node, vm_id, type) return VMControlResponse( success=success, @@ -128,10 +136,11 @@ async def stop_vm( async def reboot_vm( vm_id: int, node: str, + type: str = "qemu", current_user: CurrentUser = Depends(get_current_user) ): """VM 재시작""" - success = await proxmox_service.reboot_vm(node, vm_id) + success = await proxmox_service.reboot_vm(node, vm_id, type) return VMControlResponse( success=success, diff --git a/app/api/ws.py b/app/api/ws.py new file mode 100644 index 0000000..aa14f13 --- /dev/null +++ b/app/api/ws.py @@ -0,0 +1,136 @@ +from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends, Query +from typing import List, Dict +import asyncio +import logging +import json +from sqlalchemy.orm import Session +from app.database import SessionLocal +from app.api.auth import get_current_user_ws +from app.schemas.auth import CurrentUser +from app.services.proxmox_service import proxmox_service +from app.models.vm import VMAccess +from app.schemas.vm import VMInfo + +router = APIRouter() +logger = logging.getLogger(__name__) + +class ConnectionManager: + def __init__(self): + # Active connections: List of (WebSocket, User) tuples + self.active_connections: List[tuple[WebSocket, CurrentUser]] = [] + + async def connect(self, websocket: WebSocket, user: CurrentUser): + await websocket.accept() + self.active_connections.append((websocket, user)) + logger.info(f"WebSocket connected: {user.username}") + + def disconnect(self, websocket: WebSocket, user: CurrentUser): + if (websocket, user) in self.active_connections: + self.active_connections.remove((websocket, user)) + logger.info(f"WebSocket disconnected: {user.username}") + + async def broadcast(self, all_resources: List[Dict]): + """ + 모든 연결된 클라이언트에게 권한에 맞는 VM 상태를 전송 + """ + # DB 세션을 매번 새로 생성하는 것은 비효율적일 수 있으나, + # Background Task에서 실행되므로 안전하게 처리 + try: + db = SessionLocal() + + # 모든 활성 VM Access 정보 미리 로딩 + all_accesses = db.query(VMAccess).filter(VMAccess.is_active == True).all() + + # User ID별 Access Map 생성 + user_access_map = {} + for access in all_accesses: + if access.user_id not in user_access_map: + user_access_map[access.user_id] = {} + user_access_map[access.user_id][access.vm_id] = access + + for connection, user in self.active_connections: + try: + # 해당 유저의 권한 맵 가져오기 + access_map = user_access_map.get(user.id, {}) + + user_vm_list = [] + for res in all_resources: + vm_id = res.get("vmid") + # VMID가 없거나 권한 정보가 없으면 패스 + if not vm_id or vm_id not in access_map: + continue + + access = access_map[vm_id] + + # VMInfo 스키마에 맞춰 데이터 구성 + vm_info = { + "vm_id": vm_id, + "node": res.get("node"), + "type": res.get("type", "qemu"), + "name": res.get("name", "Unknown"), + "status": res.get("status", "unknown"), + "ip_address": access.static_ip, + "cpus": res.get("maxcpu", 0), + "memory": res.get("maxmem", 0) // (1024 * 1024), + "memory_usage": res.get("mem", 0) // (1024 * 1024) if res.get("mem") else 0, + "cpu_usage": res.get("cpu", 0), + # 권한 정보 + "can_start": access.can_start, + "can_stop": access.can_stop, + "can_reboot": access.can_reboot, + "can_connect": access.can_connect, + # RDP 정보는 보안상 웹소켓 브로드캐스트에서는 제외하거나 필요시 포함 + # (여기서는 제외하고 REST API 상세조회 사용 권장하지만, 목록 뷰를 위해 포함) + "rdp_username": access.rdp_username, + "rdp_port": access.rdp_port or 3389 + } + user_vm_list.append(vm_info) + + # 전송 + await connection.send_json({ + "type": "update", + "data": user_vm_list + }) + + except Exception as e: + logger.error(f"Error sending update to {user.username}: {e}") + # 에러 발생 시 연결 끊기 고려? + + db.close() + + except Exception as e: + logger.error(f"Broadcast error: {e}") + +manager = ConnectionManager() + +@router.websocket("/status") +async def websocket_endpoint( + websocket: WebSocket, + user: CurrentUser = Depends(get_current_user_ws) +): + try: + await manager.connect(websocket, user) + + try: + while True: + # 클라이언트로부터 메시지를 받을 일이 딱히 없지만 연결 유지를 위해 대기 + await websocket.receive_text() + except WebSocketDisconnect: + manager.disconnect(websocket, user) + + except Exception: + await websocket.close(code=4001) + +async def start_monitoring_task(): + """백그라운드 모니터링 태스크""" + logger.info("Starting background monitoring task...") + try: + while True: + # 활성 연결이 있을 때만 조회 (리소스 절약) + if manager.active_connections: + resources = await proxmox_service.get_all_vms() + await manager.broadcast(resources) + + await asyncio.sleep(2) # 2초마다 갱신 + except asyncio.CancelledError: + logger.info("Monitoring task cancelled") diff --git a/app/main.py b/app/main.py index f465dff..f4c2958 100644 --- a/app/main.py +++ b/app/main.py @@ -3,7 +3,8 @@ from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager from app.config import settings from app.database import engine, Base -from app.api import auth, vms, tunnel, admin, ssh_credentials +from app.api import auth, vms, tunnel, admin, ssh_credentials, ws +import asyncio import logging # 로깅 설정 @@ -47,9 +48,18 @@ async def lifespan(app: FastAPI): finally: db.close() + # 백그라운드 모니터링 시작 + monitor_task = asyncio.create_task(ws.start_monitoring_task()) + yield # 종료 시 + monitor_task.cancel() + try: + await monitor_task + except asyncio.CancelledError: + pass + logger.info("🛑 VConnect API 서버 종료") # FastAPI 앱 생성 @@ -75,6 +85,7 @@ app.include_router(vms.router, prefix=f"{settings.API_V1_PREFIX}/vms", tags=["VM app.include_router(tunnel.router, prefix=f"{settings.API_V1_PREFIX}/tunnel", tags=["터널 관리"]) app.include_router(admin.router, prefix=f"{settings.API_V1_PREFIX}/admin", tags=["관리자"]) app.include_router(ssh_credentials.router, prefix=f"{settings.API_V1_PREFIX}/ssh", tags=["SSH 자격증명"]) +app.include_router(ws.router, prefix="/ws", tags=["WebSocket"]) # Health Check @app.get("/health") diff --git a/app/schemas/vm.py b/app/schemas/vm.py index d06ebb4..cf99553 100644 --- a/app/schemas/vm.py +++ b/app/schemas/vm.py @@ -6,6 +6,7 @@ from datetime import datetime class VMInfo(BaseModel): vm_id: int node: str + type: str = "qemu" # qemu or lxc name: str status: str ip_address: Optional[str] = None diff --git a/app/services/proxmox_service.py b/app/services/proxmox_service.py index 098726f..1d9752d 100644 --- a/app/services/proxmox_service.py +++ b/app/services/proxmox_service.py @@ -1,8 +1,12 @@ import httpx +import asyncio +import logging from typing import List, Optional, Dict from app.config import settings import json +logger = logging.getLogger(__name__) + class ProxmoxService: """Proxmox VE API 통신 서비스""" @@ -16,10 +20,20 @@ class ProxmoxService: url = f"{self.base_url}/api2/json{endpoint}" headers = {"Authorization": self.api_token} - async with httpx.AsyncClient(verify=self.verify_ssl, timeout=30.0) as client: - response = await client.request(method, url, headers=headers, **kwargs) - response.raise_for_status() - return response.json() + try: + async with httpx.AsyncClient(verify=self.verify_ssl, timeout=30.0) as client: + response = await client.request(method, url, headers=headers, **kwargs) + response.raise_for_status() + return response.json() + except httpx.HTTPStatusError as e: + logger.error(f"Proxmox API HTTP Error: {e.response.status_code} - {e.response.text}") + raise + except httpx.RequestError as e: + logger.error(f"Proxmox API Request Error: {e}") + raise + except Exception as e: + logger.error(f"Proxmox API Unexpected Error: {e}") + raise async def get_nodes(self) -> List[Dict]: """노드 목록 조회""" @@ -32,32 +46,33 @@ class ProxmoxService: return result.get("data", []) async def get_all_vms(self) -> List[Dict]: - """모든 노드의 VM 목록 조회""" - nodes = await self.get_nodes() - print(f"DEBUG: 노드 목록: {nodes}") - - all_vms = [] - - for node in nodes: - node_name = node.get("node") - if node_name: - vms = await self.get_vms(node_name) - print(f"DEBUG: {node_name} 노드의 VM 목록: {vms}") - for vm in vms: - vm["node"] = node_name - all_vms.append(vm) - - print(f"DEBUG: 전체 VM 개수: {len(all_vms)}") - print(f"DEBUG: 전체 VM 목록: {all_vms}") - return all_vms + """모든 노드의 VM 및 LXC 목록 조회 (클러스터 리소스 API 사용)""" + try: + # /cluster/resources 호출 (type=vm은 qemu와 lxc 모두 포함) + result = await self._make_request("GET", "/cluster/resources?type=vm") + resources = result.get("data", []) + + logger.info(f"Total resources fetched: {len(resources)}") + return resources + + except Exception as e: + logger.error(f"Failed to get cluster resources: {e}") + return [] - async def get_vm_status(self, node: str, vm_id: int) -> Dict: - """VM 상태 조회""" - result = await self._make_request("GET", f"/nodes/{node}/qemu/{vm_id}/status/current") - return result.get("data", {}) + async def get_vm_status(self, node: str, vm_id: int, vm_type: str = "qemu") -> Dict: + """VM/LXC 상태 조회""" + try: + result = await self._make_request("GET", f"/nodes/{node}/{vm_type}/{vm_id}/status/current") + return result.get("data", {}) + except Exception as e: + logger.error(f"Failed to get status for {vm_type} {vm_id}: {e}") + return {} - async def get_vm_ip(self, node: str, vm_id: int) -> Optional[str]: - """QEMU Guest Agent를 통해 VM IP 주소 조회""" + async def get_vm_ip(self, node: str, vm_id: int, vm_type: str = "qemu") -> Optional[str]: + """QEMU Guest Agent를 통해 VM IP 주소 조회 (LXC는 미지원)""" + if vm_type != "qemu": + return None + try: result = await self._make_request( "GET", @@ -80,31 +95,38 @@ class ProxmoxService: return address return None - except: + except Exception as e: + logger.warning(f"Failed to get VM IP (Node: {node}, VMID: {vm_id}): {e}") return None - async def start_vm(self, node: str, vm_id: int) -> bool: - """VM 시작""" + async def start_vm(self, node: str, vm_id: int, vm_type: str = "qemu") -> bool: + """VM/LXC 시작""" try: - await self._make_request("POST", f"/nodes/{node}/qemu/{vm_id}/status/start") + await self._make_request("POST", f"/nodes/{node}/{vm_type}/{vm_id}/status/start") + logger.info(f"{vm_type} started: {vm_id} (Node: {node})") return True - except: + except Exception as e: + logger.error(f"Failed to start {vm_type} {vm_id}: {e}") return False - async def stop_vm(self, node: str, vm_id: int) -> bool: - """VM 종료""" + async def stop_vm(self, node: str, vm_id: int, vm_type: str = "qemu") -> bool: + """VM/LXC 종료""" try: - await self._make_request("POST", f"/nodes/{node}/qemu/{vm_id}/status/stop") + await self._make_request("POST", f"/nodes/{node}/{vm_type}/{vm_id}/status/stop") + logger.info(f"{vm_type} stopped: {vm_id} (Node: {node})") return True - except: + except Exception as e: + logger.error(f"Failed to stop {vm_type} {vm_id}: {e}") return False - async def reboot_vm(self, node: str, vm_id: int) -> bool: - """VM 재시작""" + async def reboot_vm(self, node: str, vm_id: int, vm_type: str = "qemu") -> bool: + """VM/LXC 재시작""" try: - await self._make_request("POST", f"/nodes/{node}/qemu/{vm_id}/status/reboot") + await self._make_request("POST", f"/nodes/{node}/{vm_type}/{vm_id}/status/reboot") + logger.info(f"{vm_type} rebooted: {vm_id} (Node: {node})") return True - except: + except Exception as e: + logger.error(f"Failed to reboot {vm_type} {vm_id}: {e}") return False def _is_private_ip(self, ip: str) -> bool: