Compare commits

...

16 Commits

Author SHA1 Message Date
unknown
9d5d2b8d99 Update 2025-12-19 20:23:59 2025-12-19 20:23:59 +09:00
unknown
b37c43ab86 Update 2025-12-19 19:18:16 2025-12-19 19:18:16 +09:00
unknown
b18412ecb2 Update 2025-12-19 16:23:03 2025-12-19 16:23:03 +09:00
804204ab97 update 2025-11-29 16:50:48 +09:00
25cbb6b8f8 update 2025-11-29 16:30:27 +09:00
2e4fc20523 Merge branch 'main' of https://gitea.mouse84.com/Kim.KANGHEE/iDRAC_Info 2025-11-29 11:14:52 +09:00
19798cca66 update 2025-11-29 11:13:55 +09:00
c0d3312bca update 2025-11-28 18:27:15 +09:00
45fa1fa162 Update config.py 2025-11-05 21:19:23 +09:00
2481d44eb8 update 2025-11-05 21:18:29 +09:00
bc15452181 update 2025-10-21 20:29:39 +09:00
230ea0890d update 2025-10-16 15:06:50 +09:00
2fcca115d6 update 2025-10-13 22:26:27 +09:00
a79e61d7e4 update 2025-10-13 22:18:52 +09:00
e1bacec556 update 2025-10-13 22:09:21 +09:00
90c242e46b update 2025-10-13 22:08:53 +09:00
265 changed files with 37333 additions and 17604 deletions

6
.env
View File

@@ -19,4 +19,8 @@ SOCKETIO_ASYNC_MODE=threading
# Telegram (민감정보, 필수 시에만 설정)
TELEGRAM_BOT_TOKEN=6719918880:AAHC1on-KlzH0G3ylJP57p-q5qMyorFUGZo
TELEGRAM_CHAT_ID=298120612
TELEGRAM_CHAT_ID=298120612
# iDRAC 기본 연결 정보 (추가!)
IDRAC_DEFAULT_USERNAME=root
IDRAC_DEFAULT_PASSWORD=calvin

915
README.md

File diff suppressed because it is too large Load Diff

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

87
app.py
View File

@@ -8,6 +8,7 @@ from flask_login import LoginManager
from flask_migrate import Migrate
from flask_socketio import SocketIO
from flask_wtf import CSRFProtect
from dotenv import load_dotenv
from config import Config
from backend.models.user import db, load_user
@@ -15,10 +16,19 @@ from backend.routes import register_routes
from backend.services.logger import setup_logging
from backend.services import watchdog_handler
# 텔레그램 서비스 (별도 모듈)에서 가져옴
from telegram_bot_service import run_polling as telegram_run_polling
# ─────────────────────────────────────────────────────────────
# .env 파일 로드 (환경변수 우선순위 보장)
# ─────────────────────────────────────────────────────────────
load_dotenv()
# ─────────────────────────────────────────────────────────────
# 템플릿/정적 경로를 파일 위치 기준으로 안전하게 설정
# structure: <project_root>/backend/templates, <project_root>/backend/static
# ─────────────────────────────────────────────────────────────
BASE_DIR = Path(__file__).resolve().parent
TEMPLATE_DIR = (BASE_DIR / "backend" / "templates").resolve()
STATIC_DIR = (BASE_DIR / "backend" / "static").resolve()
@@ -32,9 +42,18 @@ setup_logging(app)
# ─────────────────────────────────────────────────────────────
# CSRF 보호 + 템플릿에서 {{ csrf_token() }} 사용 가능하게 주입
# ─────────────────────────────────────────────────────────────
csrf = CSRFProtect()
csrf.init_app(app)
# ─────────────────────────────────────────────────────────────
# ProxyFix: Nginx/NPM 등 리버스 프록시 뒤에서 실행 시 헤더 신뢰
# (HTTPS 인식, 올바른 IP/Scheme 파악으로 CSRF/세션 문제 해결)
# ─────────────────────────────────────────────────────────────
from werkzeug.middleware.proxy_fix import ProxyFix
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)
@app.context_processor
def inject_csrf():
try:
@@ -44,9 +63,11 @@ def inject_csrf():
# Flask-WTF 미설치/에러 시에도 앱이 뜨도록 방어
return dict(csrf_token=lambda: "")
# ─────────────────────────────────────────────────────────────
# SocketIO: Windows 기본 threading, Linux는 eventlet 설치 시 eventlet 사용
# 환경변수 SOCKETIO_ASYNC_MODE 로 강제 지정 가능 ("threading"/"eventlet"/"gevent"/"auto")
# ─────────────────────────────────────────────────────────────
async_mode = (app.config.get("SOCKETIO_ASYNC_MODE") or "threading").lower()
if async_mode == "auto":
async_mode = "threading"
@@ -67,39 +88,101 @@ socketio = SocketIO(app, cors_allowed_origins="*", async_mode=async_mode)
# watchdog에서 socketio 사용
watchdog_handler.socketio = socketio
# ─────────────────────────────────────────────────────────────
# DB / 마이그레이션
# ─────────────────────────────────────────────────────────────
app.logger.info("DB URI = %s", app.config.get("SQLALCHEMY_DATABASE_URI"))
db.init_app(app)
Migrate(app, db)
# (선택) 개발 편의용: 테이블 자동 부트스트랩
# 환경변수 AUTO_BOOTSTRAP_DB=true 일 때만 동작 (운영에서는 flask db upgrade 사용 권장)
if (os.getenv("AUTO_BOOTSTRAP_DB", "false").lower() == "true"):
if os.getenv("AUTO_BOOTSTRAP_DB", "false").lower() == "true":
from sqlalchemy import inspect
with app.app_context():
insp = inspect(db.engine)
if "user" not in insp.get_table_names():
db.create_all()
app.logger.info("DB bootstrap: created tables via create_all()")
# ─────────────────────────────────────────────────────────────
# Login
# ─────────────────────────────────────────────────────────────
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "auth.login"
@login_manager.user_loader
def _load_user(user_id: str):
return load_user(user_id)
# 라우트 등록 (Blueprints 등)
register_routes(app, socketio)
# ─────────────────────────────────────────────────────────────
# 텔레그램 봇 폴링 서비스 (중복 실행 방지 포함)
# ─────────────────────────────────────────────────────────────
_bot_socket_lock = None
def start_telegram_bot_polling() -> None:
"""텔레그램 봇 폴링을 백그라운드 스레드로 시작 (TCP 소켓 락으로 중복 방지)"""
import threading
import socket
global _bot_socket_lock
if _bot_socket_lock:
return
app.logger.info("🔒 봇 중복 실행 방지 락(TCP:50000) 획득 시도...")
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("127.0.0.1", 50000))
s.listen(1)
_bot_socket_lock = s
app.logger.info("🔒 락 획득 성공! 봇 폴링 스레드를 시작합니다.")
except OSError:
app.logger.warning("⛔ 락 획득 실패: 이미 다른 프로세스(또는 좀비 프로세스)가 포트 50000을 점유 중입니다. 봇 폴링을 건너뜁니다.")
return
def _runner():
try:
telegram_run_polling(app)
except Exception as e:
app.logger.error("텔레그램 봇 폴링 서비스 오류: %s", e)
polling_thread = threading.Thread(target=_runner, daemon=True)
polling_thread.start()
# ─────────────────────────────────────────────────────────────
# 텔레그램 봇 폴링 자동 시작
# Flask 앱이 초기화되면 자동으로 봇 폴링 시작
# 주의: Flask 리로더(Debug 모드) 사용 시 메인/워커 프로세스 중복 실행 방지
# ─────────────────────────────────────────────────────────────
# 1. 리로더의 워커 프로세스인 경우 (WERKZEUG_RUN_MAIN = "true")
# 2. 또는 디버그 모드가 꺼진 경우 (Production)
if os.environ.get("WERKZEUG_RUN_MAIN") == "true" or not app.config.get("DEBUG"):
start_telegram_bot_polling()
# ─────────────────────────────────────────────────────────────
# 엔트리포인트
# ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
host = os.getenv("FLASK_HOST", "0.0.0.0")
port = int(os.getenv("FLASK_PORT", 5000))
debug = os.getenv("FLASK_DEBUG", "true").lower() == "true"
socketio.run(app, host=host, port=port, debug=debug)
# python app.py로 직접 실행 시(use_reloader=False)에는 위 조건문에서 실행되지 않을 수 있으므로
# 여기서 명시적으로 실행 (중복 실행 방지 플래그가 있어 안전함)
start_telegram_bot_polling()
socketio.run(app, host=host, port=port, debug=debug, allow_unsafe_werkzeug=True, use_reloader=False)

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,134 @@
"""
펌웨어 버전 관리 모델
backend/models/firmware_version.py
"""
from backend.models.user import db
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text
class FirmwareVersion(db.Model):
"""펌웨어 최신 버전 정보 모델"""
__tablename__ = 'firmware_versions'
id = Column(Integer, primary_key=True)
# 컴포넌트 정보
component_name = Column(String(100), nullable=False) # BIOS, iDRAC, PERC 등
component_type = Column(String(50)) # Firmware, Driver 등
vendor = Column(String(50)) # Dell, Intel, Broadcom 등
# 서버 모델 (선택적)
server_model = Column(String(100)) # PowerEdge R750, R640 등 (비어있으면 모든 모델)
# 버전 정보
latest_version = Column(String(50), nullable=False) # 최신 버전
release_date = Column(String(20)) # 릴리즈 날짜
# 다운로드 정보
download_url = Column(String(500)) # DUP 파일 다운로드 URL
file_name = Column(String(200)) # DUP 파일명
file_size_mb = Column(Integer) # 파일 크기 (MB)
# 메타 정보
notes = Column(Text) # 비고 (버전 변경 사항 등)
is_critical = Column(Boolean, default=False) # 중요 업데이트 여부
is_active = Column(Boolean, default=True) # 활성화 여부
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def to_dict(self):
"""딕셔너리로 변환"""
return {
'id': self.id,
'component_name': self.component_name,
'component_type': self.component_type,
'vendor': self.vendor,
'server_model': self.server_model,
'latest_version': self.latest_version,
'release_date': self.release_date,
'download_url': self.download_url,
'file_name': self.file_name,
'file_size_mb': self.file_size_mb,
'notes': self.notes,
'is_critical': self.is_critical,
'is_active': self.is_active,
'created_at': self.created_at.isoformat() if self.created_at else None,
'updated_at': self.updated_at.isoformat() if self.updated_at else None
}
def __repr__(self):
return f'<FirmwareVersion {self.component_name} {self.latest_version}>'
class FirmwareComparisonResult:
"""펌웨어 버전 비교 결과"""
def __init__(self, component_name, current_version, latest_version=None):
self.component_name = component_name
self.current_version = current_version
self.latest_version = latest_version
self.status = self._compare_versions()
self.recommendation = self._get_recommendation()
def _compare_versions(self):
"""버전 비교"""
if not self.latest_version:
return 'unknown' # 최신 버전 정보 없음
if self.current_version == self.latest_version:
return 'latest' # 최신
# 버전 비교 (단순 문자열 비교보다 정교하게)
if self._is_older(self.current_version, self.latest_version):
return 'outdated' # 업데이트 필요
else:
return 'unknown' # 판단 불가
def _is_older(self, current, latest):
"""
버전 비교 (간단한 버전 비교)
예: 2.10.0 < 2.15.0
"""
try:
# 버전 문자열을 숫자 리스트로 변환
current_parts = [int(x) for x in current.replace('-', '.').split('.') if x.isdigit()]
latest_parts = [int(x) for x in latest.replace('-', '.').split('.') if x.isdigit()]
# 길이 맞추기
max_len = max(len(current_parts), len(latest_parts))
current_parts += [0] * (max_len - len(current_parts))
latest_parts += [0] * (max_len - len(latest_parts))
# 비교
for c, l in zip(current_parts, latest_parts):
if c < l:
return True
elif c > l:
return False
return False # 같음
except:
# 비교 실패 시 문자열 비교
return current < latest
def _get_recommendation(self):
"""권장 사항"""
if self.status == 'outdated':
return f'업데이트 권장: {self.current_version}{self.latest_version}'
elif self.status == 'latest':
return '최신 버전'
else:
return '버전 정보 확인 필요'
def to_dict(self):
return {
'component_name': self.component_name,
'current_version': self.current_version,
'latest_version': self.latest_version,
'status': self.status,
'recommendation': self.recommendation
}

