from __future__ import annotations import os import sys import shutil import subprocess import logging from pathlib import Path from flask import Blueprint, request, redirect, url_for, flash, jsonify, send_file from flask_login import login_required from config import Config utils_bp = Blueprint("utils", __name__) def register_util_routes(app): app.register_blueprint(utils_bp) @utils_bp.route("/move_mac_files", methods=["POST"]) @login_required def move_mac_files(): src = Path(Config.IDRAC_INFO_FOLDER) dst = Path(Config.MAC_FOLDER) dst.mkdir(parents=True, exist_ok=True) moved = 0 skipped = 0 missing = 0 errors = [] # JSON 요청 파싱 (overwrite 플래그 확인) data = request.get_json(silent=True) or {} overwrite = data.get("overwrite", False) # 1. 대상 파일 스냅샷 (이동 시도할, 또는 해야할 파일들) try: current_files = [f for f in src.iterdir() if f.is_file()] except Exception as e: logging.error(f"파일 목록 조회 실패: {e}") return jsonify({"success": False, "error": str(e)}) # [중복 체크 로직] 덮어쓰기 모드가 아닐 때, 미리 중복 검사 if not overwrite: duplicates = [] for file in current_files: target = dst / file.name if target.exists(): duplicates.append(file.name) if duplicates: return jsonify({ "success": False, "requires_confirmation": True, "duplicates": duplicates, "duplicate_count": len(duplicates) }) else: logging.warning(f"⚠️ [MAC] 덮어쓰기 모드 활성화됨 - 중복 파일을 덮어씁니다.") total_target_count = len(current_files) # 카운터 moved_count = 0 # 내가 직접 옮김 (또는 덮어씀) verified_count = 0 # 최종적으로 목적지에 있음을 확인 (성공) lost_count = 0 # 소스에도 없고 목적지에도 없음 (진짜 유실?) errors = [] for file in current_files: target = dst / file.name # [Step 1] 이미 목적지에 있는지 확인 if target.exists(): if overwrite: # 덮어쓰기 모드: 기존 파일 삭제 후 이동 진행 (또는 바로 move로 덮어쓰기) # shutil.move는 대상이 존재하면 에러가 날 수 있으므로(버전/OS따라 다름), 안전하게 삭제 시도 try: # Windows에서는 사용 중인 파일 삭제 시 에러 발생 가능 # shutil.move(src, dst)는 dst가 존재하면 덮어쓰기 시도함 (Python 3.x) pass except Exception: pass else: # (중복 체크를 통과했거나 Race Condition으로 생성된 경우) -> 이미 완료된 것으로 간주 verified_count += 1 logging.info(f"⏭️ 파일 이미 존재 (Skipped): {file.name}") continue # [Step 2] 소스에 있는지 확인 (Race Condition) if not file.exists(): if target.exists(): verified_count += 1 continue else: lost_count += 1 logging.warning(f"❓ 이동 중 사라짐: {file.name}") continue # [Step 3] 이동 시도 (덮어쓰기 포함) try: shutil.move(str(file), str(target)) moved_count += 1 verified_count += 1 except shutil.Error as e: # shutil.move might raise Error if destination exists depending on implementation, # but standard behavior overwrites if not same file. # If exact same file, verified. if target.exists(): verified_count += 1 else: errors.append(f"{file.name}: {str(e)}") except FileNotFoundError: # 옮기려는 찰나에 사라짐 -> 목적지 재확인 if target.exists(): verified_count += 1 logging.info(f"⏭️ 동시 처리됨 (완료): {file.name}") else: lost_count += 1 except Exception as e: # 권한 에러 등 진짜 실패 error_msg = f"{file.name}: {str(e)}" errors.append(error_msg) logging.error(f"❌ 이동 에러: {error_msg}") # 결과 요약 msg = f"총 {total_target_count}건 중 {verified_count}건 처리 완료" if moved_count < verified_count: msg += f" (이동: {moved_count}, 이미 완료: {verified_count - moved_count})" if lost_count > 0: msg += f", 확인 불가: {lost_count}" logging.info(f"✅ MAC 처리 결과: {msg}") flash(msg, "success" if lost_count == 0 else "warning") return jsonify({ "success": True, "total": total_target_count, "verified": verified_count, "message": msg, "errors": errors if errors else None }) @utils_bp.route("/move_guid_files", methods=["POST"]) @login_required def move_guid_files(): src = Path(Config.IDRAC_INFO_FOLDER) dst = Path(Config.GUID_FOLDER) dst.mkdir(parents=True, exist_ok=True) moved = 0 skipped = 0 missing = 0 errors = [] # JSON 요청 파싱 (overwrite 플래그 확인) data = request.get_json(silent=True) or {} overwrite = data.get("overwrite", False) try: files = [f for f in src.iterdir() if f.is_file()] except Exception: files = [] # [중복 체크] if not overwrite: duplicates = [] for file in files: target = dst / file.name if target.exists(): duplicates.append(file.name) if duplicates: return jsonify({ "success": False, "requires_confirmation": True, "duplicates": duplicates, "duplicate_count": len(duplicates) }) else: logging.warning(f"⚠️ [GUID] 덮어쓰기 모드 활성화됨 - 중복 파일을 덮어씁니다.") total_target_count = len(files) verified_count = 0 moved_count = 0 errors = [] lost_count = 0 try: for file in files: target = dst / file.name # 1. 이미 완료되었는지 확인 if target.exists(): if not overwrite: verified_count += 1 continue # overwrite=True면 계속 진행하여 덮어쓰기 시도 # 2. 소스 확인 if not file.exists(): if target.exists(): verified_count += 1 else: lost_count += 1 continue # 3. 이동 try: shutil.move(str(file), str(target)) moved_count += 1 verified_count += 1 except FileNotFoundError: if target.exists(): verified_count += 1 else: lost_count += 1 except Exception as e: errors.append(f"{file.name}: {e}") # 상세 메시지 msg = f"총 {total_target_count}건 중 {verified_count}건 처리 완료" logging.info(f"✅ GUID 처리: {msg}") flash(msg, "success" if lost_count == 0 else "warning") return jsonify({ "success": True, "total": total_target_count, "verified": verified_count, "message": msg, "errors": errors if errors else None }) except Exception as e: logging.error(f"❌ GUID 이동 중 오류: {e}") return jsonify({"success": False, "error": str(e)}) @utils_bp.route("/move_gpu_files", methods=["POST"]) @login_required def move_gpu_files(): """ data/idrac_info → data/gpu_serial 로 GPU 시리얼 텍스트 파일 이동 """ src = Path(Config.IDRAC_INFO_FOLDER) # 예: data/idrac_info dst = Path(Config.GPU_FOLDER) # 예: data/gpu_serial dst.mkdir(parents=True, exist_ok=True) moved = 0 skipped = 0 missing = 0 errors = [] # JSON 요청 파싱 data = request.get_json(silent=True) or {} overwrite = data.get("overwrite", False) try: all_files = [f for f in src.iterdir() if f.is_file()] files = [f for f in all_files if f.name.lower().endswith(".txt")] except Exception: files = [] # [중복 체크] if not overwrite: duplicates = [] for file in files: target = dst / file.name if target.exists(): duplicates.append(file.name) if duplicates: return jsonify({ "success": False, "requires_confirmation": True, "duplicates": duplicates, "duplicate_count": len(duplicates) }) else: logging.warning(f"⚠️ [GPU] 덮어쓰기 모드 활성화됨 - 중복 파일을 덮어씁니다.") total_target_count = len(files) verified_count = 0 moved_count = 0 errors = [] lost_count = 0 try: for file in files: target = dst / file.name # 1. 존재 확인 (덮어쓰기 아닐 경우) if target.exists(): if not overwrite: verified_count += 1 continue # 2. 소스 확인 if not file.exists(): if target.exists(): verified_count += 1 else: lost_count += 1 continue # 3. 이동 try: shutil.move(str(file), str(target)) moved_count += 1 verified_count += 1 except FileNotFoundError: if target.exists(): verified_count += 1 else: lost_count += 1 except Exception as e: errors.append(f"{file.name}: {e}") # 상세 메시지 msg = f"총 {total_target_count}건 중 {verified_count}건 처리 완료" logging.info(f"✅ GPU 처리: {msg}") flash(msg, "success") return jsonify({ "success": True, "total": total_target_count, "verified": verified_count, "message": msg, "errors": errors if errors else None }) except Exception as e: logging.error(f"❌ GPU 이동 오류: {e}") return jsonify({"success": False, "error": str(e)}) @utils_bp.route("/update_server_list", methods=["POST"]) @login_required def update_server_list(): content = request.form.get("server_list_content") if not content: flash("내용을 입력하세요.", "warning") return redirect(url_for("main.index")) path = Path(Config.SERVER_LIST_FOLDER) / "server_list.txt" try: path.write_text(content, encoding="utf-8") result = subprocess.run( [sys.executable, str(Path(Config.SERVER_LIST_FOLDER) / "excel.py")], capture_output=True, text=True, check=True, cwd=str(Path(Config.SERVER_LIST_FOLDER)), timeout=300, ) logging.info(f"서버 리스트 스크립트 실행 결과: {result.stdout}") flash("서버 리스트가 업데이트되었습니다.", "success") except subprocess.CalledProcessError as e: logging.error(f"서버 리스트 스크립트 오류: {e.stderr}") flash(f"스크립트 실행 실패: {e.stderr}", "danger") except Exception as e: logging.error(f"서버 리스트 처리 오류: {e}") flash(f"서버 리스트 처리 중 오류 발생: {e}", "danger") return redirect(url_for("main.index")) @utils_bp.route("/update_guid_list", methods=["POST"]) @login_required def update_guid_list(): content = request.form.get("server_list_content") slot_priority = request.form.get("slot_priority", "") # 슬롯 우선순위 받기 if not content: flash("내용을 입력하세요.", "warning") return redirect(url_for("main.index")) path = Path(Config.SERVER_LIST_FOLDER) / "guid_list.txt" try: path.write_text(content, encoding="utf-8") # 슬롯 우선순위를 환경변수로 전달 env = os.environ.copy() if slot_priority: env["GUID_SLOT_PRIORITY"] = slot_priority logging.info(f"GUID 슬롯 우선순위: {slot_priority}") result = subprocess.run( [sys.executable, str(Path(Config.SERVER_LIST_FOLDER) / "GUIDtxtT0Execl.py")], capture_output=True, text=True, check=True, cwd=str(Path(Config.SERVER_LIST_FOLDER)), timeout=300, env=env, # 환경변수 전달 ) logging.info(f"GUID 리스트 스크립트 실행 결과: {result.stdout}") flash("GUID 리스트가 업데이트되었습니다.", "success") except subprocess.CalledProcessError as e: logging.error(f"GUID 리스트 스크립트 오류: {e.stderr}") flash(f"스크립트 실행 실패: {e.stderr}", "danger") except Exception as e: logging.error(f"GUID 리스트 처리 오류: {e}") flash(f"GUID 리스트 처리 중 오류 발생: {e}", "danger") return redirect(url_for("main.index")) @utils_bp.route("/update_gpu_list", methods=["POST"]) @login_required def update_gpu_list(): """ GPU 시리얼용 리스트(gpu_serial_list.txt)를 갱신하고 Excel을 생성합니다. - form name="gpu_list_content" 로 내용 전달 (S/T 목록 라인별) - txt_to_excel.py --preset gpu --list-file """ content = request.form.get("server_list_content") if not content: flash("내용을 입력하세요.", "warning") return redirect(url_for("main.index")) server_list_dir = Path(Config.SERVER_LIST_FOLDER) list_path = server_list_dir / "gpu_list.txt" # txt_to_excel.py는 server_list 폴더에 둔다고 가정 (위치 다르면 경로만 수정) script_path = server_list_dir / "GPUTOExecl.py" try: # 1) gpu_serial_list.txt 저장 list_path.write_text(content, encoding="utf-8") # 2) 엑셀 생성 실행 (GPU 프리셋) cmd = [ sys.executable, str(script_path), "--preset", "gpu", "--list-file", str(list_path), ] result = subprocess.run( cmd, capture_output=True, text=True, check=True, cwd=str(server_list_dir), # data/server_list 기준 실행 timeout=300, ) logging.info(f"[GPU] 리스트 스크립트 실행 STDOUT:\n{result.stdout}") if result.stderr: logging.warning(f"[GPU] 리스트 스크립트 STDERR:\n{result.stderr}") flash("GPU 리스트가 업데이트되었습니다.", "success") except subprocess.CalledProcessError as e: logging.error(f"[GPU] 스크립트 오류: {e.stderr}") flash(f"스크립트 실행 실패: {e.stderr}", "danger") except Exception as e: logging.error(f"[GPU] 처리 오류: {e}") flash(f"GPU 리스트 처리 중 오류 발생: {e}", "danger") return redirect(url_for("main.index")) logging.info(f"엑셀 파일 다운로드: {path}") return send_file(str(path), as_attachment=True, download_name="mac_info.xlsx") @utils_bp.route("/scan_network", methods=["POST"]) @login_required def scan_network(): """ 지정된 IP 범위(Start ~ End)에 대해 Ping 테스트를 수행하고 응답이 있는 IP 목록을 반환합니다. """ try: import ipaddress import platform import concurrent.futures data = request.get_json(force=True, silent=True) or {} start_ip_str = data.get('start_ip') end_ip_str = data.get('end_ip') if not start_ip_str or not end_ip_str: return jsonify({"success": False, "error": "시작 IP와 종료 IP를 모두 입력해주세요."}), 400 try: start_ip = ipaddress.IPv4Address(start_ip_str) end_ip = ipaddress.IPv4Address(end_ip_str) if start_ip > end_ip: return jsonify({"success": False, "error": "시작 IP가 종료 IP보다 큽니다."}), 400 # IP 개수 제한 (너무 많은 스캔 방지, 예: C클래스 2개 분량 512개) if int(end_ip) - int(start_ip) > 512: return jsonify({"success": False, "error": "스캔 범위가 너무 넓습니다. (최대 512개)"}), 400 except ValueError: return jsonify({"success": False, "error": "유효하지 않은 IP 주소 형식입니다."}), 400 # Ping 함수 정의 def ping_ip(ip_obj): ip = str(ip_obj) param = '-n' if platform.system().lower() == 'windows' else '-c' timeout_param = '-w' if platform.system().lower() == 'windows' else '-W' # Windows: -w 200 (ms), Linux: -W 1 (s) timeout_val = '200' if platform.system().lower() == 'windows' else '1' command = ['ping', param, '1', timeout_param, timeout_val, ip] try: # shell=False로 보안 강화, stdout/stderr 무시 res = subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) return ip if res.returncode == 0 else None except Exception: return None active_ips = [] # IP 리스트 생성 target_ips = [] temp_ip = start_ip while temp_ip <= end_ip: target_ips.append(temp_ip) temp_ip += 1 # 병렬 처리 (최대 50 쓰레드) with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor: results = executor.map(ping_ip, target_ips) # 결과 수집 (None 제외) active_ips = [ip for ip in results if ip is not None] return jsonify({ "success": True, "active_ips": active_ips, "count": len(active_ips), "message": f"스캔 완료: {len(active_ips)}개의 활성 IP 발견" }) except Exception as e: logging.error(f"Scan network fatal error: {e}") return jsonify({"success": False, "error": f"서버 내부 오류: {str(e)}"}), 500