This commit is contained in:
2025-10-13 22:26:27 +09:00
parent a79e61d7e4
commit 2fcca115d6
4 changed files with 335 additions and 3 deletions

View File

@@ -0,0 +1,159 @@
import os
import re
import subprocess
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from dotenv import load_dotenv, find_dotenv
# ─────────────────────────────────────────────────────────────
# .env 자동 탐색 로드 (현재 파일 기준 상위 디렉터리까지 검색)
load_dotenv(find_dotenv())
IDRAC_USER = os.getenv("IDRAC_USER")
IDRAC_PASS = os.getenv("IDRAC_PASS")
def resolve_output_dir() -> Path:
"""
실행 위치와 무관하게 결과를 data/idrac_info 밑으로 저장.
- 스크립트가 data/scripts/ 에 있다면 → data/idrac_info
- 그 외 위치라도 → (스크립트 상위 폴더)/idrac_info
"""
here = Path(__file__).resolve().parent # .../data/scripts 또는 다른 폴더
# case 1: .../data/scripts → data/idrac_info
if here.name.lower() == "scripts" and here.parent.name.lower() == "data":
base = here.parent # data
# case 2: .../scripts (상위가 data가 아닐 때도 상위 폴더를 base로 사용)
elif here.name.lower() == "scripts":
base = here.parent
# case 3: 일반적인 경우: 현재 파일의 상위 폴더
else:
base = here.parent
out = base / "idrac_info"
out.mkdir(parents=True, exist_ok=True)
return out
def parse_gpu_serials_from_hwinventory(hwinv_text: str) -> dict:
"""
iDRAC hwinventory 전체 텍스트에서 GPU(Video.Slot.*) 블록을 찾아
{FQDD(or InstanceID): SerialNumber} 딕셔너리로 반환.
블록은 빈 줄로 구분되어 있다고 가정.
"""
results = {}
# 빈 줄 기준 블록 분할(여러 개의 개행을 하나의 경계로)
blocks = re.split(r"\n\s*\n", hwinv_text, flags=re.MULTILINE)
for block in blocks:
# GPU(Video) 블록만 처리
if not re.search(r"\[?InstanceID:\s*Video\.Slot\.", block):
continue
# FQDD 우선, 없으면 [InstanceID: ...]에서 추출
fqdd = None
m_fqdd = re.search(r"^FQDD\s*=\s*([^\r\n]+)", block, flags=re.MULTILINE)
if m_fqdd:
fqdd = m_fqdd.group(1).strip()
else:
m_hdr = re.search(r"\[InstanceID:\s*(Video\.Slot\.[^\]\r\n]+)\]", block)
if m_hdr:
fqdd = m_hdr.group(1).strip()
if not fqdd:
# 안전장치: Description에 FQDD가 암시되는 경우가 있으나 보통 필요 없음
continue
# SerialNumber 추출
m_sn = re.search(r"^SerialNumber\s*=\s*([^\r\n]+)", block, flags=re.MULTILINE)
serial = m_sn.group(1).strip() if m_sn else "Not Found"
results[fqdd] = serial
return results
def fetch_idrac_info(idrac_ip: str, output_dir: Path) -> None:
try:
# 서비스 태그 가져오기 (get 제외)
cmd_getsysinfo = [
"racadm", "-r", idrac_ip, "-u", IDRAC_USER or "", "-p", IDRAC_PASS or "", "getsysinfo"
]
getsysinfo = subprocess.getoutput(" ".join(cmd_getsysinfo))
svc_tag_match = re.search(r"SVC Tag\s*=\s*(\S+)", getsysinfo)
svc_tag = svc_tag_match.group(1) if svc_tag_match else None
if not svc_tag:
print(f"Failed to retrieve SVC Tag for IP: {idrac_ip}")
return
# 전체 하드웨어 인벤토리
cmd_hwinv = [
"racadm", "-r", idrac_ip, "-u", IDRAC_USER or "", "-p", IDRAC_PASS or "", "hwinventory"
]
hwinv_text = subprocess.getoutput(" ".join(cmd_hwinv))
gpu_map = parse_gpu_serials_from_hwinventory(hwinv_text)
# 결과 저장 파일 (SVC Tag 기준)
output_file = output_dir / f"{svc_tag}.txt"
with output_file.open("w", encoding="utf-8", newline="\n") as f:
# 헤더: 서비스 태그
f.write(f"{svc_tag}\n")
if not gpu_map:
f.write("No GPU(Video) inventory found or SerialNumber not present.\n")
return
# 정렬: Video.Slot.<num>-<idx> 의 <num>, <idx> 기준
def slot_key(k: str):
m = re.search(r"Video\.Slot\.(\d+)-(\d+)", k)
if m:
return (int(m.group(1)), int(m.group(2)))
# 예외: 정규 형태가 아니면 뒤로
return (1_000_000, 1_000_000, k)
for fqdd in sorted(gpu_map.keys(), key=slot_key):
serial = gpu_map[fqdd]
f.write(f"{fqdd}: {serial}\n")
# 추가 요약: 시리얼만 세미콜론으로 모아서 한 줄로
serials_only = [gpu_map[k] for k in sorted(gpu_map.keys(), key=slot_key) if gpu_map[k] != "Not Found"]
if serials_only:
f.write(f"SERIALS: {';'.join(serials_only)}\n")
except Exception as e:
print(f"Error processing IP {idrac_ip}: {e}")
def main(ip_file: str) -> None:
ip_path = Path(ip_file)
if not ip_path.is_file():
print(f"IP file {ip_file} does not exist.")
return
output_dir = resolve_output_dir()
with ip_path.open("r", encoding="utf-8") as file:
ip_addresses = [line.strip() for line in file if line.strip()]
# 스레드풀 (동시 100개까지)
with ThreadPoolExecutor(max_workers=100) as executor:
future_to_ip = {executor.submit(fetch_idrac_info, ip, output_dir): ip for ip in ip_addresses}
for future in as_completed(future_to_ip):
ip = future_to_ip[future]
try:
future.result()
print(f"✅ Completed: {ip}")
except Exception as e:
print(f"❌ Error processing {ip}: {e}")
if __name__ == "__main__":
import sys
if len(sys.argv) != 2:
print("Usage: python GPU_Serial_v1.py <ip_file>")
sys.exit(1)
main(sys.argv[1])