View File

@@ -0,0 +1,62 @@
"""
Dell iDRAC 서버 관리 모델
backend/models/idrac_server.py
"""
from backend.models.user import db
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, Boolean
class IdracServer(db.Model):
"""iDRAC 서버 정보 모델"""
__tablename__ = 'idrac_servers'
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False) # 서버명
ip_address = Column(String(50), nullable=False, unique=True) # iDRAC IP
username = Column(String(50), default='root') # iDRAC 사용자명
password = Column(String(200), nullable=False) # 비밀번호 (암호화 권장)
# 분류 정보
group_name = Column(String(100)) # 그룹명 (고객사, 프로젝트명)
location = Column(String(100)) # 위치
model = Column(String(100)) # 서버 모델
# 상태 정보
status = Column(String(20), default='registered') # registered, online, offline, updating
last_connected = Column(DateTime) # 마지막 연결 시간
last_updated = Column(DateTime) # 마지막 업데이트 시간
# 펌웨어 정보
current_bios = Column(String(50)) # 현재 BIOS 버전
target_bios = Column(String(50)) # 목표 BIOS 버전
# 메타 정보
notes = Column(String(500)) # 비고
is_active = Column(Boolean, default=True) # 활성화 여부
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def to_dict(self):
"""딕셔너리로 변환"""
return {
'id': self.id,
'name': self.name,
'ip_address': self.ip_address,
'username': self.username,
'group_name': self.group_name,
'location': self.location,
'model': self.model,
'status': self.status,
'last_connected': self.last_connected.isoformat() if self.last_connected else None,
'last_updated': self.last_updated.isoformat() if self.last_updated else None,
'current_bios': self.current_bios,
'target_bios': self.target_bios,
'notes': self.notes,
'is_active': self.is_active,
'created_at': self.created_at.isoformat() if self.created_at else None
}
def __repr__(self):
return f'<IdracServer {self.name} ({self.ip_address})>'

View File

@@ -0,0 +1,24 @@
from backend.models.user import db
class TelegramBot(db.Model):
__tablename__ = 'telegram_bots'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), nullable=False) # 봇 식별 이름 (예: 알림용, 경고용)
token = db.Column(db.String(200), nullable=False)
chat_id = db.Column(db.String(100), nullable=False)
is_active = db.Column(db.Boolean, default=True)
description = db.Column(db.String(255), nullable=True)
# 알림 유형: 콤마로 구분 (예: "auth,activity,system")
notification_types = db.Column(db.String(255), default="auth,activity,system")
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'token': self.token,
'chat_id': self.chat_id,
'is_active': self.is_active,
'description': self.description,
'notification_types': self.notification_types
}

View File

@@ -38,6 +38,10 @@ class User(db.Model, UserMixin):
password: Mapped[str] = mapped_column(String(255), nullable=False)
is_admin: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
is_active: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
# 가입 승인 관련 필드
is_approved: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
approval_token: Mapped[Optional[str]] = mapped_column(String(100), unique=True, nullable=True)
# ── 유틸 메서드
def __repr__(self) -> str: # pragma: no cover
@@ -109,14 +113,24 @@ class User(db.Model, UserMixin):
q = (email or "").strip().lower()
if not q:
return None
return User.query.filter_by(email=q).first()
try:
return User.query.filter_by(email=q).first()
except Exception as e:
logging.error(f"User find_by_email error: {e}")
db.session.rollback()
return None
@staticmethod
def find_by_username(username: Optional[str]) -> Optional["User"]:
q = (username or "").strip()
if not q:
return None
return User.query.filter_by(username=q).first()
try:
return User.query.filter_by(username=q).first()
except Exception as e:
logging.error(f"User find_by_username error: {e}")
db.session.rollback()
return None
# Flask-Login user_loader (SQLAlchemy 2.0 방식)

View File

