Files
iDRAC_Info/backend/routes/utilities.py
2026-01-20 20:47:45 +09:00

525 lines
18 KiB
Python

from __future__ import annotations
import os
import sys
import shutil
import subprocess
import logging
from pathlib import Path
from flask import Blueprint, request, redirect, url_for, flash, jsonify, send_file
from flask_login import login_required
from config import Config
utils_bp = Blueprint("utils", __name__)
def register_util_routes(app):
app.register_blueprint(utils_bp)
@utils_bp.route("/move_mac_files", methods=["POST"])
@login_required
def move_mac_files():
src = Path(Config.IDRAC_INFO_FOLDER)
dst = Path(Config.MAC_FOLDER)
dst.mkdir(parents=True, exist_ok=True)
moved = 0
skipped = 0
missing = 0
errors = []
# JSON 요청 파싱 (overwrite 플래그 확인)
data = request.get_json(silent=True) or {}
overwrite = data.get("overwrite", False)
# 1. 대상 파일 스냅샷 (이동 시도할, 또는 해야할 파일들)
try:
current_files = [f for f in src.iterdir() if f.is_file()]
except Exception as e:
logging.error(f"파일 목록 조회 실패: {e}")
return jsonify({"success": False, "error": str(e)})
# [중복 체크 로직] 덮어쓰기 모드가 아닐 때, 미리 중복 검사
if not overwrite:
duplicates = []
for file in current_files:
target = dst / file.name
if target.exists():
duplicates.append(file.name)
if duplicates:
return jsonify({
"success": False,
"requires_confirmation": True,
"duplicates": duplicates,
"duplicate_count": len(duplicates)
})
else:
logging.warning(f"⚠️ [MAC] 덮어쓰기 모드 활성화됨 - 중복 파일을 덮어씁니다.")
total_target_count = len(current_files)
# 카운터
moved_count = 0 # 내가 직접 옮김 (또는 덮어씀)
verified_count = 0 # 최종적으로 목적지에 있음을 확인 (성공)
lost_count = 0 # 소스에도 없고 목적지에도 없음 (진짜 유실?)
errors = []
for file in current_files:
target = dst / file.name
# [Step 1] 이미 목적지에 있는지 확인
if target.exists():
if overwrite:
# 덮어쓰기 모드: 기존 파일 삭제 후 이동 진행 (또는 바로 move로 덮어쓰기)
# shutil.move는 대상이 존재하면 에러가 날 수 있으므로(버전/OS따라 다름), 안전하게 삭제 시도
try:
# Windows에서는 사용 중인 파일 삭제 시 에러 발생 가능
# shutil.move(src, dst)는 dst가 존재하면 덮어쓰기 시도함 (Python 3.x)
pass
except Exception:
pass
else:
# (중복 체크를 통과했거나 Race Condition으로 생성된 경우) -> 이미 완료된 것으로 간주
verified_count += 1
logging.info(f"⏭️ 파일 이미 존재 (Skipped): {file.name}")
continue
# [Step 2] 소스에 있는지 확인 (Race Condition)
if not file.exists():
if target.exists():
verified_count += 1
continue
else:
lost_count += 1
logging.warning(f"❓ 이동 중 사라짐: {file.name}")
continue
# [Step 3] 이동 시도 (덮어쓰기 포함)
try:
shutil.move(str(file), str(target))
moved_count += 1
verified_count += 1
except shutil.Error as e:
# shutil.move might raise Error if destination exists depending on implementation,
# but standard behavior overwrites if not same file.
# If exact same file, verified.
if target.exists():
verified_count += 1
else:
errors.append(f"{file.name}: {str(e)}")
except FileNotFoundError:
# 옮기려는 찰나에 사라짐 -> 목적지 재확인
if target.exists():
verified_count += 1
logging.info(f"⏭️ 동시 처리됨 (완료): {file.name}")
else:
lost_count += 1
except Exception as e:
# 권한 에러 등 진짜 실패
error_msg = f"{file.name}: {str(e)}"
errors.append(error_msg)
logging.error(f"❌ 이동 에러: {error_msg}")
# 결과 요약
msg = f"{total_target_count}건 중 {verified_count}건 처리 완료"
if moved_count < verified_count:
msg += f" (이동: {moved_count}, 이미 완료: {verified_count - moved_count})"
if lost_count > 0:
msg += f", 확인 불가: {lost_count}"
logging.info(f"✅ MAC 처리 결과: {msg}")
flash(msg, "success" if lost_count == 0 else "warning")
return jsonify({
"success": True,
"total": total_target_count,
"verified": verified_count,
"message": msg,
"errors": errors if errors else None
})
@utils_bp.route("/move_guid_files", methods=["POST"])
@login_required
def move_guid_files():
src = Path(Config.IDRAC_INFO_FOLDER)
dst = Path(Config.GUID_FOLDER)
dst.mkdir(parents=True, exist_ok=True)
moved = 0
skipped = 0
missing = 0
errors = []
# JSON 요청 파싱 (overwrite 플래그 확인)
data = request.get_json(silent=True) or {}
overwrite = data.get("overwrite", False)
try:
files = [f for f in src.iterdir() if f.is_file()]
except Exception:
files = []
# [중복 체크]
if not overwrite:
duplicates = []
for file in files:
target = dst / file.name
if target.exists():
duplicates.append(file.name)
if duplicates:
return jsonify({
"success": False,
"requires_confirmation": True,
"duplicates": duplicates,
"duplicate_count": len(duplicates)
})
else:
logging.warning(f"⚠️ [GUID] 덮어쓰기 모드 활성화됨 - 중복 파일을 덮어씁니다.")
total_target_count = len(files)
verified_count = 0
moved_count = 0
errors = []
lost_count = 0
try:
for file in files:
target = dst / file.name
# 1. 이미 완료되었는지 확인
if target.exists():
if not overwrite:
verified_count += 1
continue
# overwrite=True면 계속 진행하여 덮어쓰기 시도
# 2. 소스 확인
if not file.exists():
if target.exists(): verified_count += 1
else: lost_count += 1
continue
# 3. 이동
try:
shutil.move(str(file), str(target))
moved_count += 1
verified_count += 1
except FileNotFoundError:
if target.exists(): verified_count += 1
else: lost_count += 1
except Exception as e:
errors.append(f"{file.name}: {e}")
# 상세 메시지
msg = f"{total_target_count}건 중 {verified_count}건 처리 완료"
logging.info(f"✅ GUID 처리: {msg}")
flash(msg, "success" if lost_count == 0 else "warning")
return jsonify({
"success": True,
"total": total_target_count,
"verified": verified_count,
"message": msg,
"errors": errors if errors else None
})
except Exception as e:
logging.error(f"❌ GUID 이동 중 오류: {e}")
return jsonify({"success": False, "error": str(e)})
@utils_bp.route("/move_gpu_files", methods=["POST"])
@login_required
def move_gpu_files():
"""
data/idrac_info → data/gpu_serial 로 GPU 시리얼 텍스트 파일 이동
"""
src = Path(Config.IDRAC_INFO_FOLDER) # 예: data/idrac_info
dst = Path(Config.GPU_FOLDER) # 예: data/gpu_serial
dst.mkdir(parents=True, exist_ok=True)
moved = 0
skipped = 0
missing = 0
errors = []
# JSON 요청 파싱
data = request.get_json(silent=True) or {}
overwrite = data.get("overwrite", False)
try:
files = [f for f in src.iterdir() if f.is_file()]
except Exception:
files = []
# [중복 체크]
if not overwrite:
duplicates = []
for file in files:
target = dst / file.name
if target.exists():
duplicates.append(file.name)
if duplicates:
return jsonify({
"success": False,
"requires_confirmation": True,
"duplicates": duplicates,
"duplicate_count": len(duplicates)
})
else:
logging.warning(f"⚠️ [GPU] 덮어쓰기 모드 활성화됨 - 중복 파일을 덮어씁니다.")
total_target_count = len(files)
verified_count = 0
moved_count = 0
errors = []
lost_count = 0
try:
for file in files:
target = dst / file.name
# 1. 존재 확인 (덮어쓰기 아닐 경우)
if target.exists():
if not overwrite:
verified_count += 1
continue
# 2. 소스 확인
if not file.exists():
if target.exists(): verified_count += 1
else: lost_count += 1
continue
# 3. 이동
try:
shutil.move(str(file), str(target))
moved_count += 1
verified_count += 1
except FileNotFoundError:
if target.exists(): verified_count += 1
else: lost_count += 1
except Exception as e:
errors.append(f"{file.name}: {e}")
# 상세 메시지
msg = f"{total_target_count}건 중 {verified_count}건 처리 완료"
logging.info(f"✅ GPU 처리: {msg}")
flash(msg, "success")
return jsonify({
"success": True,
"total": total_target_count,
"verified": verified_count,
"message": msg,
"errors": errors if errors else None
})
except Exception as e:
logging.error(f"❌ GPU 이동 오류: {e}")
return jsonify({"success": False, "error": str(e)})
@utils_bp.route("/update_server_list", methods=["POST"])
@login_required
def update_server_list():
content = request.form.get("server_list_content")
if not content:
flash("내용을 입력하세요.", "warning")
return redirect(url_for("main.index"))
path = Path(Config.SERVER_LIST_FOLDER) / "server_list.txt"
try:
path.write_text(content, encoding="utf-8")
result = subprocess.run(
[sys.executable, str(Path(Config.SERVER_LIST_FOLDER) / "excel.py")],
capture_output=True,
text=True,
check=True,
cwd=str(Path(Config.SERVER_LIST_FOLDER)),
timeout=300,
)
logging.info(f"서버 리스트 스크립트 실행 결과: {result.stdout}")
flash("서버 리스트가 업데이트되었습니다.", "success")
except subprocess.CalledProcessError as e:
logging.error(f"서버 리스트 스크립트 오류: {e.stderr}")
flash(f"스크립트 실행 실패: {e.stderr}", "danger")
except Exception as e:
logging.error(f"서버 리스트 처리 오류: {e}")
flash(f"서버 리스트 처리 중 오류 발생: {e}", "danger")
return redirect(url_for("main.index"))
@utils_bp.route("/update_guid_list", methods=["POST"])
@login_required
def update_guid_list():
content = request.form.get("server_list_content")
slot_priority = request.form.get("slot_priority", "") # 슬롯 우선순위 받기
if not content:
flash("내용을 입력하세요.", "warning")
return redirect(url_for("main.index"))
path = Path(Config.SERVER_LIST_FOLDER) / "guid_list.txt"
try:
path.write_text(content, encoding="utf-8")
# 슬롯 우선순위를 환경변수로 전달
env = os.environ.copy()
if slot_priority:
env["GUID_SLOT_PRIORITY"] = slot_priority
logging.info(f"GUID 슬롯 우선순위: {slot_priority}")
result = subprocess.run(
[sys.executable, str(Path(Config.SERVER_LIST_FOLDER) / "GUIDtxtT0Execl.py")],
capture_output=True,
text=True,
check=True,
cwd=str(Path(Config.SERVER_LIST_FOLDER)),
timeout=300,
env=env, # 환경변수 전달
)
logging.info(f"GUID 리스트 스크립트 실행 결과: {result.stdout}")
flash("GUID 리스트가 업데이트되었습니다.", "success")
except subprocess.CalledProcessError as e:
logging.error(f"GUID 리스트 스크립트 오류: {e.stderr}")
flash(f"스크립트 실행 실패: {e.stderr}", "danger")
except Exception as e:
logging.error(f"GUID 리스트 처리 오류: {e}")
flash(f"GUID 리스트 처리 중 오류 발생: {e}", "danger")
return redirect(url_for("main.index"))
@utils_bp.route("/update_gpu_list", methods=["POST"])
@login_required
def update_gpu_list():
"""
GPU 시리얼용 리스트(gpu_serial_list.txt)를 갱신하고 Excel을 생성합니다.
- form name="gpu_list_content" 로 내용 전달 (S/T 목록 라인별)
- txt_to_excel.py --preset gpu --list-file <gpu_serial_list.txt>
"""
content = request.form.get("server_list_content")
if not content:
flash("내용을 입력하세요.", "warning")
return redirect(url_for("main.index"))
server_list_dir = Path(Config.SERVER_LIST_FOLDER)
list_path = server_list_dir / "gpu_list.txt"
# txt_to_excel.py는 server_list 폴더에 둔다고 가정 (위치 다르면 경로만 수정)
script_path = server_list_dir / "GPUTOExecl.py"
try:
# 1) gpu_serial_list.txt 저장
list_path.write_text(content, encoding="utf-8")
# 2) 엑셀 생성 실행 (GPU 프리셋)
cmd = [
sys.executable,
str(script_path),
"--preset", "gpu",
"--list-file", str(list_path),
]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True,
cwd=str(server_list_dir), # data/server_list 기준 실행
timeout=300,
)
logging.info(f"[GPU] 리스트 스크립트 실행 STDOUT:\n{result.stdout}")
if result.stderr:
logging.warning(f"[GPU] 리스트 스크립트 STDERR:\n{result.stderr}")
flash("GPU 리스트가 업데이트되었습니다.", "success")
except subprocess.CalledProcessError as e:
logging.error(f"[GPU] 스크립트 오류: {e.stderr}")
flash(f"스크립트 실행 실패: {e.stderr}", "danger")
except Exception as e:
logging.error(f"[GPU] 처리 오류: {e}")
flash(f"GPU 리스트 처리 중 오류 발생: {e}", "danger")
return redirect(url_for("main.index"))
logging.info(f"엑셀 파일 다운로드: {path}")
return send_file(str(path), as_attachment=True, download_name="mac_info.xlsx")
@utils_bp.route("/scan_network", methods=["POST"])
@login_required
def scan_network():
"""
지정된 IP 범위(Start ~ End)에 대해 Ping 테스트를 수행하고
응답이 있는 IP 목록을 반환합니다.
"""
try:
import ipaddress
import platform
import concurrent.futures
data = request.get_json(force=True, silent=True) or {}
start_ip_str = data.get('start_ip')
end_ip_str = data.get('end_ip')
if not start_ip_str or not end_ip_str:
return jsonify({"success": False, "error": "시작 IP와 종료 IP를 모두 입력해주세요."}), 400
try:
start_ip = ipaddress.IPv4Address(start_ip_str)
end_ip = ipaddress.IPv4Address(end_ip_str)
if start_ip > end_ip:
return jsonify({"success": False, "error": "시작 IP가 종료 IP보다 큽니다."}), 400
# IP 개수 제한 (너무 많은 스캔 방지, 예: C클래스 2개 분량 512개)
if int(end_ip) - int(start_ip) > 512:
return jsonify({"success": False, "error": "스캔 범위가 너무 넓습니다. (최대 512개)"}), 400
except ValueError:
return jsonify({"success": False, "error": "유효하지 않은 IP 주소 형식입니다."}), 400
# Ping 함수 정의
def ping_ip(ip_obj):
ip = str(ip_obj)
param = '-n' if platform.system().lower() == 'windows' else '-c'
timeout_param = '-w' if platform.system().lower() == 'windows' else '-W'
# Windows: -w 200 (ms), Linux: -W 1 (s)
timeout_val = '200' if platform.system().lower() == 'windows' else '1'
command = ['ping', param, '1', timeout_param, timeout_val, ip]
try:
# shell=False로 보안 강화, stdout/stderr 무시
res = subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return ip if res.returncode == 0 else None
except Exception:
return None
active_ips = []
# IP 리스트 생성
target_ips = []
temp_ip = start_ip
while temp_ip <= end_ip:
target_ips.append(temp_ip)
temp_ip += 1
# 병렬 처리 (최대 50 쓰레드)
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
results = executor.map(ping_ip, target_ips)
# 결과 수집 (None 제외)
active_ips = [ip for ip in results if ip is not None]
return jsonify({
"success": True,
"active_ips": active_ips,
"count": len(active_ips),
"message": f"스캔 완료: {len(active_ips)}개의 활성 IP 발견"
})
except Exception as e:
logging.error(f"Scan network fatal error: {e}")
return jsonify({"success": False, "error": f"서버 내부 오류: {str(e)}"}), 500