Update 2025-12-19 16:23:03
This commit is contained in:
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -237,3 +237,55 @@ def test_bot(bot_id):
|
||||
flash(f"테스트 실패: {e}", "danger")
|
||||
|
||||
return redirect(url_for("admin.settings"))
|
||||
|
||||
|
||||
# ▼▼▼ 시스템 로그 뷰어 ▼▼▼
|
||||
@admin_bp.route("/admin/logs", methods=["GET"])
|
||||
@login_required
|
||||
@admin_required
|
||||
def view_logs():
|
||||
import os
|
||||
import re
|
||||
from collections import deque
|
||||
|
||||
log_folder = current_app.config.get('LOG_FOLDER')
|
||||
log_file = os.path.join(log_folder, 'app.log') if log_folder else None
|
||||
|
||||
# 1. 실제 ANSI 이스케이프 코드 (\x1B로 시작)
|
||||
ansi_escape = re.compile(r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]')
|
||||
|
||||
# 2. 텍스트로 찍힌 ANSI 코드 패턴 (예: [36m, [0m 등) - Werkzeug가 이스케이프 된 상태로 로그에 남길 경우 대비
|
||||
literal_ansi = re.compile(r'\[[0-9;]+m')
|
||||
|
||||
# 3. 제어 문자 제거
|
||||
control_char_re = re.compile(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]')
|
||||
|
||||
logs = []
|
||||
if log_file and os.path.exists(log_file):
|
||||
try:
|
||||
with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
|
||||
raw_lines = deque(f, 1000)
|
||||
|
||||
for line in raw_lines:
|
||||
# A. 실제 ANSI 코드 제거
|
||||
clean_line = ansi_escape.sub('', line)
|
||||
|
||||
# B. 리터럴 ANSI 패턴 제거 (사용자가 [36m 등을 텍스트로 보고 있다면 이것이 원인)
|
||||
clean_line = literal_ansi.sub('', clean_line)
|
||||
|
||||
# C. 제어 문자 제거
|
||||
clean_line = control_char_re.sub('', clean_line)
|
||||
|
||||
# D. 앞뒤 공백 제거
|
||||
clean_line = clean_line.strip()
|
||||
|
||||
# E. 빈 줄 제외
|
||||
if clean_line:
|
||||
logs.append(clean_line)
|
||||
|
||||
except Exception as e:
|
||||
logs = [f"Error reading log file: {str(e)}"]
|
||||
else:
|
||||
logs = [f"Log file not found at: {log_file}"]
|
||||
|
||||
return render_template("admin_logs.html", logs=logs)
|
||||
|
||||
@@ -293,6 +293,13 @@ def register():
|
||||
return redirect(url_for("auth.login"))
|
||||
else:
|
||||
if request.method == "POST":
|
||||
# 폼 검증 실패 에러를 Flash 메시지로 출력
|
||||
for field_name, errors in form.errors.items():
|
||||
for error in errors:
|
||||
# 필드 객체 가져오기 (라벨 텍스트 확인용)
|
||||
field = getattr(form, field_name, None)
|
||||
label = field.label.text if field else field_name
|
||||
flash(f"{label}: {error}", "warning")
|
||||
current_app.logger.info("REGISTER: form errors=%s", form.errors)
|
||||
|
||||
return render_template("register.html", form=form)
|
||||
|
||||
@@ -47,11 +47,43 @@ def index():
|
||||
info_dir = Path(Config.IDRAC_INFO_FOLDER)
|
||||
backup_dir = Path(Config.BACKUP_FOLDER)
|
||||
|
||||
scripts = [f.name for f in script_dir.glob("*") if f.is_file() and f.name != ".env"]
|
||||
scripts = natsorted(scripts)
|
||||
# 1. 스크립트 목록 조회 및 카테고리 분류
|
||||
all_scripts = [f.name for f in script_dir.glob("*") if f.is_file() and f.name != ".env"]
|
||||
all_scripts = natsorted(all_scripts)
|
||||
|
||||
grouped_scripts = {}
|
||||
for script in all_scripts:
|
||||
upper = script.upper()
|
||||
category = "General"
|
||||
if upper.startswith("GPU"):
|
||||
category = "GPU"
|
||||
elif upper.startswith("LOM"):
|
||||
category = "LOM"
|
||||
elif upper.startswith("TYPE") or upper.startswith("XE"):
|
||||
category = "Server Models"
|
||||
elif "MAC" in upper:
|
||||
category = "MAC Info"
|
||||
elif "GUID" in upper:
|
||||
category = "GUID Info"
|
||||
elif "SET_" in upper or "CONFIG" in upper:
|
||||
category = "Configuration"
|
||||
|
||||
if category not in grouped_scripts:
|
||||
grouped_scripts[category] = []
|
||||
grouped_scripts[category].append(script)
|
||||
|
||||
# 카테고리 정렬 (General은 마지막에)
|
||||
sorted_categories = sorted(grouped_scripts.keys())
|
||||
if "General" in sorted_categories:
|
||||
sorted_categories.remove("General")
|
||||
sorted_categories.append("General")
|
||||
|
||||
grouped_scripts_sorted = {k: grouped_scripts[k] for k in sorted_categories}
|
||||
|
||||
# 2. XML 파일 목록
|
||||
xml_files = [f.name for f in xml_dir.glob("*.xml")]
|
||||
|
||||
# 페이지네이션
|
||||
# 3. 페이지네이션 및 파일 목록
|
||||
page = int(request.args.get("page", 1))
|
||||
info_files = [f.name for f in info_dir.glob("*") if f.is_file()]
|
||||
info_files = natsorted(info_files)
|
||||
@@ -62,11 +94,10 @@ def index():
|
||||
|
||||
total_pages = (len(info_files) + Config.FILES_PER_PAGE - 1) // Config.FILES_PER_PAGE
|
||||
|
||||
# ✅ 추가: 10개 단위로 표시될 페이지 범위 계산
|
||||
start_page = ((page - 1) // 10) * 10 + 1
|
||||
end_page = min(start_page + 9, total_pages)
|
||||
|
||||
# 백업 폴더 목록 (디렉터리만)
|
||||
# 4. 백업 폴더 목록
|
||||
backup_dirs = [d for d in backup_dir.iterdir() if d.is_dir()]
|
||||
backup_dirs.sort(key=lambda p: p.stat().st_mtime, reverse=True)
|
||||
|
||||
@@ -91,7 +122,8 @@ def index():
|
||||
backup_files=backup_files,
|
||||
total_backup_pages=total_backup_pages,
|
||||
backup_page=backup_page,
|
||||
scripts=scripts,
|
||||
scripts=all_scripts, # 기존 리스트 호환
|
||||
grouped_scripts=grouped_scripts_sorted, # 카테고리별 분류
|
||||
xml_files=xml_files,
|
||||
)
|
||||
|
||||
|
||||
@@ -32,26 +32,48 @@ def diff_scp():
|
||||
return redirect(url_for("xml.xml_management"))
|
||||
|
||||
# 파일 내용 읽기 (LF로 통일)
|
||||
content1 = file1_path.read_text(encoding="utf-8").replace("\r\n", "\n").splitlines()
|
||||
content2 = file2_path.read_text(encoding="utf-8").replace("\r\n", "\n").splitlines()
|
||||
|
||||
# Diff 생성
|
||||
diff = difflib.unified_diff(
|
||||
content1, content2,
|
||||
fromfile=file1_name,
|
||||
tofile=file2_name,
|
||||
lineterm=""
|
||||
)
|
||||
# 파일 내용 읽기 (LF로 통일)
|
||||
# Monaco Editor에 원본 텍스트를 그대로 전달하기 위해 splitlines() 제거
|
||||
# 파일 내용 읽기 (LF로 통일)
|
||||
logger.info(f"Reading file1: {file1_path}")
|
||||
content1 = file1_path.read_text(encoding="utf-8", errors="replace").replace("\r\n", "\n")
|
||||
|
||||
diff_content = "\n".join(diff)
|
||||
logger.info(f"Reading file2: {file2_path}")
|
||||
content2 = file2_path.read_text(encoding="utf-8", errors="replace").replace("\r\n", "\n")
|
||||
|
||||
return render_template("scp_diff.html", file1=file1_name, file2=file2_name, diff_content=diff_content)
|
||||
logger.info(f"Content1 length: {len(content1)}, Content2 length: {len(content2)}")
|
||||
|
||||
return render_template("scp_diff.html",
|
||||
file1=file1_name,
|
||||
file2=file2_name,
|
||||
content1=content1,
|
||||
content2=content2)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Diff error: {e}")
|
||||
flash(f"비교 중 오류가 발생했습니다: {str(e)}", "danger")
|
||||
return redirect(url_for("xml.xml_management"))
|
||||
|
||||
@scp_bp.route("/scp/content/<path:filename>")
|
||||
@login_required
|
||||
def get_scp_content(filename):
|
||||
"""
|
||||
XML 파일 내용을 반환하는 API (Monaco Editor용)
|
||||
"""
|
||||
try:
|
||||
safe_name = sanitize_preserve_unicode(filename)
|
||||
path = Path(Config.XML_FOLDER) / safe_name
|
||||
|
||||
if not path.exists():
|
||||
return "File not found", 404
|
||||
|
||||
# 텍스트로 읽어서 반환
|
||||
content = path.read_text(encoding="utf-8", errors="replace").replace("\r\n", "\n")
|
||||
return content, 200, {'Content-Type': 'text/plain; charset=utf-8'}
|
||||
except Exception as e:
|
||||
logger.error(f"Content read error: {e}")
|
||||
return str(e), 500
|
||||
|
||||
@scp_bp.route("/scp/export", methods=["POST"])
|
||||
@login_required
|
||||
def export_scp():
|
||||
|
||||
@@ -290,13 +290,79 @@ def update_gpu_list():
|
||||
|
||||
return redirect(url_for("main.index"))
|
||||
|
||||
@utils_bp.route("/download_excel")
|
||||
@login_required
|
||||
def download_excel():
|
||||
path = Path(Config.SERVER_LIST_FOLDER) / "mac_info.xlsx"
|
||||
if not path.is_file():
|
||||
flash("엑셀 파일을 찾을 수 없습니다.", "danger")
|
||||
return redirect(url_for("main.index"))
|
||||
|
||||
logging.info(f"엑셀 파일 다운로드: {path}")
|
||||
return send_file(str(path), as_attachment=True, download_name="mac_info.xlsx")
|
||||
return send_file(str(path), as_attachment=True, download_name="mac_info.xlsx")
|
||||
|
||||
|
||||
@utils_bp.route("/scan_network", methods=["POST"])
|
||||
@login_required
|
||||
def scan_network():
|
||||
"""
|
||||
지정된 IP 범위(Start ~ End)에 대해 Ping 테스트를 수행하고
|
||||
응답이 있는 IP 목록을 반환합니다.
|
||||
"""
|
||||
import ipaddress
|
||||
import platform
|
||||
import concurrent.futures
|
||||
|
||||
data = request.get_json()
|
||||
start_ip_str = data.get('start_ip')
|
||||
end_ip_str = data.get('end_ip')
|
||||
|
||||
if not start_ip_str or not end_ip_str:
|
||||
return jsonify({"success": False, "error": "시작 IP와 종료 IP를 모두 입력해주세요."}), 400
|
||||
|
||||
try:
|
||||
start_ip = ipaddress.IPv4Address(start_ip_str)
|
||||
end_ip = ipaddress.IPv4Address(end_ip_str)
|
||||
|
||||
if start_ip > end_ip:
|
||||
return jsonify({"success": False, "error": "시작 IP가 종료 IP보다 큽니다."}), 400
|
||||
|
||||
# IP 개수 제한 (너무 많은 스캔 방지, 예: C클래스 2개 분량 512개)
|
||||
if int(end_ip) - int(start_ip) > 512:
|
||||
return jsonify({"success": False, "error": "스캔 범위가 너무 넓습니다. (최대 512개)"}), 400
|
||||
|
||||
except ValueError:
|
||||
return jsonify({"success": False, "error": "유효하지 않은 IP 주소 형식입니다."}), 400
|
||||
|
||||
# Ping 함수 정의
|
||||
def ping_ip(ip_obj):
|
||||
ip = str(ip_obj)
|
||||
param = '-n' if platform.system().lower() == 'windows' else '-c'
|
||||
timeout_param = '-w' if platform.system().lower() == 'windows' else '-W'
|
||||
# Windows: -w 200 (ms), Linux: -W 1 (s)
|
||||
timeout_val = '200' if platform.system().lower() == 'windows' else '1'
|
||||
|
||||
command = ['ping', param, '1', timeout_param, timeout_val, ip]
|
||||
|
||||
try:
|
||||
# shell=False로 보안 강화, stdout/stderr 무시
|
||||
res = subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
||||
return ip if res.returncode == 0 else None
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
active_ips = []
|
||||
|
||||
# IP 리스트 생성
|
||||
# ipaddress 모듈을 사용하여 범위 내 IP 생성
|
||||
target_ips = []
|
||||
temp_ip = start_ip
|
||||
while temp_ip <= end_ip:
|
||||
target_ips.append(temp_ip)
|
||||
temp_ip += 1
|
||||
|
||||
# 병렬 처리 (최대 50 쓰레드)
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
|
||||
results = executor.map(ping_ip, target_ips)
|
||||
|
||||
# 결과 수집 (None 제외)
|
||||
active_ips = [ip for ip in results if ip is not None]
|
||||
|
||||
return jsonify({
|
||||
"success": True,
|
||||
"active_ips": active_ips,
|
||||
"count": len(active_ips),
|
||||
"message": f"스캔 완료: {len(active_ips)}개의 활성 IP 발견"
|
||||
})
|
||||
Reference in New Issue
Block a user