View File

@@ -0,0 +1,172 @@
from __future__ import annotations
import os
import argparse
from pathlib import Path
from collections import OrderedDict
import pandas as pd
# ------------------------------------------------------------
# Cross-platform root resolver (Windows / Linux / macOS)
# ------------------------------------------------------------
def resolve_data_root() -> Path:
"""
Priority:
1) Env var IDRAC_DATA_DIR (absolute/relative OK)
2) nearest parent of this file that contains a 'data' folder
3) ./data under current working directory
"""
env = os.getenv("IDRAC_DATA_DIR")
if env:
return Path(env).expanduser().resolve()
here = Path(__file__).resolve()
for p in [here] + list(here.parents):
if (p / "data").is_dir():
return (p / "data").resolve()
return (Path.cwd() / "data").resolve()
DATA_ROOT = resolve_data_root()
# ------------------------------------------------------------
# Utilities
# ------------------------------------------------------------
def read_lines_any_encoding(path: Path) -> list[str]:
"""Read text file trying common encodings (utf-8/utf-8-sig/cp949/euc-kr/latin-1)."""
encodings = ["utf-8-sig", "utf-8", "cp949", "euc-kr", "latin-1"]
for enc in encodings:
try:
with path.open("r", encoding=enc, errors="strict") as f:
return f.read().splitlines()
except Exception:
continue
# last resort with replacement
with path.open("r", encoding="utf-8", errors="replace") as f:
return f.read().splitlines()
def parse_txt_with_st(file_path: Path) -> dict:
"""
Parse a .txt file:
- First line becomes 'S/T'
- Remaining lines in 'Key: Value' form
Keeps insertion order.
"""
lines = read_lines_any_encoding(file_path)
if not lines:
return {}
data = OrderedDict()
data["S/T"] = lines[0].strip()
for raw in lines[1:]:
line = raw.strip()
if not line or ":" not in line:
continue
key, value = line.split(":", 1)
data[key.strip()] = value.strip()
return dict(data)
def collect_file_list(input_dir: Path, list_file: Path | None) -> list[Path]:
"""
1) list_file가 주어지고 존재하면: 그 목록 순서대로 <name>.txt를 input_dir에서 찾음
2) 없으면: input_dir 안의 *.txt 전체를 파일명 오름차순으로 사용
"""
files: list[Path] = []
if list_file and list_file.is_file():
names = [x.strip() for x in read_lines_any_encoding(list_file) if x.strip()]
for name in names:
p = input_dir / f"{name}.txt"
if p.is_file():
files.append(p)
else:
print(f"[WARN] 파일을 찾을 수 없습니다: {p.name}")
return files
# fallback: 디렉토리 스캔
files = sorted(input_dir.glob("*.txt"))
if not files:
print(f"[WARN] 입력 폴더에 .txt 파일이 없습니다: {input_dir}")
return files
def main():
parser = argparse.ArgumentParser(
description="GUID/GPU 시리얼 텍스트들을 하나의 Excel로 병합"
)
parser.add_argument(
"--preset",
choices=["guid", "gpu"],
default="guid",
help="경로 프리셋 선택 (guid: 기존 GUID 경로, gpu: gpu_serial 폴더)"
)
parser.add_argument(
"--input-dir",
type=Path,
default=None,
help="입력 텍스트 폴더(기본: preset에 따름)"
)
parser.add_argument(
"--list-file",
type=Path,
default=None,
help="처리할 파일명 목록(txt). 없으면 폴더 내 *.txt 전체 처리"
)
parser.add_argument(
"--output-xlsx",
type=Path,
default=None,
help="출력 엑셀 경로(기본: preset에 따름)"
)
args = parser.parse_args()
# ---- Preset 기본값 설정 ----
if args.preset == "guid":
default_input_dir = Path(os.getenv("GUID_TXT_DIR", DATA_ROOT / "guid_file"))
default_list_file = Path(os.getenv("GUID_LIST_FILE", DATA_ROOT / "server_list" / "guid_list.txt"))
default_output = Path(os.getenv("GUID_OUTPUT_XLSX", DATA_ROOT / "idrac_info" / "XE9680_GUID.xlsx"))
else: # gpu
default_input_dir = Path(os.getenv("GPU_TXT_DIR", DATA_ROOT / "gpu_serial"))
default_list_file = Path(os.getenv("GPU_LIST_FILE", DATA_ROOT / "server_list" / "gpu_serial_list.txt"))
default_output = Path(os.getenv("GPU_OUTPUT_XLSX", DATA_ROOT / "idrac_info" / "GPU_SERIALS.xlsx"))
input_dir: Path = args.input_dir or default_input_dir
list_file: Path | None = args.list_file or (default_list_file if default_list_file.is_file() else None)
output_xlsx: Path = args.output_xlsx or default_output
# 출력 폴더 보장
output_xlsx.parent.mkdir(parents=True, exist_ok=True)
if not input_dir.is_dir():
raise FileNotFoundError(f"입력 폴더가 없습니다: {input_dir}")
# 파일 목록 수집
txt_files = collect_file_list(input_dir, list_file)
# 데이터 누적
rows: list[dict] = []
for txt_path in txt_files:
rows.append(parse_txt_with_st(txt_path))
if not rows:
print("[INFO] 병합할 데이터가 없습니다.")
return
# DataFrame (모든 키의 합집합 컬럼 생성)
df = pd.DataFrame(rows)
# No 열 선두 삽입
df.insert(0, "No", range(1, len(df) + 1))
# 저장
df.to_excel(output_xlsx, index=False)
print(f"엑셀 파일이 생성되었습니다: {output_xlsx}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,3 @@
8XZCZC4
910DZC4
CXZCZC4

View File

@@ -1,3 +1 @@
1XZCZC4
2NYCZC4
2XZCZC4