@@ -5,8 +5,13 @@ from .auth import register_auth_routes
from .admin import register_admin_routes
from .main import register_main_routes
from .xml import register_xml_routes
from .utilities import register_util_routes
from .utilities import register_util_routes, utils_bp
from .file_view import register_file_view
from .jobs import register_jobs_routes
from .idrac_routes import register_idrac_routes
from .catalog_sync import catalog_bp
from .scp_routes import scp_bp
from .drm_sync import drm_sync_bp
def register_routes(app: Flask, socketio=None) -> None:
@@ -16,5 +21,10 @@ def register_routes(app: Flask, socketio=None) -> None:
register_admin_routes(app)
register_main_routes(app, socketio)
register_xml_routes(app)
register_util_routes(app)
register_file_view(app)
app.register_blueprint(utils_bp, url_prefix="/utils")
register_file_view(app)
register_jobs_routes(app)
register_idrac_routes(app)
app.register_blueprint(catalog_bp)
app.register_blueprint(scp_bp)
app.register_blueprint(drm_sync_bp)

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -18,6 +18,11 @@ from flask import (
from flask_login import login_required, current_user
from backend.models.user import User, db
from backend.models.telegram_bot import TelegramBot
try:
from telegram import Bot
except ImportError:
Bot = None
admin_bp = Blueprint("admin", __name__)
@@ -124,3 +129,163 @@ def reset_password(user_id: int):
flash("비밀번호 변경 중 오류가 발생했습니다.", "danger")
return redirect(url_for("admin.admin_panel"))
# ▼▼▼ 시스템 설정 (텔레그램 봇 관리) ▼▼▼
@admin_bp.route("/admin/settings", methods=["GET"])
@login_required
@admin_required
def settings():
# 테이블 생성 확인 (임시)
try:
bots = TelegramBot.query.all()
except Exception:
db.create_all()
bots = []
return render_template("admin_settings.html", bots=bots)
@admin_bp.route("/admin/settings/bot/add", methods=["POST"])
@login_required
@admin_required
def add_bot():
name = request.form.get("name")
token = request.form.get("token")
chat_id = request.form.get("chat_id")
desc = request.form.get("description")
# 알림 유형 (체크박스 다중 선택)
notify_types = request.form.getlist("notify_types")
notify_str = ",".join(notify_types) if notify_types else ""
if not (name and token and chat_id):
flash("필수 항목(이름, 토큰, Chat ID)을 입력하세요.", "warning")
return redirect(url_for("admin.settings"))
bot = TelegramBot(
name=name,
token=token,
chat_id=chat_id,
description=desc,
is_active=True,
notification_types=notify_str
)
db.session.add(bot)
db.session.commit()
flash("텔레그램 봇이 추가되었습니다.", "success")
return redirect(url_for("admin.settings"))
@admin_bp.route("/admin/settings/bot/edit/<int:bot_id>", methods=["POST"])
@login_required
@admin_required
def edit_bot(bot_id):
bot = db.session.get(TelegramBot, bot_id)
if not bot:
abort(404)
bot.name = request.form.get("name")
bot.token = request.form.get("token")
bot.chat_id = request.form.get("chat_id")
bot.description = request.form.get("description")
bot.is_active = request.form.get("is_active") == "on"
# 알림 유형 업데이트
notify_types = request.form.getlist("notify_types")
bot.notification_types = ",".join(notify_types) if notify_types else ""
db.session.commit()
flash("봇 설정이 수정되었습니다.", "success")
return redirect(url_for("admin.settings"))
@admin_bp.route("/admin/settings/bot/delete/<int:bot_id>", methods=["POST"])
@login_required
@admin_required
def delete_bot(bot_id):
bot = db.session.get(TelegramBot, bot_id)
if bot:
db.session.delete(bot)
db.session.commit()
flash("봇이 삭제되었습니다.", "success")
return redirect(url_for("admin.settings"))
@admin_bp.route("/admin/settings/bot/test/<int:bot_id>", methods=["POST"])
@login_required
@admin_required
def test_bot(bot_id):
if not Bot:
flash("python-telegram-bot 라이브러리가 설치되지 않았습니다.", "danger")
return redirect(url_for("admin.settings"))
bot_obj = db.session.get(TelegramBot, bot_id)
if not bot_obj:
abort(404)
import asyncio
async def _send():
bot = Bot(token=bot_obj.token)
await bot.send_message(chat_id=bot_obj.chat_id, text="🔔 <b>테스트 메시지</b>\n이 메시지가 보이면 설정이 정상입니다.", parse_mode="HTML")
try:
asyncio.run(_send())
flash(f"'{bot_obj.name}' 봇으로 테스트 메시지를 보냈습니다.", "success")
except Exception as e:
flash(f"테스트 실패: {e}", "danger")
return redirect(url_for("admin.settings"))
# ▼▼▼ 시스템 로그 뷰어 ▼▼▼
@admin_bp.route("/admin/logs", methods=["GET"])
@login_required
@admin_required
def view_logs():
import os
import re
from collections import deque
log_folder = current_app.config.get('LOG_FOLDER')
log_file = os.path.join(log_folder, 'app.log') if log_folder else None
# 1. 실제 ANSI 이스케이프 코드 (\x1B로 시작)
ansi_escape = re.compile(r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]')
# 2. 텍스트로 찍힌 ANSI 코드 패턴 (예: [36m, [0m 등) - Werkzeug가 이스케이프 된 상태로 로그에 남길 경우 대비
literal_ansi = re.compile(r'\[[0-9;]+m')
# 3. 제어 문자 제거
control_char_re = re.compile(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]')
logs = []
if log_file and os.path.exists(log_file):
try:
with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
raw_lines = deque(f, 1000)
for line in raw_lines:
# A. 실제 ANSI 코드 제거
clean_line = ansi_escape.sub('', line)
# B. 리터럴 ANSI 패턴 제거 (사용자가 [36m 등을 텍스트로 보고 있다면 이것이 원인)
clean_line = literal_ansi.sub('', clean_line)
# C. 제어 문자 제거
clean_line = control_char_re.sub('', clean_line)
# D. 앞뒤 공백 제거
clean_line = clean_line.strip()
# E. 빈 줄 제외
if clean_line:
logs.append(clean_line)
except Exception as e:
logs = [f"Error reading log file: {str(e)}"]
else:
logs = [f"Log file not found at: {log_file}"]
return render_template("admin_logs.html", logs=logs)

View File

@@ -3,8 +3,11 @@ from __future__ import annotations
import logging
import threading
import asyncio
import secrets
from typing import Optional
from urllib.parse import urlparse, urljoin
from datetime import datetime
from flask import (
Blueprint,
@@ -23,11 +26,13 @@ from backend.models.user import User, db
# ── (선택) Telegram: 미설정이면 조용히 패스
try:
from telegram import Bot
from telegram import Bot, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.constants import ParseMode
except Exception: # 라이브러리 미설치/미사용 환경
Bot = None
ParseMode = None
InlineKeyboardButton = None
InlineKeyboardMarkup = None
auth_bp = Blueprint("auth", __name__)
@@ -42,21 +47,162 @@ def _is_safe_url(target: str) -> bool:
return (test.scheme in ("http", "https")) and (ref.netloc == test.netloc)
def _notify(text: str) -> None:
"""텔레그램 알림 (설정 없으면 바로 return)."""
token = (current_app.config.get("TELEGRAM_BOT_TOKEN") or "").strip()
chat_id = (current_app.config.get("TELEGRAM_CHAT_ID") or "").strip()
if not (token and chat_id and Bot and ParseMode):
return
def _send():
def _notify(text: str, category: str = "system") -> None:
"""
텔레그램 알림 전송
- DB(TelegramBot)에 등록된 활성 봇들에게 전송
- category: 'auth', 'activity', 'system'
"""
try:
from backend.models.telegram_bot import TelegramBot
# 앱 컨텍스트 안에서 실행되므로 바로 DB 접근 가능
try:
bot = Bot(token=token)
bot.send_message(chat_id=chat_id, text=text, parse_mode=ParseMode.HTML)
except Exception as e:
current_app.logger.warning("Telegram send failed: %s", e)
bots = TelegramBot.query.filter_by(is_active=True).all()
except Exception:
db.create_all()
bots = []
threading.Thread(target=_send, daemon=True).start()
# 1. DB에 봇이 없고, Config에 설정이 있는 경우 -> 자동 마이그레이션
if not bots:
cfg_token = (current_app.config.get("TELEGRAM_BOT_TOKEN") or "").strip()
cfg_chat = (current_app.config.get("TELEGRAM_CHAT_ID") or "").strip()
if cfg_token and cfg_chat:
new_bot = TelegramBot(
name="기본 봇 (Config)",
token=cfg_token,
chat_id=cfg_chat,
is_active=True,
description="config.py 설정에서 자동 가져옴",
notification_types="auth,activity,system"
)
db.session.add(new_bot)
db.session.commit()
bots = [new_bot]
current_app.logger.info("Telegram: Config settings migrated to DB.")
if not bots:
return
# 카테고리 필터링
target_bots = []
for b in bots:
allowed = (b.notification_types or "auth,activity,system").split(",")
if category in allowed:
target_bots.append(b)
if not target_bots:
return
if not (Bot and ParseMode):
current_app.logger.warning("Telegram: Library not installed.")
return
app = current_app._get_current_object()
async def _send_to_bot(bot_obj, msg):
try:
t_bot = Bot(token=bot_obj.token)
await t_bot.send_message(chat_id=bot_obj.chat_id, text=msg, parse_mode=ParseMode.HTML)
except Exception as e:
app.logger.error(f"Telegram fail ({bot_obj.name}): {e}")
async def _send_all():
tasks = [_send_to_bot(b, text) for b in target_bots]
await asyncio.gather(*tasks)
def _run_thread():
try:
asyncio.run(_send_all())
except Exception as e:
app.logger.error(f"Telegram async loop error: {e}")
threading.Thread(target=_run_thread, daemon=True).start()
except Exception as e:
current_app.logger.error(f"Telegram notification error: {e}")
def _notify_with_buttons(text: str, buttons: list, category: str = "auth") -> None:
"""
텔레그램 알림 전송 (인라인 버튼 포함)
- buttons: [(text, callback_data), ...] 형식의 리스트
"""
try:
from backend.models.telegram_bot import TelegramBot
try:
bots = TelegramBot.query.filter_by(is_active=True).all()
except Exception:
db.create_all()
bots = []
if not bots:
cfg_token = (current_app.config.get("TELEGRAM_BOT_TOKEN") or "").strip()
cfg_chat = (current_app.config.get("TELEGRAM_CHAT_ID") or "").strip()
if cfg_token and cfg_chat:
new_bot = TelegramBot(
name="기본 봇 (Config)",
token=cfg_token,
chat_id=cfg_chat,
is_active=True,
description="config.py 설정에서 자동 가져옴",
notification_types="auth,activity,system"
)
db.session.add(new_bot)
db.session.commit()
bots = [new_bot]
if not bots:
return
target_bots = []
for b in bots:
allowed = (b.notification_types or "auth,activity,system").split(",")
if category in allowed:
target_bots.append(b)
if not target_bots:
return
if not (Bot and ParseMode and InlineKeyboardButton and InlineKeyboardMarkup):
current_app.logger.warning("Telegram: Library not installed.")
return
# 인라인 키보드 생성
keyboard = [[InlineKeyboardButton(btn[0], callback_data=btn[1]) for btn in buttons]]
reply_markup = InlineKeyboardMarkup(keyboard)
app = current_app._get_current_object()
async def _send_to_bot(bot_obj, msg, markup):
try:
t_bot = Bot(token=bot_obj.token)
await t_bot.send_message(
chat_id=bot_obj.chat_id,
text=msg,
parse_mode=ParseMode.HTML,
reply_markup=markup
)
except Exception as e:
app.logger.error(f"Telegram fail ({bot_obj.name}): {e}")
async def _send_all():
tasks = [_send_to_bot(b, text, reply_markup) for b in target_bots]
await asyncio.gather(*tasks)
def _run_thread():
try:
asyncio.run(_send_all())
except Exception as e:
app.logger.error(f"Telegram async loop error: {e}")
threading.Thread(target=_run_thread, daemon=True).start()
except Exception as e:
current_app.logger.error(f"Telegram notification with buttons error: {e}")
# ─────────────────────────────────────────────────────────────
@@ -67,12 +213,28 @@ def register_auth_routes(app):
app.register_blueprint(auth_bp)
@app.before_request
def _touch_session():
# 요청마다 세션 갱신(만료 슬라이딩) + 로그아웃 플래그 정리
def _global_hooks():
# 1. 세션 갱신 (요청마다 세션 타임아웃 연장)
session.modified = True
if current_user.is_authenticated and session.get("just_logged_out"):
session.pop("just_logged_out", None)
flash("세션이 만료되어 자동 로그아웃 되었습니다.", "info")
# 2. 활동 알림 (로그인된 사용자)
if current_user.is_authenticated:
# 정적 리소스 및 불필요한 경로 제외
if request.endpoint == 'static':
return
# 제외할 확장자/경로 (필요 시 추가)
ignored_exts = ('.css', '.js', '.png', '.jpg', '.jpeg', '.gif', '.ico', '.woff', '.woff2')
if request.path.lower().endswith(ignored_exts):
return
# 활동 내용 구성
msg = (
f"👣 <b>활동 감지</b>\n"
f"👤 <code>{current_user.username}</code>\n"
f"📍 <code>{request.method} {request.path}</code>"
)
_notify(msg, category="activity")
# ─────────────────────────────────────────────────────────────
@@ -97,21 +259,54 @@ def register():
current_app.logger.info("REGISTER: dup username %s", form.username.data)
return render_template("register.html", form=form)
user = User(username=form.username.data, email=form.email.data, is_active=False)
user.set_password(form.password.data) # passlib: 기본 Argon2id
db.session.add(user)
db.session.commit()
_notify(
f"🆕 <b>신규 가입 요청</b>\n"
f"📛 사용자: <code>{user.username}</code>\n"
f"📧 이메일: <code>{user.email}</code>"
# 승인 토큰 생성
approval_token = secrets.token_urlsafe(32)
user = User(
username=form.username.data,
email=form.email.data,
is_active=False,
is_approved=False,
approval_token=approval_token
)
current_app.logger.info("REGISTER: created id=%s email=%s", user.id, user.email)
user.set_password(form.password.data)
try:
db.session.add(user)
db.session.commit()
except Exception as e:
db.session.rollback()
current_app.logger.error("REGISTER: DB commit failed: %s", e)
flash("회원가입 처리 중 오류가 발생했습니다. (DB Error)", "danger")
return render_template("register.html", form=form)
# 텔레그램 알림 (인라인 버튼 포함)
message = (
f"🆕 <b>신규 가입 요청</b>\n\n"
f"<EFBFBD> 사용자: <code>{user.username}</code>\n"
f"📧 이메일: <code>{user.email}</code>\n"
f"🕐 신청시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
)
buttons = [
("✅ 승인", f"approve_{approval_token}"),
("❌ 거부", f"reject_{approval_token}")
]
_notify_with_buttons(message, buttons, category="auth")
current_app.logger.info("REGISTER: created id=%s email=%s token=%s", user.id, user.email, approval_token[:10])
flash("회원가입이 완료되었습니다. 관리자의 승인을 기다려주세요.", "success")
return redirect(url_for("auth.login"))
else:
if request.method == "POST":
# 폼 검증 실패 에러를 Flash 메시지로 출력
for field_name, errors in form.errors.items():
for error in errors:
# 필드 객체 가져오기 (라벨 텍스트 확인용)
field = getattr(form, field_name, None)
label = field.label.text if field else field_name
flash(f"{label}: {error}", "warning")
current_app.logger.info("REGISTER: form errors=%s", form.errors)
return render_template("register.html", form=form)
@@ -136,24 +331,28 @@ def login():
current_app.logger.info("LOGIN: user not found")
return render_template("login.html", form=form)
pass_ok = user.check_password(form.password.data) # passlib verify(+자동 재해시)
pass_ok = user.check_password(form.password.data)
current_app.logger.info(
"LOGIN: found id=%s active=%s pass_ok=%s",
user.id, user.is_active, pass_ok
"LOGIN: found id=%s active=%s approved=%s pass_ok=%s",
user.id, user.is_active, user.is_approved, pass_ok
)
if not pass_ok:
flash("이메일 또는 비밀번호가 올바르지 않습니다.", "danger")
return render_template("login.html", form=form)
if not user.is_approved:
flash("계정이 아직 승인되지 않았습니다. 관리자의 승인을 기다려주세요.", "warning")
return render_template("login.html", form=form)
if not user.is_active:
flash("계정이 아직 승인되지 않았습니다.", "warning")
flash("계정이 비활성화되었습니다. 관리자에게 문의하세요.", "warning")
return render_template("login.html", form=form)
# 성공
login_user(user, remember=form.remember.data)
session.permanent = True
_notify(f"🔐 <b>로그인 성공</b>\n👤 <code>{user.username}</code>")
_notify(f"🔐 <b>로그인 성공</b>\n👤 <code>{user.username}</code>", category="auth")
current_app.logger.info("LOGIN: SUCCESS → redirect")
nxt = request.args.get("next")
@@ -175,6 +374,7 @@ def login():
def logout():
if current_user.is_authenticated:
current_app.logger.info("LOGOUT: user=%s", current_user.username)
_notify(f"🚪 <b>로그아웃</b>\n👤 <code>{current_user.username}</code>", category="auth")
logout_user()
session["just_logged_out"] = True
flash("정상적으로 로그아웃 되었습니다.", "success")
return redirect(url_for("auth.login"))

View File

@@ -0,0 +1,23 @@
# backend/routes/catalog_sync.py
from flask import Blueprint, jsonify, request
from backend.services.dell_catalog_sync import sync_dell_catalog
catalog_bp = Blueprint("catalog", __name__, url_prefix="/catalog")
@catalog_bp.route("/sync", methods=["POST"])
def sync_catalog():
"""Dell Catalog 버전 정보를 동기화 (CSRF 보호 유지)"""
try:
data = request.get_json(silent=True) or {}
model = data.get("model", "PowerEdge R750")
sync_dell_catalog(model)
return jsonify({
"success": True,
"message": f"{model} 동기화 완료"
})
except Exception as e:
return jsonify({
"success": False,
"message": f"오류: {str(e)}"
})

View File

@@ -0,0 +1,61 @@
"""
DRM 카탈로그 동기화 라우트
backend/routes/drm_sync.py
"""
from flask import Blueprint, request, jsonify
from backend.services.drm_catalog_sync import sync_from_drm, check_drm_repository
drm_sync_bp = Blueprint('drm_sync', __name__, url_prefix='/drm')
@drm_sync_bp.route('/check', methods=['POST'])
def check_repository():
"""DRM 리포지토리 상태 확인"""
try:
data = request.get_json() or {}
repository_path = data.get('repository_path')
if not repository_path:
return jsonify({
'success': False,
'message': 'repository_path가 필요합니다'
}), 400
info = check_drm_repository(repository_path)
return jsonify({
'success': info.get('exists', False),
'info': info
})
except Exception as e:
return jsonify({
'success': False,
'message': f'오류: {str(e)}'
}), 500
@drm_sync_bp.route('/sync', methods=['POST'])
def sync_repository():
"""DRM 리포지토리에서 펌웨어 동기화"""
try:
data = request.get_json() or {}
repository_path = data.get('repository_path')
model = data.get('model', 'PowerEdge R750')
if not repository_path:
return jsonify({
'success': False,
'message': 'repository_path가 필요합니다'
}), 400
result = sync_from_drm(repository_path, model)
return jsonify(result)
except Exception as e:
return jsonify({
'success': False,
'message': f'오류: {str(e)}'
}), 500

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,669 @@
"""
Dell iDRAC 멀티 서버 펌웨어 관리 라우트
backend/routes/idrac_routes.py
- CSRF 보호 제외 추가
"""
from flask import Blueprint, render_template, request, jsonify, current_app
from werkzeug.utils import secure_filename
import os
from datetime import datetime
from backend.services.idrac_redfish_client import DellRedfishClient
from backend.models.idrac_server import IdracServer, db
from flask_socketio import emit
import threading
# Blueprint 생성
idrac_bp = Blueprint('idrac', __name__, url_prefix='/idrac')
# 설정
UPLOAD_FOLDER = 'uploads/firmware'
ALLOWED_EXTENSIONS = {'exe', 'bin'}
MAX_FILE_SIZE = 500 * 1024 * 1024 # 500MB
def allowed_file(filename):
"""허용된 파일 형식 확인"""
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
# ========================================
# 메인 페이지
# ========================================
@idrac_bp.route('/')
def index():
"""iDRAC 멀티 서버 관리 메인 페이지"""
return render_template('idrac_firmware.html')
# ========================================
# 서버 관리 API
# ========================================
@idrac_bp.route('/api/servers', methods=['GET'])
def get_servers():
"""등록된 서버 목록 조회"""
try:
group = request.args.get('group') # 그룹 필터
query = IdracServer.query.filter_by(is_active=True)
if group and group != 'all':
query = query.filter_by(group_name=group)
servers = query.order_by(IdracServer.name).all()
return jsonify({
'success': True,
'servers': [s.to_dict() for s in servers]
})
except Exception as e:
return jsonify({
'success': False,
'message': f'오류: {str(e)}'
})
@idrac_bp.route('/api/servers', methods=['POST'])
def add_server():
"""서버 추가"""
try:
data = request.json
# 필수 필드 확인
if not all([data.get('name'), data.get('ip_address'), data.get('password')]):
return jsonify({
'success': False,
'message': '필수 필드를 모두 입력하세요 (서버명, IP, 비밀번호)'
})
# 중복 IP 확인
existing = IdracServer.query.filter_by(ip_address=data['ip_address']).first()
if existing:
return jsonify({
'success': False,
'message': f'이미 등록된 IP입니다: {data["ip_address"]}'
})
# 서버 생성
server = IdracServer(
name=data['name'],
ip_address=data['ip_address'],
username=data.get('username', 'root'),
password=data['password'],
group_name=data.get('group_name'),
location=data.get('location'),
model=data.get('model'),
notes=data.get('notes')
)
db.session.add(server)
db.session.commit()
return jsonify({
'success': True,
'message': f'서버 {server.name} 추가 완료',
'server': server.to_dict()
})
except Exception as e:
db.session.rollback()
return jsonify({
'success': False,
'message': f'오류: {str(e)}'
})
@idrac_bp.route('/api/servers/<int:server_id>', methods=['PUT'])
def update_server(server_id):
"""서버 정보 수정"""
try:
server = IdracServer.query.get(server_id)
if not server:
return jsonify({
'success': False,
'message': '서버를 찾을 수 없습니다'
})
data = request.json
# 업데이트
if 'name' in data:
server.name = data['name']
if 'ip_address' in data:
server.ip_address = data['ip_address']
if 'username' in data:
server.username = data['username']
if 'password' in data:
server.password = data['password']
if 'group_name' in data:
server.group_name = data['group_name']
if 'location' in data:
server.location = data['location']
if 'model' in data:
server.model = data['model']
if 'notes' in data:
server.notes = data['notes']
db.session.commit()
return jsonify({
'success': True,
'message': '서버 정보 수정 완료',
'server': server.to_dict()
})
except Exception as e:
db.session.rollback()
return jsonify({
'success': False,
'message': f'오류: {str(e)}'
})
@idrac_bp.route('/api/servers/<int:server_id>', methods=['DELETE'])
def delete_server(server_id):
"""서버 삭제 (소프트 삭제)"""
try:
server = IdracServer.query.get(server_id)
if not server:
return jsonify({
'success': False,
'message': '서버를 찾을 수 없습니다'
})
server.is_active = False
db.session.commit()
return jsonify({
'success': True,
'message': f'서버 {server.name} 삭제 완료'
})
except Exception as e:
db.session.rollback()
return jsonify({
'success': False,
'message': f'오류: {str(e)}'
})
@idrac_bp.route('/api/groups', methods=['GET'])
def get_groups():
"""등록된 그룹 목록"""
try:
groups = db.session.query(IdracServer.group_name)\
.filter(IdracServer.is_active == True)\
.filter(IdracServer.group_name.isnot(None))\
.distinct()\
.all()
group_list = [g[0] for g in groups if g[0]]
return jsonify({
'success': True,
'groups': group_list
})
except Exception as e:
return jsonify({
'success': False,
'message': f'오류: {str(e)}'
})
# ========================================
# 연결 및 상태 확인 API
# ========================================
@idrac_bp.route('/api/servers/<int:server_id>/test', methods=['POST'])
def test_connection(server_id):
"""단일 서버 연결 테스트"""
try:
server = IdracServer.query.get(server_id)
if not server:
return jsonify({
'success': False,
'message': '서버를 찾을 수 없습니다'
})
client = DellRedfishClient(server.ip_address, server.username, server.password)
if client.check_connection():
server.status = 'online'
server.last_connected = datetime.utcnow()
db.session.commit()
return jsonify({
'success': True,
'message': f'{server.name} 연결 성공'
})
else:
server.status = 'offline'
db.session.commit()
return jsonify({
'success': False,
'message': f'{server.name} 연결 실패'
})
except Exception as e:
return jsonify({
'success': False,
'message': f'오류: {str(e)}'
})
@idrac_bp.route('/api/servers/test-multi', methods=['POST'])
def test_connections_multi():
"""다중 서버 일괄 연결 테스트"""
try:
data = request.json
server_ids = data.get('server_ids', [])
if not server_ids:
return jsonify({
'success': False,
'message': '서버를 선택하세요'
})
results = []
for server_id in server_ids:
server = IdracServer.query.get(server_id)
if not server:
continue
try:
client = DellRedfishClient(server.ip_address, server.username, server.password)
if client.check_connection():
server.status = 'online'
server.last_connected = datetime.utcnow()
results.append({
'server_id': server.id,
'server_name': server.name,
'success': True,
'message': '연결 성공'
})
else:
server.status = 'offline'
results.append({
'server_id': server.id,
'server_name': server.name,
'success': False,
'message': '연결 실패'
})
except Exception as e:
server.status = 'offline'
results.append({
'server_id': server.id,
'server_name': server.name,
'success': False,
'message': str(e)
})
db.session.commit()
success_count = sum(1 for r in results if r['success'])
return jsonify({
'success': True,
'results': results,
'summary': {
'total': len(results),
'success': success_count,
'failed': len(results) - success_count
}
})
except Exception as e:
return jsonify({
'success': False,
'message': f'오류: {str(e)}'
})
@idrac_bp.route('/api/servers/<int:server_id>/firmware', methods=['GET'])
def get_server_firmware(server_id):
"""단일 서버 펌웨어 버전 조회"""
try:
server = IdracServer.query.get(server_id)
if not server:
return jsonify({
'success': False,
'message': '서버를 찾을 수 없습니다'
})
client = DellRedfishClient(server.ip_address, server.username, server.password)
inventory = client.get_firmware_inventory()
# BIOS 버전 업데이트
for item in inventory:
if 'BIOS' in item.get('Name', ''):
server.current_bios = item.get('Version')
break
db.session.commit()
return jsonify({
'success': True,
'data': inventory
})
except Exception as e:
return jsonify({
'success': False,
'message': f'오류: {str(e)}'
})
# ========================================
# 멀티 서버 펌웨어 업로드 API
# ========================================
@idrac_bp.route('/api/upload-multi', methods=['POST'])
def upload_multi():
"""다중 서버에 펌웨어 일괄 업로드"""
try:
if 'file' not in request.files:
return jsonify({
'success': False,
'message': '파일이 없습니다'
})
file = request.files['file']
server_ids_str = request.form.get('server_ids')
if not server_ids_str:
return jsonify({
'success': False,
'message': '서버를 선택하세요'
})
server_ids = [int(x) for x in server_ids_str.split(',')]
if file.filename == '':
return jsonify({
'success': False,
'message': '파일이 선택되지 않았습니다'
})
if not allowed_file(file.filename):
return jsonify({
'success': False,
'message': '허용되지 않은 파일 형식입니다 (.exe, .bin만 가능)'
})
# 로컬에 임시 저장
filename = secure_filename(file.filename)
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
local_path = os.path.join(UPLOAD_FOLDER, filename)
file.save(local_path)
# 백그라운드 스레드로 업로드 시작
from backend.services import watchdog_handler
socketio = watchdog_handler.socketio
def upload_to_servers():
results = []
for idx, server_id in enumerate(server_ids):
server = IdracServer.query.get(server_id)
if not server:
continue
try:
# 진행 상황 전송
if socketio:
socketio.emit('upload_progress', {
'server_id': server.id,
'server_name': server.name,
'status': 'uploading',
'progress': 0,
'message': '업로드 시작...'
})
server.status = 'updating'
db.session.commit()
client = DellRedfishClient(server.ip_address, server.username, server.password)
result = client.upload_firmware_staged(local_path)
if result['success']:
server.status = 'online'
server.last_updated = datetime.utcnow()
results.append({
'server_id': server.id,
'server_name': server.name,
'success': True,
'job_id': result['job_id'],
'message': '업로드 완료'
})
if socketio:
socketio.emit('upload_progress', {
'server_id': server.id,
'server_name': server.name,
'status': 'completed',
'progress': 100,
'message': '업로드 완료',
'job_id': result['job_id']
})
else:
server.status = 'online'
results.append({
'server_id': server.id,
'server_name': server.name,
'success': False,
'message': result.get('message', '업로드 실패')
})
if socketio:
socketio.emit('upload_progress', {
'server_id': server.id,
'server_name': server.name,
'status': 'failed',
'progress': 0,
'message': result.get('message', '업로드 실패')
})
db.session.commit()
except Exception as e:
server.status = 'online'
db.session.commit()
results.append({
'server_id': server.id,
'server_name': server.name,
'success': False,
'message': str(e)
})
if socketio:
socketio.emit('upload_progress', {
'server_id': server.id,
'server_name': server.name,
'status': 'failed',
'progress': 0,
'message': str(e)
})
# 최종 결과 전송
if socketio:
success_count = sum(1 for r in results if r['success'])
socketio.emit('upload_complete', {
'results': results,
'summary': {
'total': len(results),
'success': success_count,
'failed': len(results) - success_count
}
})
# 스레드 시작
thread = threading.Thread(target=upload_to_servers)
thread.daemon = True
thread.start()
return jsonify({
'success': True,
'message': f'{len(server_ids)}대 서버에 업로드 시작',
'filename': filename
})
except Exception as e:
return jsonify({
'success': False,
'message': f'업로드 오류: {str(e)}'
})
# ========================================
# 기존 단일 서버 API (호환성 유지)
# ========================================
@idrac_bp.route('/api/files/local', methods=['GET'])
def list_local_files():
"""로컬에 저장된 DUP 파일 목록"""
try:
files = []
if os.path.exists(UPLOAD_FOLDER):
for filename in os.listdir(UPLOAD_FOLDER):
filepath = os.path.join(UPLOAD_FOLDER, filename)
if os.path.isfile(filepath):
file_size = os.path.getsize(filepath)
files.append({
'name': filename,
'size': file_size,
'size_mb': round(file_size / (1024 * 1024), 2),
'uploaded_at': datetime.fromtimestamp(
os.path.getmtime(filepath)
).strftime('%Y-%m-%d %H:%M:%S')
})
return jsonify({
'success': True,
'files': files
})
except Exception as e:
return jsonify({
'success': False,
'message': f'오류: {str(e)}'
})
@idrac_bp.route('/api/files/local/<filename>', methods=['DELETE'])
def delete_local_file(filename):
"""로컬 DUP 파일 삭제"""
try:
filepath = os.path.join(UPLOAD_FOLDER, secure_filename(filename))
if os.path.exists(filepath):
os.remove(filepath)
return jsonify({
'success': True,
'message': f'{filename} 삭제 완료'
})
else:
return jsonify({
'success': False,
'message': '파일을 찾을 수 없습니다'
})
except Exception as e:
return jsonify({
'success': False,
'message': f'삭제 오류: {str(e)}'
})
# ========================================
# 서버 재부팅 API (멀티)
# ========================================
@idrac_bp.route('/api/servers/reboot-multi', methods=['POST'])
def reboot_servers_multi():
"""선택한 서버들 일괄 재부팅"""
try:
data = request.json
server_ids = data.get('server_ids', [])
reboot_type = data.get('type', 'GracefulRestart')
if not server_ids:
return jsonify({
'success': False,
'message': '서버를 선택하세요'
})
results = []
for server_id in server_ids:
server = IdracServer.query.get(server_id)
if not server:
continue
try:
client = DellRedfishClient(server.ip_address, server.username, server.password)
result = client.reboot_server(reboot_type)
if result:
results.append({
'server_id': server.id,
'server_name': server.name,
'success': True,
'message': '재부팅 시작'
})
else:
results.append({
'server_id': server.id,
'server_name': server.name,
'success': False,
'message': '재부팅 실패'
})
except Exception as e:
results.append({
'server_id': server.id,
'server_name': server.name,
'success': False,
'message': str(e)
})
success_count = sum(1 for r in results if r['success'])
return jsonify({
'success': True,
'results': results,
'summary': {
'total': len(results),
'success': success_count,
'failed': len(results) - success_count
}
})
except Exception as e:
return jsonify({
'success': False,
'message': f'오류: {str(e)}'
})
# Blueprint 등록 함수
def register_idrac_routes(app):
"""
iDRAC 멀티 서버 관리 Blueprint 등록
Args:
app: Flask 애플리케이션 인스턴스
"""
# uploads 디렉토리 생성
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# Blueprint 등록
app.register_blueprint(idrac_bp)
# CSRF 보호 제외 - iDRAC API 엔드포인트
try:
from flask_wtf.csrf import CSRFProtect
csrf = CSRFProtect()
# API 엔드포인트는 CSRF 검증 제외
csrf.exempt(idrac_bp)
except:
pass # CSRF 설정 실패해도 계속 진행
# DB 테이블 생성
with app.app_context():
db.create_all()

279
backend/routes/jobs.py Normal file
View File

@@ -0,0 +1,279 @@
"""
Flask Blueprint for iDRAC Job Monitoring (Redfish 버전)
기존 routes/jobs.py 또는 backend/routes/jobs.py를 이 파일로 교체하세요.
"""
import time
import logging
from flask import Blueprint, render_template, jsonify, request
from flask_login import login_required
from backend.services.idrac_jobs import (
scan_all,
parse_ip_list,
load_ip_list,
LRUJobCache,
is_active_status,
is_done_status,
parse_iso_datetime,
iso_now
)
import os
logger = logging.getLogger(__name__)
# Blueprint 생성
jobs_bp = Blueprint("jobs", __name__, url_prefix="/jobs")
# Job 캐시 (전역)
MAX_CACHE_SIZE = int(os.getenv("MAX_CACHE_SIZE", "10000"))
CACHE_GC_INTERVAL = int(os.getenv("CACHE_GC_INTERVAL", "3600"))
JOB_GRACE_MINUTES = int(os.getenv("JOB_GRACE_MINUTES", "60"))
JOB_RECENCY_HOURS = int(os.getenv("JOB_RECENCY_HOURS", "24"))
JOB_CACHE = LRUJobCache(max_size=MAX_CACHE_SIZE)
# ────────────────────────────────────────────────────────────
# Routes
# ────────────────────────────────────────────────────────────
@jobs_bp.route("", methods=["GET"])
@login_required
def jobs_page():
"""메인 페이지"""
return render_template("jobs.html")
@jobs_bp.route("/config", methods=["GET"])
@login_required
def jobs_config():
"""프론트엔드 설정 제공"""
return jsonify({
"ok": True,
"config": {
"grace_minutes": JOB_GRACE_MINUTES,
"recency_hours": JOB_RECENCY_HOURS,
"poll_interval_ms": int(os.getenv("POLL_INTERVAL_MS", "10000")),
}
})
@jobs_bp.route("/iplist", methods=["GET"])
@login_required
def get_ip_list():
"""IP 목록 조회 (파일에서)"""
try:
ips = load_ip_list()
return jsonify({
"ok": True,
"ips": ips,
"count": len(ips)
})
except Exception as e:
logger.exception("Failed to load IP list")
return jsonify({
"ok": False,
"error": str(e)
}), 500
@jobs_bp.route("/scan", methods=["POST"])
@login_required
def scan_jobs():
"""
Job 스캔 및 모니터링
Request Body:
{
"ips": List[str] (optional),
"method": "redfish" (기본값),
"recency_hours": int (기본: 24),
"grace_minutes": int (기본: 60),
"include_tracked_done": bool (기본: True)
}
Response:
{
"ok": True,
"count": int,
"items": [
{
"ip": str,
"ok": bool,
"error": str (if not ok),
"jobs": List[Dict]
}
]
}
"""
data = request.get_json(silent=True) or {}
# IP 목록
ip_input = data.get("ips")
if ip_input:
ips = parse_ip_list("\n".join(ip_input) if isinstance(ip_input, list) else str(ip_input))
else:
ips = load_ip_list()
if not ips:
return jsonify({
"ok": False,
"error": "No IPs provided",
"items": []
}), 400
# 파라미터
method = data.get("method", "redfish") # redfish가 기본값
recency_hours = int(data.get("recency_hours", JOB_RECENCY_HOURS))
grace_minutes = int(data.get("grace_minutes", JOB_GRACE_MINUTES))
include_tracked_done = bool(data.get("include_tracked_done", True))
grace_sec = grace_minutes * 60
cutoff = time.time() - recency_hours * 3600
# 현재 IP 목록과 다른 캐시 항목 제거
JOB_CACHE.clear_for_ips(set(ips))
# 스캔 실행
try:
items = scan_all(ips, method=method)
except Exception as e:
logger.exception("Scan failed")
return jsonify({
"ok": False,
"error": str(e),
"items": []
}), 500
now = time.time()
# 캐시 업데이트
for item in items:
ip = item.get("ip", "")
if not item.get("ok") or not isinstance(item.get("jobs"), list):
continue
for job in item["jobs"]:
status = job.get("Status")
message = job.get("Message")
active_now = is_active_status(status, message)
done_now = is_done_status(status)
# 시작 시간 파싱
start_ts = parse_iso_datetime(job.get("StartTime"))
# 리센시 판정
if not active_now:
if start_ts is None or start_ts < cutoff:
continue
# 캐시 키 생성
key = _make_cache_key(ip, job)
entry = JOB_CACHE.get(key)
if entry is None:
JOB_CACHE.set(key, {
"record": dict(job),
"first_seen_active": (now if active_now else None),
"became_done_at": (now if done_now else None),
"first_seen": now,
"last_seen": now,
"start_ts": start_ts,
})
else:
entry["record"] = dict(job)
entry["last_seen"] = now
if active_now and not entry.get("first_seen_active"):
entry["first_seen_active"] = now
if done_now and not entry.get("became_done_at"):
entry["became_done_at"] = now
elif not done_now:
entry["became_done_at"] = None
if start_ts:
entry["start_ts"] = start_ts
JOB_CACHE.set(key, entry)
# 응답 생성
out_items = []
for item in items:
ip = item.get("ip", "")
shown_jobs = []
# 현재 Active Job
current_active = []
if item.get("ok") and isinstance(item.get("jobs"), list):
for job in item["jobs"]:
if is_active_status(job.get("Status"), job.get("Message")):
key = _make_cache_key(ip, job)
if key in JOB_CACHE.keys():
current_active.append(JOB_CACHE.get(key)["record"])
if current_active:
shown_jobs = current_active
else:
# Active가 없을 때: 추적된 최근 완료 Job 표시
if include_tracked_done:
for key in JOB_CACHE.keys():
if key[0] != ip:
continue
entry = JOB_CACHE.get(key)
if not entry:
continue
start_ok = (entry.get("start_ts") or 0) >= cutoff
done_at = entry.get("became_done_at")
done_ok = bool(done_at and now - done_at <= grace_sec)
still_active = entry.get("became_done_at") is None
if still_active and start_ok:
shown_jobs.append(entry["record"])
elif done_ok and start_ok:
rec = dict(entry["record"])
rec["RecentlyCompleted"] = True
rec["CompletedAt"] = iso_now()
shown_jobs.append(rec)
out_items.append({
"ip": ip,
"ok": item.get("ok"),
"error": item.get("error"),
"jobs": sorted(shown_jobs, key=lambda r: r.get("JID", ""))
})
# 캐시 GC (조건부)
if now - JOB_CACHE.last_gc >= CACHE_GC_INTERVAL:
JOB_CACHE.gc(max_age_seconds=24 * 3600)
return jsonify({
"ok": True,
"count": len(out_items),
"items": out_items
})
def _make_cache_key(ip: str, job: dict):
"""캐시 키 생성"""
jid = (job.get("JID") or "").strip()
if jid:
return (ip, jid)
name = (job.get("Name") or "").strip()
return (ip, f"NOJID::{name}")
# ────────────────────────────────────────────────────────────
# 기존 패턴에 맞는 register 함수 추가
# ────────────────────────────────────────────────────────────
def register_jobs_routes(app):
"""
iDRAC Job 모니터링 라우트 등록
기존 프로젝트 패턴에 맞춘 함수
"""
from flask import Flask
app.register_blueprint(jobs_bp)
logger.info("Jobs routes registered at /jobs")

View File

@@ -47,11 +47,43 @@ def index():
info_dir = Path(Config.IDRAC_INFO_FOLDER)
backup_dir = Path(Config.BACKUP_FOLDER)
scripts = [f.name for f in script_dir.glob("*") if f.is_file() and f.name != ".env"]
scripts = natsorted(scripts)
# 1. 스크립트 목록 조회 및 카테고리 분류
all_scripts = [f.name for f in script_dir.glob("*") if f.is_file() and f.name != ".env"]
all_scripts = natsorted(all_scripts)
grouped_scripts = {}
for script in all_scripts:
upper = script.upper()
category = "General"
if upper.startswith("GPU"):
category = "GPU"
elif upper.startswith("LOM"):
category = "LOM"
elif upper.startswith("TYPE") or upper.startswith("XE"):
category = "Server Models"
elif "MAC" in upper:
category = "MAC Info"
elif "GUID" in upper:
category = "GUID Info"
elif "SET_" in upper or "CONFIG" in upper:
category = "Configuration"
if category not in grouped_scripts:
grouped_scripts[category] = []
grouped_scripts[category].append(script)
# 카테고리 정렬 (General은 마지막에)
sorted_categories = sorted(grouped_scripts.keys())
if "General" in sorted_categories:
sorted_categories.remove("General")
sorted_categories.append("General")
grouped_scripts_sorted = {k: grouped_scripts[k] for k in sorted_categories}
# 2. XML 파일 목록
xml_files = [f.name for f in xml_dir.glob("*.xml")]
# 페이지네이션
# 3. 페이지네이션 및 파일 목록
page = int(request.args.get("page", 1))
info_files = [f.name for f in info_dir.glob("*") if f.is_file()]
info_files = natsorted(info_files)
@@ -59,9 +91,13 @@ def index():
start = (page - 1) * Config.FILES_PER_PAGE
end = start + Config.FILES_PER_PAGE
files_to_display = [{"name": Path(f).stem, "file": f} for f in info_files[start:end]]
total_pages = (len(info_files) + Config.FILES_PER_PAGE - 1) // Config.FILES_PER_PAGE
# 백업 폴더 목록 (디렉터리만)
start_page = ((page - 1) // 10) * 10 + 1
end_page = min(start_page + 9, total_pages)
# 4. 백업 폴더 목록
backup_dirs = [d for d in backup_dir.iterdir() if d.is_dir()]
backup_dirs.sort(key=lambda p: p.stat().st_mtime, reverse=True)
@@ -81,10 +117,13 @@ def index():
files_to_display=files_to_display,
page=page,
total_pages=total_pages,
start_page=start_page,
end_page=end_page,
backup_files=backup_files,
total_backup_pages=total_backup_pages,
backup_page=backup_page,
scripts=scripts,
scripts=all_scripts, # 기존 리스트 호환
grouped_scripts=grouped_scripts_sorted, # 카테고리별 분류
xml_files=xml_files,
)
@@ -168,13 +207,13 @@ def delete_file(filename: str):
if file_path.exists():
try:
file_path.unlink()
flash(f"{filename} 삭제됨.")
flash(f"'{filename}' 파일이 삭제되었습니다.", "success")
logging.info(f"파일 삭제됨: {filename}")
except Exception as e:
logging.error(f"파일 삭제 오류: {e}")
flash("파일 삭제 중 오류가 발생했습니다.", "danger")
else:
flash("파일이 존재하지 않습니다.")
flash("파일이 존재하지 않습니다.", "warning")
return redirect(url_for("main.index"))

View File

@@ -0,0 +1,166 @@
from flask import Blueprint, render_template, request, jsonify, flash, redirect, url_for
from flask_login import login_required, current_user
import logging
import difflib
from pathlib import Path
from config import Config
from backend.services.redfish_client import RedfishClient
from backend.routes.xml import sanitize_preserve_unicode
scp_bp = Blueprint("scp", __name__)
logger = logging.getLogger(__name__)
@scp_bp.route("/scp/diff", methods=["GET"])
@login_required
def diff_scp():
"""
두 XML 파일의 차이점을 비교하여 보여줍니다.
"""
file1_name = request.args.get("file1")
file2_name = request.args.get("file2")
if not file1_name or not file2_name:
flash("비교할 두 파일을 선택해주세요.", "warning")
return redirect(url_for("xml.xml_management"))
try:
file1_path = Path(Config.XML_FOLDER) / sanitize_preserve_unicode(file1_name)
file2_path = Path(Config.XML_FOLDER) / sanitize_preserve_unicode(file2_name)
if not file1_path.exists() or not file2_path.exists():
flash("파일을 찾을 수 없습니다.", "danger")
return redirect(url_for("xml.xml_management"))
# 파일 내용 읽기 (LF로 통일)
# 파일 내용 읽기 (LF로 통일)
# Monaco Editor에 원본 텍스트를 그대로 전달하기 위해 splitlines() 제거
# 파일 내용 읽기 (LF로 통일)
logger.info(f"Reading file1: {file1_path}")
content1 = file1_path.read_text(encoding="utf-8", errors="replace").replace("\r\n", "\n")
logger.info(f"Reading file2: {file2_path}")
content2 = file2_path.read_text(encoding="utf-8", errors="replace").replace("\r\n", "\n")
logger.info(f"Content1 length: {len(content1)}, Content2 length: {len(content2)}")
return render_template("scp_diff.html",
file1=file1_name,
file2=file2_name,
content1=content1,
content2=content2)
except Exception as e:
logger.error(f"Diff error: {e}")
flash(f"비교 중 오류가 발생했습니다: {str(e)}", "danger")
return redirect(url_for("xml.xml_management"))
@scp_bp.route("/scp/content/<path:filename>")
@login_required
def get_scp_content(filename):
"""
XML 파일 내용을 반환하는 API (Monaco Editor용)
"""
try:
safe_name = sanitize_preserve_unicode(filename)
path = Path(Config.XML_FOLDER) / safe_name
if not path.exists():
return "File not found", 404
# 텍스트로 읽어서 반환
content = path.read_text(encoding="utf-8", errors="replace").replace("\r\n", "\n")
return content, 200, {'Content-Type': 'text/plain; charset=utf-8'}
except Exception as e:
logger.error(f"Content read error: {e}")
return str(e), 500
@scp_bp.route("/scp/export", methods=["POST"])
@login_required
def export_scp():
"""
iDRAC에서 설정을 내보내기 (Export)
네트워크 공유 설정이 필요합니다.
"""
data = request.form
target_ip = data.get("target_ip")
username = data.get("username")
password = data.get("password")
# Share Parameters
share_ip = data.get("share_ip")
share_name = data.get("share_name")
share_user = data.get("share_user")
share_pwd = data.get("share_pwd")
filename = data.get("filename")
if not all([target_ip, username, password, share_ip, share_name, filename]):
flash("필수 정보가 누락되었습니다.", "warning")
return redirect(url_for("xml.xml_management"))
share_params = {
"IPAddress": share_ip,
"ShareName": share_name,
"FileName": filename,
"ShareType": "CIFS", # 기본값 CIFS
"UserName": share_user,
"Password": share_pwd
}
try:
with RedfishClient(target_ip, username, password) as client:
job_id = client.export_system_configuration(share_params)
if job_id:
flash(f"내보내기 작업이 시작되었습니다. Job ID: {job_id}", "success")
else:
flash("작업을 시작했으나 Job ID를 받지 못했습니다.", "warning")
except Exception as e:
logger.error(f"Export failed: {e}")
flash(f"내보내기 실패: {str(e)}", "danger")
return redirect(url_for("xml.xml_management"))
@scp_bp.route("/scp/import", methods=["POST"])
@login_required
def import_scp():
"""
iDRAC로 설정 가져오기 (Import/Deploy)
"""
data = request.form
target_ip = data.get("target_ip")
username = data.get("username")
password = data.get("password")
# Share Parameters
share_ip = data.get("share_ip")
share_name = data.get("share_name")
share_user = data.get("share_user")
share_pwd = data.get("share_pwd")
filename = data.get("filename")
import_mode = data.get("import_mode", "Replace")
if not all([target_ip, username, password, share_ip, share_name, filename]):
flash("필수 정보가 누락되었습니다.", "warning")
return redirect(url_for("xml.xml_management"))
share_params = {
"IPAddress": share_ip,
"ShareName": share_name,
"FileName": filename,
"ShareType": "CIFS",
"UserName": share_user,
"Password": share_pwd
}
try:
with RedfishClient(target_ip, username, password) as client:
job_id = client.import_system_configuration(share_params, import_mode=import_mode)
if job_id:
flash(f"설정 적용(Import) 작업이 시작되었습니다. Job ID: {job_id}", "success")
else:
flash("작업을 시작했으나 Job ID를 받지 못했습니다.", "warning")
except Exception as e:
logger.error(f"Import failed: {e}")
flash(f"설정 적용 실패: {str(e)}", "danger")
return redirect(url_for("xml.xml_management"))

View File

@@ -290,13 +290,83 @@ def update_gpu_list():
return redirect(url_for("main.index"))
@utils_bp.route("/download_excel")
@login_required
def download_excel():
path = Path(Config.SERVER_LIST_FOLDER) / "mac_info.xlsx"
if not path.is_file():
flash("엑셀 파일을 찾을 수 없습니다.", "danger")
return redirect(url_for("main.index"))
logging.info(f"엑셀 파일 다운로드: {path}")
return send_file(str(path), as_attachment=True, download_name="mac_info.xlsx")
@utils_bp.route("/scan_network", methods=["POST"])
@login_required
def scan_network():
"""
지정된 IP 범위(Start ~ End)에 대해 Ping 테스트를 수행하고
응답이 있는 IP 목록을 반환합니다.
"""
try:
import ipaddress
import platform
import concurrent.futures
data = request.get_json(force=True, silent=True) or {}
start_ip_str = data.get('start_ip')
end_ip_str = data.get('end_ip')
if not start_ip_str or not end_ip_str:
return jsonify({"success": False, "error": "시작 IP와 종료 IP를 모두 입력해주세요."}), 400
try:
start_ip = ipaddress.IPv4Address(start_ip_str)
end_ip = ipaddress.IPv4Address(end_ip_str)
if start_ip > end_ip:
return jsonify({"success": False, "error": "시작 IP가 종료 IP보다 큽니다."}), 400
# IP 개수 제한 (너무 많은 스캔 방지, 예: C클래스 2개 분량 512개)
if int(end_ip) - int(start_ip) > 512:
return jsonify({"success": False, "error": "스캔 범위가 너무 넓습니다. (최대 512개)"}), 400
except ValueError:
return jsonify({"success": False, "error": "유효하지 않은 IP 주소 형식입니다."}), 400
# Ping 함수 정의
def ping_ip(ip_obj):
ip = str(ip_obj)
param = '-n' if platform.system().lower() == 'windows' else '-c'
timeout_param = '-w' if platform.system().lower() == 'windows' else '-W'
# Windows: -w 200 (ms), Linux: -W 1 (s)
timeout_val = '200' if platform.system().lower() == 'windows' else '1'
command = ['ping', param, '1', timeout_param, timeout_val, ip]
try:
# shell=False로 보안 강화, stdout/stderr 무시
res = subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return ip if res.returncode == 0 else None
except Exception:
return None
active_ips = []
# IP 리스트 생성
target_ips = []
temp_ip = start_ip
while temp_ip <= end_ip:
target_ips.append(temp_ip)
temp_ip += 1
# 병렬 처리 (최대 50 쓰레드)
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
results = executor.map(ping_ip, target_ips)
# 결과 수집 (None 제외)
active_ips = [ip for ip in results if ip is not None]
return jsonify({
"success": True,
"active_ips": active_ips,
"count": len(active_ips),
"message": f"스캔 완료: {len(active_ips)}개의 활성 IP 발견"
})
except Exception as e:
logging.error(f"Scan network fatal error: {e}")
return jsonify({"success": False, "error": f"서버 내부 오류: {str(e)}"}), 500

View File

@@ -0,0 +1,399 @@
"""
펌웨어 버전 비교 API 코드
idrac_routes.py 파일의 register_idrac_routes 함수 위에 추가하세요
"""
# ========================================
# 펌웨어 버전 관리 API
# ========================================
@idrac_bp.route('/api/firmware-versions', methods=['GET'])
def get_firmware_versions():
"""등록된 최신 펌웨어 버전 목록"""
try:
server_model = request.args.get('model') # 서버 모델 필터
query = FirmwareVersion.query.filter_by(is_active=True)
if server_model:
# 특정 모델 또는 범용
query = query.filter(
(FirmwareVersion.server_model == server_model) |
(FirmwareVersion.server_model == None)
)
versions = query.order_by(FirmwareVersion.component_name).all()
return jsonify({
'success': True,
'versions': [v.to_dict() for v in versions]
})
except Exception as e:
return jsonify({
'success': False,
'message': f'오류: {str(e)}'
})
@idrac_bp.route('/api/firmware-versions', methods=['POST'])
def add_firmware_version():
"""최신 펌웨어 버전 등록"""
try:
data = request.json
# 필수 필드 확인
if not all([data.get('component_name'), data.get('latest_version')]):
return jsonify({
'success': False,
'message': '컴포넌트명과 버전을 입력하세요'
})
# 중복 확인 (같은 컴포넌트, 같은 모델)
existing = FirmwareVersion.query.filter_by(
component_name=data['component_name'],
server_model=data.get('server_model')
).first()
if existing:
return jsonify({
'success': False,
'message': f'이미 등록된 컴포넌트입니다'
})
# 버전 생성
version = FirmwareVersion(
component_name=data['component_name'],
component_type=data.get('component_type'),
vendor=data.get('vendor'),
server_model=data.get('server_model'),
latest_version=data['latest_version'],
release_date=data.get('release_date'),
download_url=data.get('download_url'),
file_name=data.get('file_name'),
file_size_mb=data.get('file_size_mb'),
notes=data.get('notes'),
is_critical=data.get('is_critical', False)
)
db.session.add(version)
db.session.commit()
return jsonify({
'success': True,
'message': f'{version.component_name} 버전 정보 등록 완료',
'version': version.to_dict()
})
except Exception as e:
db.session.rollback()
return jsonify({
'success': False,
'message': f'오류: {str(e)}'
})
@idrac_bp.route('/api/firmware-versions/<int:version_id>', methods=['PUT'])
def update_firmware_version(version_id):
"""펌웨어 버전 정보 수정"""
try:
version = FirmwareVersion.query.get(version_id)
if not version:
return jsonify({
'success': False,
'message': '버전 정보를 찾을 수 없습니다'
})
data = request.json
# 업데이트
if 'component_name' in data:
version.component_name = data['component_name']
if 'latest_version' in data:
version.latest_version = data['latest_version']
if 'release_date' in data:
version.release_date = data['release_date']
if 'download_url' in data:
version.download_url = data['download_url']
if 'file_name' in data:
version.file_name = data['file_name']
if 'notes' in data:
version.notes = data['notes']
if 'is_critical' in data:
version.is_critical = data['is_critical']
db.session.commit()
return jsonify({
'success': True,
'message': '버전 정보 수정 완료',
'version': version.to_dict()
})
except Exception as e:
db.session.rollback()
return jsonify({
'success': False,
'message': f'오류: {str(e)}'
})
@idrac_bp.route('/api/firmware-versions/<int:version_id>', methods=['DELETE'])
def delete_firmware_version(version_id):
"""펌웨어 버전 정보 삭제"""
try:
version = FirmwareVersion.query.get(version_id)
if not version:
return jsonify({
'success': False,
'message': '버전 정보를 찾을 수 없습니다'
})
version.is_active = False
db.session.commit()
return jsonify({
'success': True,
'message': f'{version.component_name} 버전 정보 삭제 완료'
})
except Exception as e:
db.session.rollback()
return jsonify({
'success': False,
'message': f'오류: {str(e)}'
})
@idrac_bp.route('/api/servers/<int:server_id>/firmware/compare', methods=['GET'])
def compare_server_firmware(server_id):
"""
서버 펌웨어 버전 비교
현재 버전과 최신 버전을 비교하여 업데이트 필요 여부 확인
"""
try:
server = IdracServer.query.get(server_id)
if not server:
return jsonify({
'success': False,
'message': '서버를 찾을 수 없습니다'
})
# 현재 펌웨어 조회
client = DellRedfishClient(server.ip_address, server.username, server.password)
current_inventory = client.get_firmware_inventory()
# 최신 버전 정보 조회
latest_versions = FirmwareVersion.query.filter_by(is_active=True).all()
# 비교 결과
comparisons = []
for current_fw in current_inventory:
component_name = current_fw['Name']
current_version = current_fw['Version']
# 최신 버전 찾기 (컴포넌트명 매칭)
latest = None
for lv in latest_versions:
if lv.component_name.lower() in component_name.lower():
# 서버 모델 확인
if not lv.server_model or lv.server_model == server.model:
latest = lv
break
# 비교
comparison = FirmwareComparisonResult(
component_name=component_name,
current_version=current_version,
latest_version=latest.latest_version if latest else None
)
result = comparison.to_dict()
# 추가 정보
if latest:
result['latest_info'] = {
'release_date': latest.release_date,
'download_url': latest.download_url,
'file_name': latest.file_name,
'is_critical': latest.is_critical,
'notes': latest.notes
}
comparisons.append(result)
# 통계
total = len(comparisons)
outdated = len([c for c in comparisons if c['status'] == 'outdated'])
latest_count = len([c for c in comparisons if c['status'] == 'latest'])
unknown = len([c for c in comparisons if c['status'] == 'unknown'])
return jsonify({
'success': True,
'server': {
'id': server.id,
'name': server.name,
'model': server.model
},
'comparisons': comparisons,
'summary': {
'total': total,
'outdated': outdated,
'latest': latest_count,
'unknown': unknown
}
})
except Exception as e:
return jsonify({
'success': False,
'message': f'오류: {str(e)}'
})
@idrac_bp.route('/api/servers/firmware/compare-multi', methods=['POST'])
def compare_multi_servers_firmware():
"""
여러 서버의 펌웨어 버전 비교
"""
try:
data = request.json
server_ids = data.get('server_ids', [])
if not server_ids:
return jsonify({
'success': False,
'message': '서버를 선택하세요'
})
results = []
for server_id in server_ids:
server = IdracServer.query.get(server_id)
if not server:
continue
try:
# 각 서버 비교
client = DellRedfishClient(server.ip_address, server.username, server.password)
current_inventory = client.get_firmware_inventory()
latest_versions = FirmwareVersion.query.filter_by(is_active=True).all()
outdated_count = 0
outdated_items = []
for current_fw in current_inventory:
component_name = current_fw['Name']
current_version = current_fw['Version']
# 최신 버전 찾기
for lv in latest_versions:
if lv.component_name.lower() in component_name.lower():
comparison = FirmwareComparisonResult(
component_name=component_name,
current_version=current_version,
latest_version=lv.latest_version
)
if comparison.status == 'outdated':
outdated_count += 1
outdated_items.append({
'component': component_name,
'current': current_version,
'latest': lv.latest_version
})
break
results.append({
'server_id': server.id,
'server_name': server.name,
'outdated_count': outdated_count,
'outdated_items': outdated_items,
'status': 'needs_update' if outdated_count > 0 else 'up_to_date'
})
except Exception as e:
results.append({
'server_id': server.id,
'server_name': server.name,
'error': str(e)
})
return jsonify({
'success': True,
'results': results
})
except Exception as e:
return jsonify({
'success': False,
'message': f'오류: {str(e)}'
})
# ========================================
# 초기 데이터 생성 함수
# ========================================
def init_firmware_versions():
"""초기 펌웨어 버전 데이터 생성"""
initial_versions = [
{
'component_name': 'BIOS',
'component_type': 'Firmware',
'vendor': 'Dell',
'server_model': 'PowerEdge R750',
'latest_version': '2.15.0',
'release_date': '2024-01-15',
'notes': 'PowerEdge R750 최신 BIOS'
},
{
'component_name': 'iDRAC',
'component_type': 'Firmware',
'vendor': 'Dell',
'latest_version': '6.10.30.00',
'release_date': '2024-02-20',
'notes': 'iDRAC9 최신 펌웨어 (모든 모델 공용)'
},
{
'component_name': 'PERC H755',
'component_type': 'Firmware',
'vendor': 'Dell',
'server_model': 'PowerEdge R750',
'latest_version': '25.5.9.0001',
'release_date': '2024-01-10',
'notes': 'PERC H755 RAID 컨트롤러'
},
{
'component_name': 'BIOS',
'component_type': 'Firmware',
'vendor': 'Dell',
'server_model': 'PowerEdge R640',
'latest_version': '2.19.2',
'release_date': '2024-02-01',
'notes': 'PowerEdge R640 최신 BIOS'
},
{
'component_name': 'CPLD',
'component_type': 'Firmware',
'vendor': 'Dell',
'latest_version': '1.0.6',
'release_date': '2023-12-15',
'notes': '시스템 보드 CPLD (14G/15G 공용)'
},
]
for data in initial_versions:
# 중복 체크
existing = FirmwareVersion.query.filter_by(
component_name=data['component_name'],
server_model=data.get('server_model')
).first()
if not existing:
version = FirmwareVersion(**data)
db.session.add(version)
try:
db.session.commit()
print("✓ 초기 펌웨어 버전 데이터 생성 완료")
except:
db.session.rollback()
print("⚠ 초기 데이터 생성 중 오류 (이미 있을 수 있음)")

View File

@@ -1,9 +1,10 @@
from __future__ import annotations
import logging
import unicodedata
from pathlib import Path
from flask import Blueprint, render_template, request, redirect, url_for, flash
from flask_login import login_required
from werkzeug.utils import secure_filename
from config import Config
xml_bp = Blueprint("xml", __name__)
@@ -17,12 +18,26 @@ def allowed_file(filename: str) -> bool:
return "." in filename and filename.rsplit(".", 1)[1].lower() in Config.ALLOWED_EXTENSIONS
def sanitize_preserve_unicode(filename: str) -> str:
"""
디렉터리 탐색/제어 문자를 차단하면서, 한글/유니코드 파일명은 그대로 보존합니다.
- 경로 요소 제거 (Path(...).name)
- 유니코드 정규화(NFC)로 OS간 차이 최소화
- 널문자/슬래시/역슬래시 차단
"""
name = Path(filename).name
name = unicodedata.normalize("NFC", name)
if not name or any(ch in name for ch in ["\x00", "/", "\\"]):
raise ValueError("잘못된 파일명입니다.")
return name
@xml_bp.route("/xml_management")
@login_required
def xml_management():
xml_dir = Path(Config.XML_FOLDER)
try:
files = [f.name for f in xml_dir.iterdir() if f.is_file()]
files = sorted([f.name for f in xml_dir.iterdir() if f.is_file()])
except FileNotFoundError:
files = []
flash("XML 폴더가 존재하지 않습니다.", "danger")
@@ -37,58 +52,83 @@ def upload_xml():
flash("업로드할 파일을 선택하세요.", "warning")
return redirect(url_for("xml.xml_management"))
if allowed_file(file.filename):
filename = secure_filename(file.filename)
save_path = Path(Config.XML_FOLDER) / filename
try:
save_path.parent.mkdir(parents=True, exist_ok=True)
file.save(str(save_path))
# 텍스트 파일이므로 0644 권장
try:
save_path.chmod(0o644)
except Exception:
pass # Windows 등에서 무시
logging.info(f"XML 업로드됨: {filename}")
flash("파일이 성공적으로 업로드되었습니다.", "success")
except Exception as e:
logging.error(f"파일 업로드 오류: {e}")
flash("파일 저장 중 오류가 발생했습니다.", "danger")
else:
if not allowed_file(file.filename):
flash("XML 확장자만 업로드할 수 있습니다.", "warning")
return redirect(url_for("xml.xml_management"))
try:
filename = sanitize_preserve_unicode(file.filename) # 한글/유니코드 보존
except ValueError:
flash("파일명이 올바르지 않습니다.", "danger")
return redirect(url_for("xml.xml_management"))
save_path = Path(Config.XML_FOLDER) / filename
try:
save_path.parent.mkdir(parents=True, exist_ok=True)
file.save(str(save_path))
# 텍스트 파일 권장 권한 (Windows에서는 무시될 수 있음)
try:
save_path.chmod(0o644)
except Exception:
pass
logging.info(f"XML 업로드됨: {filename}")
flash("파일이 성공적으로 업로드되었습니다.", "success")
except Exception as e:
logging.error(f"파일 업로드 오류: {e}")
flash("파일 저장 중 오류가 발생했습니다.", "danger")
return redirect(url_for("xml.xml_management"))
@xml_bp.route("/delete_xml/<filename>", methods=["POST"])
@xml_bp.route("/delete_xml/<path:filename>", methods=["POST"])
@login_required
def delete_xml(filename: str):
path = Path(Config.XML_FOLDER) / secure_filename(filename)
if path.exists():
try:
path.unlink()
flash(f"{filename} 파일이 삭제되었습니다.", "success")
logging.info(f"XML 삭제됨: {filename}")
except Exception as e:
logging.error(f"XML 삭제 오류: {e}")
flash("파일 삭제 중 오류 발생", "danger")
else:
try:
safe_name = sanitize_preserve_unicode(filename)
except ValueError:
flash("잘못된 파일명입니다.", "danger")
return redirect(url_for("xml.xml_management"))
path = Path(Config.XML_FOLDER) / safe_name
if not path.exists():
flash("해당 파일이 존재하지 않습니다.", "warning")
return redirect(url_for("xml.xml_management"))
try:
path.unlink()
flash(f"{safe_name} 파일이 삭제되었습니다.", "success")
logging.info(f"XML 삭제됨: {safe_name}")
except Exception as e:
logging.error(f"XML 삭제 오류: {e}")
flash("파일 삭제 중 오류 발생", "danger")
return redirect(url_for("xml.xml_management"))
@xml_bp.route("/edit_xml/<filename>", methods=["GET", "POST"])
@xml_bp.route("/edit_xml/<path:filename>", methods=["GET", "POST"])
@login_required
def edit_xml(filename: str):
path = Path(Config.XML_FOLDER) / secure_filename(filename)
try:
safe_name = sanitize_preserve_unicode(filename)
except ValueError:
flash("잘못된 파일명입니다.", "danger")
return redirect(url_for("xml.xml_management"))
path = Path(Config.XML_FOLDER) / safe_name
if not path.exists():
flash("파일을 찾을 수 없습니다.", "danger")
return redirect(url_for("xml.xml_management"))
if request.method == "POST":
new_content = request.form.get("content", "")
raw = request.form.get("content", "")
# 1) 개행 통일: CRLF/CR → LF
normalized = raw.replace("\r\n", "\n").replace("\r", "\n")
try:
path.write_text(new_content, encoding="utf-8")
logging.info(f"XML 수정됨: {filename}")
# 2) 항상 LF로 저장 (Windows에서도 강제)
path.write_text(normalized, encoding="utf-8", newline="\n")
logging.info(f"XML 수정됨: {safe_name}")
flash("파일이 성공적으로 수정되었습니다.", "success")
return redirect(url_for("xml.xml_management"))
except Exception as e:
@@ -96,10 +136,11 @@ def edit_xml(filename: str):
flash("파일 저장 중 오류가 발생했습니다.", "danger")
try:
content = path.read_text(encoding="utf-8")
# 보기/편집 일관성을 위해 읽을 때도 LF로 맞춰서 textarea에 넣음
content = path.read_text(encoding="utf-8").replace("\r\n", "\n").replace("\r", "\n")
except Exception as e:
logging.error(f"XML 열기 실패: {e}")
flash("파일 열기 중 오류가 발생했습니다.", "danger")
content = ""
return render_template("edit_xml.html", filename=filename, content=content)
return render_template("edit_xml.html", filename=safe_name, content=content)

Binary file not shown.

Binary file not shown.

Binary file not shown.

Some files were not shown because too many files have changed in this diff Show More