Compare commits

...

2 Commits

Author SHA1 Message Date
unknown
c20bd6a067 services 2025-12-13 08:12:44 +09:00
unknown
b931bd9eb6 Reapply "Test Push"
This reverts commit 1d9e0675e9.
2025-12-12 23:52:19 +09:00
7 changed files with 261 additions and 60 deletions

View File

@@ -1,4 +1,4 @@
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi import APIRouter, Depends, HTTPException, status, Query
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from app.database import get_db
@@ -41,6 +41,28 @@ async def get_current_user(
is_active=user.is_active
)
async def get_current_user_ws(token: str = Query(...), db: Session = Depends(get_db)) -> CurrentUser:
"""The WebSocket authentication helper"""
payload = decode_token(token)
if not payload:
raise AuthenticationError("Invalid token")
user_id = payload.get("sub")
if not user_id:
raise AuthenticationError("Invalid token payload")
user = auth_service.get_user_by_id(db, int(user_id))
if not user or not user.is_active:
raise AuthenticationError("User not found or inactive")
return CurrentUser(
id=user.id,
username=user.username,
email=user.email,
role=user.role,
is_active=user.is_active
)
@router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def register(user_data: UserRegister, db: Session = Depends(get_db)):
"""

View File

@@ -38,24 +38,29 @@ async def get_my_vms(
vm_id = vm["vmid"]
access = access_map.get(vm_id)
# VMAccess가 있으면 해당 정보 사용, 없으면 기본값
# 권한이 없는 VM은 목록에서 제외
if not access:
continue
# VMAccess가 있으면 해당 정보 사용
vm_info = VMInfo(
vm_id=vm_id,
node=vm["node"],
node=vm.get("node"), # cluster/resources returns 'node'
type=vm.get("type", "qemu"),
name=vm.get("name", "Unknown"),
status=vm.get("status", "unknown"),
ip_address=access.static_ip if access else None, # Static IP 자동 설정
cpus=vm.get("cpus", 0),
ip_address=access.static_ip, # Static IP
cpus=vm.get("maxcpu", 0), # cluster/resources uses maxcpu
memory=vm.get("maxmem", 0) // (1024 * 1024), # bytes to MB
memory_usage=vm.get("mem", 0) // (1024 * 1024) if vm.get("mem") else None,
cpu_usage=vm.get("cpu", 0),
can_start=True,
can_stop=True,
can_reboot=True,
can_connect=True,
rdp_username=access.rdp_username if access else None, # RDP 사용자명
rdp_password=access.rdp_password if access else None, # RDP 비밀번호
rdp_port=access.rdp_port if access else 3389 # RDP 포트
can_start=access.can_start,
can_stop=access.can_stop,
can_reboot=access.can_reboot,
can_connect=access.can_connect,
rdp_username=access.rdp_username,
rdp_password=access.rdp_password,
rdp_port=access.rdp_port or 3389
)
vm_list.append(vm_info)
@@ -65,21 +70,22 @@ async def get_my_vms(
async def get_vm_detail(
vm_id: int,
node: str,
type: str = "qemu",
current_user: CurrentUser = Depends(get_current_user)
):
"""
VM 상세 정보 조회
"""
# VM 상태 조회
status = await proxmox_service.get_vm_status(node, vm_id)
status = await proxmox_service.get_vm_status(node, vm_id, type)
if not status:
raise NotFoundError(f"VM {vm_id}를 찾을 수 없습니다")
# IP 조회 제거 - 연결에 필요하지 않음
return VMDetail(
vm_id=vm_id,
node=node,
type=type,
name=status.get("name", "Unknown"),
status=status.get("status", "unknown"),
ip_address=None, # IP 조회 안 함
@@ -96,10 +102,11 @@ async def get_vm_detail(
async def start_vm(
vm_id: int,
node: str,
type: str = "qemu",
current_user: CurrentUser = Depends(get_current_user)
):
"""VM 시작"""
success = await proxmox_service.start_vm(node, vm_id)
success = await proxmox_service.start_vm(node, vm_id, type)
return VMControlResponse(
success=success,
@@ -112,10 +119,11 @@ async def start_vm(
async def stop_vm(
vm_id: int,
node: str,
type: str = "qemu",
current_user: CurrentUser = Depends(get_current_user)
):
"""VM 종료"""
success = await proxmox_service.stop_vm(node, vm_id)
success = await proxmox_service.stop_vm(node, vm_id, type)
return VMControlResponse(
success=success,
@@ -128,10 +136,11 @@ async def stop_vm(
async def reboot_vm(
vm_id: int,
node: str,
type: str = "qemu",
current_user: CurrentUser = Depends(get_current_user)
):
"""VM 재시작"""
success = await proxmox_service.reboot_vm(node, vm_id)
success = await proxmox_service.reboot_vm(node, vm_id, type)
return VMControlResponse(
success=success,

136
app/api/ws.py Normal file
View File

@@ -0,0 +1,136 @@
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends, Query
from typing import List, Dict
import asyncio
import logging
import json
from sqlalchemy.orm import Session
from app.database import SessionLocal
from app.api.auth import get_current_user_ws
from app.schemas.auth import CurrentUser
from app.services.proxmox_service import proxmox_service
from app.models.vm import VMAccess
from app.schemas.vm import VMInfo
router = APIRouter()
logger = logging.getLogger(__name__)
class ConnectionManager:
def __init__(self):
# Active connections: List of (WebSocket, User) tuples
self.active_connections: List[tuple[WebSocket, CurrentUser]] = []
async def connect(self, websocket: WebSocket, user: CurrentUser):
await websocket.accept()
self.active_connections.append((websocket, user))
logger.info(f"WebSocket connected: {user.username}")
def disconnect(self, websocket: WebSocket, user: CurrentUser):
if (websocket, user) in self.active_connections:
self.active_connections.remove((websocket, user))
logger.info(f"WebSocket disconnected: {user.username}")
async def broadcast(self, all_resources: List[Dict]):
"""
모든 연결된 클라이언트에게 권한에 맞는 VM 상태를 전송
"""
# DB 세션을 매번 새로 생성하는 것은 비효율적일 수 있으나,
# Background Task에서 실행되므로 안전하게 처리
try:
db = SessionLocal()
# 모든 활성 VM Access 정보 미리 로딩
all_accesses = db.query(VMAccess).filter(VMAccess.is_active == True).all()
# User ID별 Access Map 생성
user_access_map = {}
for access in all_accesses:
if access.user_id not in user_access_map:
user_access_map[access.user_id] = {}
user_access_map[access.user_id][access.vm_id] = access
for connection, user in self.active_connections:
try:
# 해당 유저의 권한 맵 가져오기
access_map = user_access_map.get(user.id, {})
user_vm_list = []
for res in all_resources:
vm_id = res.get("vmid")
# VMID가 없거나 권한 정보가 없으면 패스
if not vm_id or vm_id not in access_map:
continue
access = access_map[vm_id]
# VMInfo 스키마에 맞춰 데이터 구성
vm_info = {
"vm_id": vm_id,
"node": res.get("node"),
"type": res.get("type", "qemu"),
"name": res.get("name", "Unknown"),
"status": res.get("status", "unknown"),
"ip_address": access.static_ip,
"cpus": res.get("maxcpu", 0),
"memory": res.get("maxmem", 0) // (1024 * 1024),
"memory_usage": res.get("mem", 0) // (1024 * 1024) if res.get("mem") else 0,
"cpu_usage": res.get("cpu", 0),
# 권한 정보
"can_start": access.can_start,
"can_stop": access.can_stop,
"can_reboot": access.can_reboot,
"can_connect": access.can_connect,
# RDP 정보는 보안상 웹소켓 브로드캐스트에서는 제외하거나 필요시 포함
# (여기서는 제외하고 REST API 상세조회 사용 권장하지만, 목록 뷰를 위해 포함)
"rdp_username": access.rdp_username,
"rdp_port": access.rdp_port or 3389
}
user_vm_list.append(vm_info)
# 전송
await connection.send_json({
"type": "update",
"data": user_vm_list
})
except Exception as e:
logger.error(f"Error sending update to {user.username}: {e}")
# 에러 발생 시 연결 끊기 고려?
db.close()
except Exception as e:
logger.error(f"Broadcast error: {e}")
manager = ConnectionManager()
@router.websocket("/status")
async def websocket_endpoint(
websocket: WebSocket,
user: CurrentUser = Depends(get_current_user_ws)
):
try:
await manager.connect(websocket, user)
try:
while True:
# 클라이언트로부터 메시지를 받을 일이 딱히 없지만 연결 유지를 위해 대기
await websocket.receive_text()
except WebSocketDisconnect:
manager.disconnect(websocket, user)
except Exception:
await websocket.close(code=4001)
async def start_monitoring_task():
"""백그라운드 모니터링 태스크"""
logger.info("Starting background monitoring task...")
try:
while True:
# 활성 연결이 있을 때만 조회 (리소스 절약)
if manager.active_connections:
resources = await proxmox_service.get_all_vms()
await manager.broadcast(resources)
await asyncio.sleep(2) # 2초마다 갱신
except asyncio.CancelledError:
logger.info("Monitoring task cancelled")

View File

@@ -3,7 +3,8 @@ from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from app.config import settings
from app.database import engine, Base
from app.api import auth, vms, tunnel, admin, ssh_credentials
from app.api import auth, vms, tunnel, admin, ssh_credentials, ws
import asyncio
import logging
# 로깅 설정
@@ -47,9 +48,18 @@ async def lifespan(app: FastAPI):
finally:
db.close()
# 백그라운드 모니터링 시작
monitor_task = asyncio.create_task(ws.start_monitoring_task())
yield
# 종료 시
monitor_task.cancel()
try:
await monitor_task
except asyncio.CancelledError:
pass
logger.info("🛑 VConnect API 서버 종료")
# FastAPI 앱 생성
@@ -75,6 +85,7 @@ app.include_router(vms.router, prefix=f"{settings.API_V1_PREFIX}/vms", tags=["VM
app.include_router(tunnel.router, prefix=f"{settings.API_V1_PREFIX}/tunnel", tags=["터널 관리"])
app.include_router(admin.router, prefix=f"{settings.API_V1_PREFIX}/admin", tags=["관리자"])
app.include_router(ssh_credentials.router, prefix=f"{settings.API_V1_PREFIX}/ssh", tags=["SSH 자격증명"])
app.include_router(ws.router, prefix="/ws", tags=["WebSocket"])
# Health Check
@app.get("/health")

View File

@@ -6,6 +6,7 @@ from datetime import datetime
class VMInfo(BaseModel):
vm_id: int
node: str
type: str = "qemu" # qemu or lxc
name: str
status: str
ip_address: Optional[str] = None

View File

@@ -1,8 +1,12 @@
import httpx
import asyncio
import logging
from typing import List, Optional, Dict
from app.config import settings
import json
logger = logging.getLogger(__name__)
class ProxmoxService:
"""Proxmox VE API 통신 서비스"""
@@ -16,10 +20,20 @@ class ProxmoxService:
url = f"{self.base_url}/api2/json{endpoint}"
headers = {"Authorization": self.api_token}
async with httpx.AsyncClient(verify=self.verify_ssl, timeout=30.0) as client:
response = await client.request(method, url, headers=headers, **kwargs)
response.raise_for_status()
return response.json()
try:
async with httpx.AsyncClient(verify=self.verify_ssl, timeout=30.0) as client:
response = await client.request(method, url, headers=headers, **kwargs)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
logger.error(f"Proxmox API HTTP Error: {e.response.status_code} - {e.response.text}")
raise
except httpx.RequestError as e:
logger.error(f"Proxmox API Request Error: {e}")
raise
except Exception as e:
logger.error(f"Proxmox API Unexpected Error: {e}")
raise
async def get_nodes(self) -> List[Dict]:
"""노드 목록 조회"""
@@ -32,32 +46,33 @@ class ProxmoxService:
return result.get("data", [])
async def get_all_vms(self) -> List[Dict]:
"""모든 노드의 VM 목록 조회"""
nodes = await self.get_nodes()
print(f"DEBUG: 노드 목록: {nodes}")
all_vms = []
for node in nodes:
node_name = node.get("node")
if node_name:
vms = await self.get_vms(node_name)
print(f"DEBUG: {node_name} 노드의 VM 목록: {vms}")
for vm in vms:
vm["node"] = node_name
all_vms.append(vm)
print(f"DEBUG: 전체 VM 개수: {len(all_vms)}")
print(f"DEBUG: 전체 VM 목록: {all_vms}")
return all_vms
"""모든 노드의 VM 및 LXC 목록 조회 (클러스터 리소스 API 사용)"""
try:
# /cluster/resources 호출 (type=vm은 qemu와 lxc 모두 포함)
result = await self._make_request("GET", "/cluster/resources?type=vm")
resources = result.get("data", [])
logger.info(f"Total resources fetched: {len(resources)}")
return resources
except Exception as e:
logger.error(f"Failed to get cluster resources: {e}")
return []
async def get_vm_status(self, node: str, vm_id: int) -> Dict:
"""VM 상태 조회"""
result = await self._make_request("GET", f"/nodes/{node}/qemu/{vm_id}/status/current")
return result.get("data", {})
async def get_vm_status(self, node: str, vm_id: int, vm_type: str = "qemu") -> Dict:
"""VM/LXC 상태 조회"""
try:
result = await self._make_request("GET", f"/nodes/{node}/{vm_type}/{vm_id}/status/current")
return result.get("data", {})
except Exception as e:
logger.error(f"Failed to get status for {vm_type} {vm_id}: {e}")
return {}
async def get_vm_ip(self, node: str, vm_id: int) -> Optional[str]:
"""QEMU Guest Agent를 통해 VM IP 주소 조회"""
async def get_vm_ip(self, node: str, vm_id: int, vm_type: str = "qemu") -> Optional[str]:
"""QEMU Guest Agent를 통해 VM IP 주소 조회 (LXC는 미지원)"""
if vm_type != "qemu":
return None
try:
result = await self._make_request(
"GET",
@@ -80,31 +95,38 @@ class ProxmoxService:
return address
return None
except:
except Exception as e:
logger.warning(f"Failed to get VM IP (Node: {node}, VMID: {vm_id}): {e}")
return None
async def start_vm(self, node: str, vm_id: int) -> bool:
"""VM 시작"""
async def start_vm(self, node: str, vm_id: int, vm_type: str = "qemu") -> bool:
"""VM/LXC 시작"""
try:
await self._make_request("POST", f"/nodes/{node}/qemu/{vm_id}/status/start")
await self._make_request("POST", f"/nodes/{node}/{vm_type}/{vm_id}/status/start")
logger.info(f"{vm_type} started: {vm_id} (Node: {node})")
return True
except:
except Exception as e:
logger.error(f"Failed to start {vm_type} {vm_id}: {e}")
return False
async def stop_vm(self, node: str, vm_id: int) -> bool:
"""VM 종료"""
async def stop_vm(self, node: str, vm_id: int, vm_type: str = "qemu") -> bool:
"""VM/LXC 종료"""
try:
await self._make_request("POST", f"/nodes/{node}/qemu/{vm_id}/status/stop")
await self._make_request("POST", f"/nodes/{node}/{vm_type}/{vm_id}/status/stop")
logger.info(f"{vm_type} stopped: {vm_id} (Node: {node})")
return True
except:
except Exception as e:
logger.error(f"Failed to stop {vm_type} {vm_id}: {e}")
return False
async def reboot_vm(self, node: str, vm_id: int) -> bool:
"""VM 재시작"""
async def reboot_vm(self, node: str, vm_id: int, vm_type: str = "qemu") -> bool:
"""VM/LXC 재시작"""
try:
await self._make_request("POST", f"/nodes/{node}/qemu/{vm_id}/status/reboot")
await self._make_request("POST", f"/nodes/{node}/{vm_type}/{vm_id}/status/reboot")
logger.info(f"{vm_type} rebooted: {vm_id} (Node: {node})")
return True
except:
except Exception as e:
logger.error(f"Failed to reboot {vm_type} {vm_id}: {e}")
return False
def _is_private_ip(self, ip: str) -> bool:

View File

@@ -5,7 +5,7 @@ services:
build: .
container_name: vconnect-api
ports:
- "8000:8000"
- "9000:9000"
env_file:
- .env
restart: unless-stopped