526 lines
18 KiB
Python
526 lines
18 KiB
Python
from __future__ import annotations
|
|
import os
|
|
import sys
|
|
import shutil
|
|
import subprocess
|
|
import logging
|
|
from pathlib import Path
|
|
from flask import Blueprint, request, redirect, url_for, flash, jsonify, send_file
|
|
from flask_login import login_required
|
|
from config import Config
|
|
|
|
utils_bp = Blueprint("utils", __name__)
|
|
|
|
|
|
def register_util_routes(app):
|
|
app.register_blueprint(utils_bp)
|
|
|
|
|
|
@utils_bp.route("/move_mac_files", methods=["POST"])
|
|
@login_required
|
|
def move_mac_files():
|
|
src = Path(Config.IDRAC_INFO_FOLDER)
|
|
dst = Path(Config.MAC_FOLDER)
|
|
dst.mkdir(parents=True, exist_ok=True)
|
|
|
|
moved = 0
|
|
skipped = 0
|
|
missing = 0
|
|
errors = []
|
|
|
|
# JSON 요청 파싱 (overwrite 플래그 확인)
|
|
data = request.get_json(silent=True) or {}
|
|
overwrite = data.get("overwrite", False)
|
|
|
|
# 1. 대상 파일 스냅샷 (이동 시도할, 또는 해야할 파일들)
|
|
try:
|
|
current_files = [f for f in src.iterdir() if f.is_file()]
|
|
except Exception as e:
|
|
logging.error(f"파일 목록 조회 실패: {e}")
|
|
return jsonify({"success": False, "error": str(e)})
|
|
|
|
# [중복 체크 로직] 덮어쓰기 모드가 아닐 때, 미리 중복 검사
|
|
if not overwrite:
|
|
duplicates = []
|
|
for file in current_files:
|
|
target = dst / file.name
|
|
if target.exists():
|
|
duplicates.append(file.name)
|
|
|
|
if duplicates:
|
|
return jsonify({
|
|
"success": False,
|
|
"requires_confirmation": True,
|
|
"duplicates": duplicates,
|
|
"duplicate_count": len(duplicates)
|
|
})
|
|
else:
|
|
logging.warning(f"⚠️ [MAC] 덮어쓰기 모드 활성화됨 - 중복 파일을 덮어씁니다.")
|
|
|
|
total_target_count = len(current_files)
|
|
|
|
# 카운터
|
|
moved_count = 0 # 내가 직접 옮김 (또는 덮어씀)
|
|
verified_count = 0 # 최종적으로 목적지에 있음을 확인 (성공)
|
|
lost_count = 0 # 소스에도 없고 목적지에도 없음 (진짜 유실?)
|
|
errors = []
|
|
|
|
for file in current_files:
|
|
target = dst / file.name
|
|
|
|
# [Step 1] 이미 목적지에 있는지 확인
|
|
if target.exists():
|
|
if overwrite:
|
|
# 덮어쓰기 모드: 기존 파일 삭제 후 이동 진행 (또는 바로 move로 덮어쓰기)
|
|
# shutil.move는 대상이 존재하면 에러가 날 수 있으므로(버전/OS따라 다름), 안전하게 삭제 시도
|
|
try:
|
|
# Windows에서는 사용 중인 파일 삭제 시 에러 발생 가능
|
|
# shutil.move(src, dst)는 dst가 존재하면 덮어쓰기 시도함 (Python 3.x)
|
|
pass
|
|
except Exception:
|
|
pass
|
|
else:
|
|
# (중복 체크를 통과했거나 Race Condition으로 생성된 경우) -> 이미 완료된 것으로 간주
|
|
verified_count += 1
|
|
logging.info(f"⏭️ 파일 이미 존재 (Skipped): {file.name}")
|
|
continue
|
|
|
|
# [Step 2] 소스에 있는지 확인 (Race Condition)
|
|
if not file.exists():
|
|
if target.exists():
|
|
verified_count += 1
|
|
continue
|
|
else:
|
|
lost_count += 1
|
|
logging.warning(f"❓ 이동 중 사라짐: {file.name}")
|
|
continue
|
|
|
|
# [Step 3] 이동 시도 (덮어쓰기 포함)
|
|
try:
|
|
shutil.move(str(file), str(target))
|
|
moved_count += 1
|
|
verified_count += 1
|
|
except shutil.Error as e:
|
|
# shutil.move might raise Error if destination exists depending on implementation,
|
|
# but standard behavior overwrites if not same file.
|
|
# If exact same file, verified.
|
|
if target.exists():
|
|
verified_count += 1
|
|
else:
|
|
errors.append(f"{file.name}: {str(e)}")
|
|
except FileNotFoundError:
|
|
# 옮기려는 찰나에 사라짐 -> 목적지 재확인
|
|
if target.exists():
|
|
verified_count += 1
|
|
logging.info(f"⏭️ 동시 처리됨 (완료): {file.name}")
|
|
else:
|
|
lost_count += 1
|
|
except Exception as e:
|
|
# 권한 에러 등 진짜 실패
|
|
error_msg = f"{file.name}: {str(e)}"
|
|
errors.append(error_msg)
|
|
logging.error(f"❌ 이동 에러: {error_msg}")
|
|
|
|
# 결과 요약
|
|
msg = f"총 {total_target_count}건 중 {verified_count}건 처리 완료"
|
|
if moved_count < verified_count:
|
|
msg += f" (이동: {moved_count}, 이미 완료: {verified_count - moved_count})"
|
|
|
|
if lost_count > 0:
|
|
msg += f", 확인 불가: {lost_count}"
|
|
|
|
logging.info(f"✅ MAC 처리 결과: {msg}")
|
|
flash(msg, "success" if lost_count == 0 else "warning")
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"total": total_target_count,
|
|
"verified": verified_count,
|
|
"message": msg,
|
|
"errors": errors if errors else None
|
|
})
|
|
|
|
|
|
@utils_bp.route("/move_guid_files", methods=["POST"])
|
|
@login_required
|
|
def move_guid_files():
|
|
src = Path(Config.IDRAC_INFO_FOLDER)
|
|
dst = Path(Config.GUID_FOLDER)
|
|
dst.mkdir(parents=True, exist_ok=True)
|
|
|
|
moved = 0
|
|
skipped = 0
|
|
missing = 0
|
|
errors = []
|
|
|
|
# JSON 요청 파싱 (overwrite 플래그 확인)
|
|
data = request.get_json(silent=True) or {}
|
|
overwrite = data.get("overwrite", False)
|
|
|
|
try:
|
|
files = [f for f in src.iterdir() if f.is_file()]
|
|
except Exception:
|
|
files = []
|
|
|
|
# [중복 체크]
|
|
if not overwrite:
|
|
duplicates = []
|
|
for file in files:
|
|
target = dst / file.name
|
|
if target.exists():
|
|
duplicates.append(file.name)
|
|
if duplicates:
|
|
return jsonify({
|
|
"success": False,
|
|
"requires_confirmation": True,
|
|
"duplicates": duplicates,
|
|
"duplicate_count": len(duplicates)
|
|
})
|
|
else:
|
|
logging.warning(f"⚠️ [GUID] 덮어쓰기 모드 활성화됨 - 중복 파일을 덮어씁니다.")
|
|
|
|
total_target_count = len(files)
|
|
verified_count = 0
|
|
moved_count = 0
|
|
errors = []
|
|
lost_count = 0
|
|
|
|
try:
|
|
for file in files:
|
|
target = dst / file.name
|
|
|
|
# 1. 이미 완료되었는지 확인
|
|
if target.exists():
|
|
if not overwrite:
|
|
verified_count += 1
|
|
continue
|
|
# overwrite=True면 계속 진행하여 덮어쓰기 시도
|
|
|
|
# 2. 소스 확인
|
|
if not file.exists():
|
|
if target.exists(): verified_count += 1
|
|
else: lost_count += 1
|
|
continue
|
|
|
|
# 3. 이동
|
|
try:
|
|
shutil.move(str(file), str(target))
|
|
moved_count += 1
|
|
verified_count += 1
|
|
except FileNotFoundError:
|
|
if target.exists(): verified_count += 1
|
|
else: lost_count += 1
|
|
except Exception as e:
|
|
errors.append(f"{file.name}: {e}")
|
|
|
|
# 상세 메시지
|
|
msg = f"총 {total_target_count}건 중 {verified_count}건 처리 완료"
|
|
logging.info(f"✅ GUID 처리: {msg}")
|
|
flash(msg, "success" if lost_count == 0 else "warning")
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"total": total_target_count,
|
|
"verified": verified_count,
|
|
"message": msg,
|
|
"errors": errors if errors else None
|
|
})
|
|
|
|
except Exception as e:
|
|
logging.error(f"❌ GUID 이동 중 오류: {e}")
|
|
return jsonify({"success": False, "error": str(e)})
|
|
|
|
@utils_bp.route("/move_gpu_files", methods=["POST"])
|
|
@login_required
|
|
def move_gpu_files():
|
|
"""
|
|
data/idrac_info → data/gpu_serial 로 GPU 시리얼 텍스트 파일 이동
|
|
"""
|
|
src = Path(Config.IDRAC_INFO_FOLDER) # 예: data/idrac_info
|
|
dst = Path(Config.GPU_FOLDER) # 예: data/gpu_serial
|
|
dst.mkdir(parents=True, exist_ok=True)
|
|
|
|
moved = 0
|
|
skipped = 0
|
|
missing = 0
|
|
errors = []
|
|
|
|
# JSON 요청 파싱
|
|
data = request.get_json(silent=True) or {}
|
|
overwrite = data.get("overwrite", False)
|
|
|
|
try:
|
|
all_files = [f for f in src.iterdir() if f.is_file()]
|
|
files = [f for f in all_files if f.name.lower().endswith(".txt")]
|
|
except Exception:
|
|
files = []
|
|
|
|
# [중복 체크]
|
|
if not overwrite:
|
|
duplicates = []
|
|
for file in files:
|
|
target = dst / file.name
|
|
if target.exists():
|
|
duplicates.append(file.name)
|
|
if duplicates:
|
|
return jsonify({
|
|
"success": False,
|
|
"requires_confirmation": True,
|
|
"duplicates": duplicates,
|
|
"duplicate_count": len(duplicates)
|
|
})
|
|
else:
|
|
logging.warning(f"⚠️ [GPU] 덮어쓰기 모드 활성화됨 - 중복 파일을 덮어씁니다.")
|
|
|
|
total_target_count = len(files)
|
|
verified_count = 0
|
|
moved_count = 0
|
|
errors = []
|
|
lost_count = 0
|
|
|
|
try:
|
|
for file in files:
|
|
target = dst / file.name
|
|
|
|
# 1. 존재 확인 (덮어쓰기 아닐 경우)
|
|
if target.exists():
|
|
if not overwrite:
|
|
verified_count += 1
|
|
continue
|
|
|
|
# 2. 소스 확인
|
|
if not file.exists():
|
|
if target.exists(): verified_count += 1
|
|
else: lost_count += 1
|
|
continue
|
|
|
|
# 3. 이동
|
|
try:
|
|
shutil.move(str(file), str(target))
|
|
moved_count += 1
|
|
verified_count += 1
|
|
except FileNotFoundError:
|
|
if target.exists(): verified_count += 1
|
|
else: lost_count += 1
|
|
except Exception as e:
|
|
errors.append(f"{file.name}: {e}")
|
|
|
|
# 상세 메시지
|
|
msg = f"총 {total_target_count}건 중 {verified_count}건 처리 완료"
|
|
logging.info(f"✅ GPU 처리: {msg}")
|
|
flash(msg, "success")
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"total": total_target_count,
|
|
"verified": verified_count,
|
|
"message": msg,
|
|
"errors": errors if errors else None
|
|
})
|
|
|
|
except Exception as e:
|
|
logging.error(f"❌ GPU 이동 오류: {e}")
|
|
return jsonify({"success": False, "error": str(e)})
|
|
|
|
@utils_bp.route("/update_server_list", methods=["POST"])
|
|
@login_required
|
|
def update_server_list():
|
|
content = request.form.get("server_list_content")
|
|
if not content:
|
|
flash("내용을 입력하세요.", "warning")
|
|
return redirect(url_for("main.index"))
|
|
|
|
path = Path(Config.SERVER_LIST_FOLDER) / "server_list.txt"
|
|
try:
|
|
path.write_text(content, encoding="utf-8")
|
|
result = subprocess.run(
|
|
[sys.executable, str(Path(Config.SERVER_LIST_FOLDER) / "excel.py")],
|
|
capture_output=True,
|
|
text=True,
|
|
check=True,
|
|
cwd=str(Path(Config.SERVER_LIST_FOLDER)),
|
|
timeout=300,
|
|
)
|
|
logging.info(f"서버 리스트 스크립트 실행 결과: {result.stdout}")
|
|
flash("서버 리스트가 업데이트되었습니다.", "success")
|
|
except subprocess.CalledProcessError as e:
|
|
logging.error(f"서버 리스트 스크립트 오류: {e.stderr}")
|
|
flash(f"스크립트 실행 실패: {e.stderr}", "danger")
|
|
except Exception as e:
|
|
logging.error(f"서버 리스트 처리 오류: {e}")
|
|
flash(f"서버 리스트 처리 중 오류 발생: {e}", "danger")
|
|
|
|
return redirect(url_for("main.index"))
|
|
|
|
|
|
@utils_bp.route("/update_guid_list", methods=["POST"])
|
|
@login_required
|
|
def update_guid_list():
|
|
content = request.form.get("server_list_content")
|
|
slot_priority = request.form.get("slot_priority", "") # 슬롯 우선순위 받기
|
|
|
|
if not content:
|
|
flash("내용을 입력하세요.", "warning")
|
|
return redirect(url_for("main.index"))
|
|
|
|
path = Path(Config.SERVER_LIST_FOLDER) / "guid_list.txt"
|
|
try:
|
|
path.write_text(content, encoding="utf-8")
|
|
|
|
# 슬롯 우선순위를 환경변수로 전달
|
|
env = os.environ.copy()
|
|
if slot_priority:
|
|
env["GUID_SLOT_PRIORITY"] = slot_priority
|
|
logging.info(f"GUID 슬롯 우선순위: {slot_priority}")
|
|
|
|
result = subprocess.run(
|
|
[sys.executable, str(Path(Config.SERVER_LIST_FOLDER) / "GUIDtxtT0Execl.py")],
|
|
capture_output=True,
|
|
text=True,
|
|
check=True,
|
|
cwd=str(Path(Config.SERVER_LIST_FOLDER)),
|
|
timeout=300,
|
|
env=env, # 환경변수 전달
|
|
)
|
|
logging.info(f"GUID 리스트 스크립트 실행 결과: {result.stdout}")
|
|
flash("GUID 리스트가 업데이트되었습니다.", "success")
|
|
except subprocess.CalledProcessError as e:
|
|
logging.error(f"GUID 리스트 스크립트 오류: {e.stderr}")
|
|
flash(f"스크립트 실행 실패: {e.stderr}", "danger")
|
|
except Exception as e:
|
|
logging.error(f"GUID 리스트 처리 오류: {e}")
|
|
flash(f"GUID 리스트 처리 중 오류 발생: {e}", "danger")
|
|
|
|
return redirect(url_for("main.index"))
|
|
|
|
@utils_bp.route("/update_gpu_list", methods=["POST"])
|
|
@login_required
|
|
def update_gpu_list():
|
|
"""
|
|
GPU 시리얼용 리스트(gpu_serial_list.txt)를 갱신하고 Excel을 생성합니다.
|
|
- form name="gpu_list_content" 로 내용 전달 (S/T 목록 라인별)
|
|
- txt_to_excel.py --preset gpu --list-file <gpu_serial_list.txt>
|
|
"""
|
|
content = request.form.get("server_list_content")
|
|
if not content:
|
|
flash("내용을 입력하세요.", "warning")
|
|
return redirect(url_for("main.index"))
|
|
|
|
server_list_dir = Path(Config.SERVER_LIST_FOLDER)
|
|
list_path = server_list_dir / "gpu_list.txt"
|
|
# txt_to_excel.py는 server_list 폴더에 둔다고 가정 (위치 다르면 경로만 수정)
|
|
script_path = server_list_dir / "GPUTOExecl.py"
|
|
|
|
try:
|
|
# 1) gpu_serial_list.txt 저장
|
|
list_path.write_text(content, encoding="utf-8")
|
|
|
|
# 2) 엑셀 생성 실행 (GPU 프리셋)
|
|
cmd = [
|
|
sys.executable,
|
|
str(script_path),
|
|
"--preset", "gpu",
|
|
"--list-file", str(list_path),
|
|
]
|
|
result = subprocess.run(
|
|
cmd,
|
|
capture_output=True,
|
|
text=True,
|
|
check=True,
|
|
cwd=str(server_list_dir), # data/server_list 기준 실행
|
|
timeout=300,
|
|
)
|
|
logging.info(f"[GPU] 리스트 스크립트 실행 STDOUT:\n{result.stdout}")
|
|
if result.stderr:
|
|
logging.warning(f"[GPU] 리스트 스크립트 STDERR:\n{result.stderr}")
|
|
|
|
flash("GPU 리스트가 업데이트되었습니다.", "success")
|
|
except subprocess.CalledProcessError as e:
|
|
logging.error(f"[GPU] 스크립트 오류: {e.stderr}")
|
|
flash(f"스크립트 실행 실패: {e.stderr}", "danger")
|
|
except Exception as e:
|
|
logging.error(f"[GPU] 처리 오류: {e}")
|
|
flash(f"GPU 리스트 처리 중 오류 발생: {e}", "danger")
|
|
|
|
return redirect(url_for("main.index"))
|
|
|
|
logging.info(f"엑셀 파일 다운로드: {path}")
|
|
return send_file(str(path), as_attachment=True, download_name="mac_info.xlsx")
|
|
|
|
|
|
@utils_bp.route("/scan_network", methods=["POST"])
|
|
@login_required
|
|
def scan_network():
|
|
"""
|
|
지정된 IP 범위(Start ~ End)에 대해 Ping 테스트를 수행하고
|
|
응답이 있는 IP 목록을 반환합니다.
|
|
"""
|
|
try:
|
|
import ipaddress
|
|
import platform
|
|
import concurrent.futures
|
|
|
|
data = request.get_json(force=True, silent=True) or {}
|
|
start_ip_str = data.get('start_ip')
|
|
end_ip_str = data.get('end_ip')
|
|
|
|
if not start_ip_str or not end_ip_str:
|
|
return jsonify({"success": False, "error": "시작 IP와 종료 IP를 모두 입력해주세요."}), 400
|
|
|
|
try:
|
|
start_ip = ipaddress.IPv4Address(start_ip_str)
|
|
end_ip = ipaddress.IPv4Address(end_ip_str)
|
|
|
|
if start_ip > end_ip:
|
|
return jsonify({"success": False, "error": "시작 IP가 종료 IP보다 큽니다."}), 400
|
|
|
|
# IP 개수 제한 (너무 많은 스캔 방지, 예: C클래스 2개 분량 512개)
|
|
if int(end_ip) - int(start_ip) > 512:
|
|
return jsonify({"success": False, "error": "스캔 범위가 너무 넓습니다. (최대 512개)"}), 400
|
|
|
|
except ValueError:
|
|
return jsonify({"success": False, "error": "유효하지 않은 IP 주소 형식입니다."}), 400
|
|
|
|
# Ping 함수 정의
|
|
def ping_ip(ip_obj):
|
|
ip = str(ip_obj)
|
|
param = '-n' if platform.system().lower() == 'windows' else '-c'
|
|
timeout_param = '-w' if platform.system().lower() == 'windows' else '-W'
|
|
# Windows: -w 200 (ms), Linux: -W 1 (s)
|
|
timeout_val = '200' if platform.system().lower() == 'windows' else '1'
|
|
|
|
command = ['ping', param, '1', timeout_param, timeout_val, ip]
|
|
|
|
try:
|
|
# shell=False로 보안 강화, stdout/stderr 무시
|
|
res = subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
|
return ip if res.returncode == 0 else None
|
|
except Exception:
|
|
return None
|
|
|
|
active_ips = []
|
|
|
|
# IP 리스트 생성
|
|
target_ips = []
|
|
temp_ip = start_ip
|
|
while temp_ip <= end_ip:
|
|
target_ips.append(temp_ip)
|
|
temp_ip += 1
|
|
|
|
# 병렬 처리 (최대 50 쓰레드)
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
|
|
results = executor.map(ping_ip, target_ips)
|
|
|
|
# 결과 수집 (None 제외)
|
|
active_ips = [ip for ip in results if ip is not None]
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"active_ips": active_ips,
|
|
"count": len(active_ips),
|
|
"message": f"스캔 완료: {len(active_ips)}개의 활성 IP 발견"
|
|
})
|
|
|
|
except Exception as e:
|
|
logging.error(f"Scan network fatal error: {e}")
|
|
return jsonify({"success": False, "error": f"서버 내부 오류: {str(e)}"}), 500 |