Compare commits
2 Commits
1d9e0675e9
...
c20bd6a067
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c20bd6a067 | ||
|
|
b931bd9eb6 |
@@ -1,4 +1,4 @@
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from fastapi import APIRouter, Depends, HTTPException, status, Query
|
||||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||||
from sqlalchemy.orm import Session
|
||||
from app.database import get_db
|
||||
@@ -41,6 +41,28 @@ async def get_current_user(
|
||||
is_active=user.is_active
|
||||
)
|
||||
|
||||
async def get_current_user_ws(token: str = Query(...), db: Session = Depends(get_db)) -> CurrentUser:
|
||||
"""The WebSocket authentication helper"""
|
||||
payload = decode_token(token)
|
||||
if not payload:
|
||||
raise AuthenticationError("Invalid token")
|
||||
|
||||
user_id = payload.get("sub")
|
||||
if not user_id:
|
||||
raise AuthenticationError("Invalid token payload")
|
||||
|
||||
user = auth_service.get_user_by_id(db, int(user_id))
|
||||
if not user or not user.is_active:
|
||||
raise AuthenticationError("User not found or inactive")
|
||||
|
||||
return CurrentUser(
|
||||
id=user.id,
|
||||
username=user.username,
|
||||
email=user.email,
|
||||
role=user.role,
|
||||
is_active=user.is_active
|
||||
)
|
||||
|
||||
@router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
|
||||
async def register(user_data: UserRegister, db: Session = Depends(get_db)):
|
||||
"""
|
||||
|
||||
@@ -38,24 +38,29 @@ async def get_my_vms(
|
||||
vm_id = vm["vmid"]
|
||||
access = access_map.get(vm_id)
|
||||
|
||||
# VMAccess가 있으면 해당 정보 사용, 없으면 기본값
|
||||
# 권한이 없는 VM은 목록에서 제외
|
||||
if not access:
|
||||
continue
|
||||
|
||||
# VMAccess가 있으면 해당 정보 사용
|
||||
vm_info = VMInfo(
|
||||
vm_id=vm_id,
|
||||
node=vm["node"],
|
||||
node=vm.get("node"), # cluster/resources returns 'node'
|
||||
type=vm.get("type", "qemu"),
|
||||
name=vm.get("name", "Unknown"),
|
||||
status=vm.get("status", "unknown"),
|
||||
ip_address=access.static_ip if access else None, # Static IP 자동 설정
|
||||
cpus=vm.get("cpus", 0),
|
||||
ip_address=access.static_ip, # Static IP
|
||||
cpus=vm.get("maxcpu", 0), # cluster/resources uses maxcpu
|
||||
memory=vm.get("maxmem", 0) // (1024 * 1024), # bytes to MB
|
||||
memory_usage=vm.get("mem", 0) // (1024 * 1024) if vm.get("mem") else None,
|
||||
cpu_usage=vm.get("cpu", 0),
|
||||
can_start=True,
|
||||
can_stop=True,
|
||||
can_reboot=True,
|
||||
can_connect=True,
|
||||
rdp_username=access.rdp_username if access else None, # RDP 사용자명
|
||||
rdp_password=access.rdp_password if access else None, # RDP 비밀번호
|
||||
rdp_port=access.rdp_port if access else 3389 # RDP 포트
|
||||
can_start=access.can_start,
|
||||
can_stop=access.can_stop,
|
||||
can_reboot=access.can_reboot,
|
||||
can_connect=access.can_connect,
|
||||
rdp_username=access.rdp_username,
|
||||
rdp_password=access.rdp_password,
|
||||
rdp_port=access.rdp_port or 3389
|
||||
)
|
||||
vm_list.append(vm_info)
|
||||
|
||||
@@ -65,21 +70,22 @@ async def get_my_vms(
|
||||
async def get_vm_detail(
|
||||
vm_id: int,
|
||||
node: str,
|
||||
type: str = "qemu",
|
||||
current_user: CurrentUser = Depends(get_current_user)
|
||||
):
|
||||
"""
|
||||
VM 상세 정보 조회
|
||||
"""
|
||||
# VM 상태 조회
|
||||
status = await proxmox_service.get_vm_status(node, vm_id)
|
||||
status = await proxmox_service.get_vm_status(node, vm_id, type)
|
||||
|
||||
if not status:
|
||||
raise NotFoundError(f"VM {vm_id}를 찾을 수 없습니다")
|
||||
|
||||
# IP 조회 제거 - 연결에 필요하지 않음
|
||||
return VMDetail(
|
||||
vm_id=vm_id,
|
||||
node=node,
|
||||
type=type,
|
||||
name=status.get("name", "Unknown"),
|
||||
status=status.get("status", "unknown"),
|
||||
ip_address=None, # IP 조회 안 함
|
||||
@@ -96,10 +102,11 @@ async def get_vm_detail(
|
||||
async def start_vm(
|
||||
vm_id: int,
|
||||
node: str,
|
||||
type: str = "qemu",
|
||||
current_user: CurrentUser = Depends(get_current_user)
|
||||
):
|
||||
"""VM 시작"""
|
||||
success = await proxmox_service.start_vm(node, vm_id)
|
||||
success = await proxmox_service.start_vm(node, vm_id, type)
|
||||
|
||||
return VMControlResponse(
|
||||
success=success,
|
||||
@@ -112,10 +119,11 @@ async def start_vm(
|
||||
async def stop_vm(
|
||||
vm_id: int,
|
||||
node: str,
|
||||
type: str = "qemu",
|
||||
current_user: CurrentUser = Depends(get_current_user)
|
||||
):
|
||||
"""VM 종료"""
|
||||
success = await proxmox_service.stop_vm(node, vm_id)
|
||||
success = await proxmox_service.stop_vm(node, vm_id, type)
|
||||
|
||||
return VMControlResponse(
|
||||
success=success,
|
||||
@@ -128,10 +136,11 @@ async def stop_vm(
|
||||
async def reboot_vm(
|
||||
vm_id: int,
|
||||
node: str,
|
||||
type: str = "qemu",
|
||||
current_user: CurrentUser = Depends(get_current_user)
|
||||
):
|
||||
"""VM 재시작"""
|
||||
success = await proxmox_service.reboot_vm(node, vm_id)
|
||||
success = await proxmox_service.reboot_vm(node, vm_id, type)
|
||||
|
||||
return VMControlResponse(
|
||||
success=success,
|
||||
|
||||
136
app/api/ws.py
Normal file
136
app/api/ws.py
Normal file
@@ -0,0 +1,136 @@
|
||||
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends, Query
|
||||
from typing import List, Dict
|
||||
import asyncio
|
||||
import logging
|
||||
import json
|
||||
from sqlalchemy.orm import Session
|
||||
from app.database import SessionLocal
|
||||
from app.api.auth import get_current_user_ws
|
||||
from app.schemas.auth import CurrentUser
|
||||
from app.services.proxmox_service import proxmox_service
|
||||
from app.models.vm import VMAccess
|
||||
from app.schemas.vm import VMInfo
|
||||
|
||||
router = APIRouter()
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class ConnectionManager:
|
||||
def __init__(self):
|
||||
# Active connections: List of (WebSocket, User) tuples
|
||||
self.active_connections: List[tuple[WebSocket, CurrentUser]] = []
|
||||
|
||||
async def connect(self, websocket: WebSocket, user: CurrentUser):
|
||||
await websocket.accept()
|
||||
self.active_connections.append((websocket, user))
|
||||
logger.info(f"WebSocket connected: {user.username}")
|
||||
|
||||
def disconnect(self, websocket: WebSocket, user: CurrentUser):
|
||||
if (websocket, user) in self.active_connections:
|
||||
self.active_connections.remove((websocket, user))
|
||||
logger.info(f"WebSocket disconnected: {user.username}")
|
||||
|
||||
async def broadcast(self, all_resources: List[Dict]):
|
||||
"""
|
||||
모든 연결된 클라이언트에게 권한에 맞는 VM 상태를 전송
|
||||
"""
|
||||
# DB 세션을 매번 새로 생성하는 것은 비효율적일 수 있으나,
|
||||
# Background Task에서 실행되므로 안전하게 처리
|
||||
try:
|
||||
db = SessionLocal()
|
||||
|
||||
# 모든 활성 VM Access 정보 미리 로딩
|
||||
all_accesses = db.query(VMAccess).filter(VMAccess.is_active == True).all()
|
||||
|
||||
# User ID별 Access Map 생성
|
||||
user_access_map = {}
|
||||
for access in all_accesses:
|
||||
if access.user_id not in user_access_map:
|
||||
user_access_map[access.user_id] = {}
|
||||
user_access_map[access.user_id][access.vm_id] = access
|
||||
|
||||
for connection, user in self.active_connections:
|
||||
try:
|
||||
# 해당 유저의 권한 맵 가져오기
|
||||
access_map = user_access_map.get(user.id, {})
|
||||
|
||||
user_vm_list = []
|
||||
for res in all_resources:
|
||||
vm_id = res.get("vmid")
|
||||
# VMID가 없거나 권한 정보가 없으면 패스
|
||||
if not vm_id or vm_id not in access_map:
|
||||
continue
|
||||
|
||||
access = access_map[vm_id]
|
||||
|
||||
# VMInfo 스키마에 맞춰 데이터 구성
|
||||
vm_info = {
|
||||
"vm_id": vm_id,
|
||||
"node": res.get("node"),
|
||||
"type": res.get("type", "qemu"),
|
||||
"name": res.get("name", "Unknown"),
|
||||
"status": res.get("status", "unknown"),
|
||||
"ip_address": access.static_ip,
|
||||
"cpus": res.get("maxcpu", 0),
|
||||
"memory": res.get("maxmem", 0) // (1024 * 1024),
|
||||
"memory_usage": res.get("mem", 0) // (1024 * 1024) if res.get("mem") else 0,
|
||||
"cpu_usage": res.get("cpu", 0),
|
||||
# 권한 정보
|
||||
"can_start": access.can_start,
|
||||
"can_stop": access.can_stop,
|
||||
"can_reboot": access.can_reboot,
|
||||
"can_connect": access.can_connect,
|
||||
# RDP 정보는 보안상 웹소켓 브로드캐스트에서는 제외하거나 필요시 포함
|
||||
# (여기서는 제외하고 REST API 상세조회 사용 권장하지만, 목록 뷰를 위해 포함)
|
||||
"rdp_username": access.rdp_username,
|
||||
"rdp_port": access.rdp_port or 3389
|
||||
}
|
||||
user_vm_list.append(vm_info)
|
||||
|
||||
# 전송
|
||||
await connection.send_json({
|
||||
"type": "update",
|
||||
"data": user_vm_list
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error sending update to {user.username}: {e}")
|
||||
# 에러 발생 시 연결 끊기 고려?
|
||||
|
||||
db.close()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Broadcast error: {e}")
|
||||
|
||||
manager = ConnectionManager()
|
||||
|
||||
@router.websocket("/status")
|
||||
async def websocket_endpoint(
|
||||
websocket: WebSocket,
|
||||
user: CurrentUser = Depends(get_current_user_ws)
|
||||
):
|
||||
try:
|
||||
await manager.connect(websocket, user)
|
||||
|
||||
try:
|
||||
while True:
|
||||
# 클라이언트로부터 메시지를 받을 일이 딱히 없지만 연결 유지를 위해 대기
|
||||
await websocket.receive_text()
|
||||
except WebSocketDisconnect:
|
||||
manager.disconnect(websocket, user)
|
||||
|
||||
except Exception:
|
||||
await websocket.close(code=4001)
|
||||
|
||||
async def start_monitoring_task():
|
||||
"""백그라운드 모니터링 태스크"""
|
||||
logger.info("Starting background monitoring task...")
|
||||
try:
|
||||
while True:
|
||||
# 활성 연결이 있을 때만 조회 (리소스 절약)
|
||||
if manager.active_connections:
|
||||
resources = await proxmox_service.get_all_vms()
|
||||
await manager.broadcast(resources)
|
||||
|
||||
await asyncio.sleep(2) # 2초마다 갱신
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Monitoring task cancelled")
|
||||
13
app/main.py
13
app/main.py
@@ -3,7 +3,8 @@ from fastapi.middleware.cors import CORSMiddleware
|
||||
from contextlib import asynccontextmanager
|
||||
from app.config import settings
|
||||
from app.database import engine, Base
|
||||
from app.api import auth, vms, tunnel, admin, ssh_credentials
|
||||
from app.api import auth, vms, tunnel, admin, ssh_credentials, ws
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
# 로깅 설정
|
||||
@@ -47,9 +48,18 @@ async def lifespan(app: FastAPI):
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
# 백그라운드 모니터링 시작
|
||||
monitor_task = asyncio.create_task(ws.start_monitoring_task())
|
||||
|
||||
yield
|
||||
|
||||
# 종료 시
|
||||
monitor_task.cancel()
|
||||
try:
|
||||
await monitor_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
logger.info("🛑 VConnect API 서버 종료")
|
||||
|
||||
# FastAPI 앱 생성
|
||||
@@ -75,6 +85,7 @@ app.include_router(vms.router, prefix=f"{settings.API_V1_PREFIX}/vms", tags=["VM
|
||||
app.include_router(tunnel.router, prefix=f"{settings.API_V1_PREFIX}/tunnel", tags=["터널 관리"])
|
||||
app.include_router(admin.router, prefix=f"{settings.API_V1_PREFIX}/admin", tags=["관리자"])
|
||||
app.include_router(ssh_credentials.router, prefix=f"{settings.API_V1_PREFIX}/ssh", tags=["SSH 자격증명"])
|
||||
app.include_router(ws.router, prefix="/ws", tags=["WebSocket"])
|
||||
|
||||
# Health Check
|
||||
@app.get("/health")
|
||||
|
||||
@@ -6,6 +6,7 @@ from datetime import datetime
|
||||
class VMInfo(BaseModel):
|
||||
vm_id: int
|
||||
node: str
|
||||
type: str = "qemu" # qemu or lxc
|
||||
name: str
|
||||
status: str
|
||||
ip_address: Optional[str] = None
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
import httpx
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import List, Optional, Dict
|
||||
from app.config import settings
|
||||
import json
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class ProxmoxService:
|
||||
"""Proxmox VE API 통신 서비스"""
|
||||
|
||||
@@ -16,10 +20,20 @@ class ProxmoxService:
|
||||
url = f"{self.base_url}/api2/json{endpoint}"
|
||||
headers = {"Authorization": self.api_token}
|
||||
|
||||
async with httpx.AsyncClient(verify=self.verify_ssl, timeout=30.0) as client:
|
||||
response = await client.request(method, url, headers=headers, **kwargs)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
try:
|
||||
async with httpx.AsyncClient(verify=self.verify_ssl, timeout=30.0) as client:
|
||||
response = await client.request(method, url, headers=headers, **kwargs)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
except httpx.HTTPStatusError as e:
|
||||
logger.error(f"Proxmox API HTTP Error: {e.response.status_code} - {e.response.text}")
|
||||
raise
|
||||
except httpx.RequestError as e:
|
||||
logger.error(f"Proxmox API Request Error: {e}")
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Proxmox API Unexpected Error: {e}")
|
||||
raise
|
||||
|
||||
async def get_nodes(self) -> List[Dict]:
|
||||
"""노드 목록 조회"""
|
||||
@@ -32,32 +46,33 @@ class ProxmoxService:
|
||||
return result.get("data", [])
|
||||
|
||||
async def get_all_vms(self) -> List[Dict]:
|
||||
"""모든 노드의 VM 목록 조회"""
|
||||
nodes = await self.get_nodes()
|
||||
print(f"DEBUG: 노드 목록: {nodes}")
|
||||
|
||||
all_vms = []
|
||||
|
||||
for node in nodes:
|
||||
node_name = node.get("node")
|
||||
if node_name:
|
||||
vms = await self.get_vms(node_name)
|
||||
print(f"DEBUG: {node_name} 노드의 VM 목록: {vms}")
|
||||
for vm in vms:
|
||||
vm["node"] = node_name
|
||||
all_vms.append(vm)
|
||||
|
||||
print(f"DEBUG: 전체 VM 개수: {len(all_vms)}")
|
||||
print(f"DEBUG: 전체 VM 목록: {all_vms}")
|
||||
return all_vms
|
||||
"""모든 노드의 VM 및 LXC 목록 조회 (클러스터 리소스 API 사용)"""
|
||||
try:
|
||||
# /cluster/resources 호출 (type=vm은 qemu와 lxc 모두 포함)
|
||||
result = await self._make_request("GET", "/cluster/resources?type=vm")
|
||||
resources = result.get("data", [])
|
||||
|
||||
logger.info(f"Total resources fetched: {len(resources)}")
|
||||
return resources
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get cluster resources: {e}")
|
||||
return []
|
||||
|
||||
async def get_vm_status(self, node: str, vm_id: int) -> Dict:
|
||||
"""VM 상태 조회"""
|
||||
result = await self._make_request("GET", f"/nodes/{node}/qemu/{vm_id}/status/current")
|
||||
return result.get("data", {})
|
||||
async def get_vm_status(self, node: str, vm_id: int, vm_type: str = "qemu") -> Dict:
|
||||
"""VM/LXC 상태 조회"""
|
||||
try:
|
||||
result = await self._make_request("GET", f"/nodes/{node}/{vm_type}/{vm_id}/status/current")
|
||||
return result.get("data", {})
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get status for {vm_type} {vm_id}: {e}")
|
||||
return {}
|
||||
|
||||
async def get_vm_ip(self, node: str, vm_id: int) -> Optional[str]:
|
||||
"""QEMU Guest Agent를 통해 VM IP 주소 조회"""
|
||||
async def get_vm_ip(self, node: str, vm_id: int, vm_type: str = "qemu") -> Optional[str]:
|
||||
"""QEMU Guest Agent를 통해 VM IP 주소 조회 (LXC는 미지원)"""
|
||||
if vm_type != "qemu":
|
||||
return None
|
||||
|
||||
try:
|
||||
result = await self._make_request(
|
||||
"GET",
|
||||
@@ -80,31 +95,38 @@ class ProxmoxService:
|
||||
return address
|
||||
|
||||
return None
|
||||
except:
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to get VM IP (Node: {node}, VMID: {vm_id}): {e}")
|
||||
return None
|
||||
|
||||
async def start_vm(self, node: str, vm_id: int) -> bool:
|
||||
"""VM 시작"""
|
||||
async def start_vm(self, node: str, vm_id: int, vm_type: str = "qemu") -> bool:
|
||||
"""VM/LXC 시작"""
|
||||
try:
|
||||
await self._make_request("POST", f"/nodes/{node}/qemu/{vm_id}/status/start")
|
||||
await self._make_request("POST", f"/nodes/{node}/{vm_type}/{vm_id}/status/start")
|
||||
logger.info(f"{vm_type} started: {vm_id} (Node: {node})")
|
||||
return True
|
||||
except:
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to start {vm_type} {vm_id}: {e}")
|
||||
return False
|
||||
|
||||
async def stop_vm(self, node: str, vm_id: int) -> bool:
|
||||
"""VM 종료"""
|
||||
async def stop_vm(self, node: str, vm_id: int, vm_type: str = "qemu") -> bool:
|
||||
"""VM/LXC 종료"""
|
||||
try:
|
||||
await self._make_request("POST", f"/nodes/{node}/qemu/{vm_id}/status/stop")
|
||||
await self._make_request("POST", f"/nodes/{node}/{vm_type}/{vm_id}/status/stop")
|
||||
logger.info(f"{vm_type} stopped: {vm_id} (Node: {node})")
|
||||
return True
|
||||
except:
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to stop {vm_type} {vm_id}: {e}")
|
||||
return False
|
||||
|
||||
async def reboot_vm(self, node: str, vm_id: int) -> bool:
|
||||
"""VM 재시작"""
|
||||
async def reboot_vm(self, node: str, vm_id: int, vm_type: str = "qemu") -> bool:
|
||||
"""VM/LXC 재시작"""
|
||||
try:
|
||||
await self._make_request("POST", f"/nodes/{node}/qemu/{vm_id}/status/reboot")
|
||||
await self._make_request("POST", f"/nodes/{node}/{vm_type}/{vm_id}/status/reboot")
|
||||
logger.info(f"{vm_type} rebooted: {vm_id} (Node: {node})")
|
||||
return True
|
||||
except:
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to reboot {vm_type} {vm_id}: {e}")
|
||||
return False
|
||||
|
||||
def _is_private_ip(self, ip: str) -> bool:
|
||||
|
||||
@@ -5,7 +5,7 @@ services:
|
||||
build: .
|
||||
container_name: vconnect-api
|
||||
ports:
|
||||
- "8000:8000"
|
||||
- "9000:9000"
|
||||
env_file:
|
||||
- .env
|
||||
restart: unless-stopped
|
||||
|
||||
Reference in New Issue
Block a user