import os import re import subprocess from pathlib import Path from concurrent.futures import ThreadPoolExecutor, as_completed import logging from dotenv import load_dotenv, find_dotenv # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s [INFO] root: %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # ───────────────────────────────────────────────────────────── # .env 자동 탐색 로드 (현재 파일 기준 상위 디렉터리까지 검색) load_dotenv(find_dotenv()) IDRAC_USER = os.getenv("IDRAC_USER") IDRAC_PASS = os.getenv("IDRAC_PASS") def resolve_output_dir() -> Path: """ 실행 위치와 무관하게 결과를 data/idrac_info 밑으로 저장. - 스크립트가 data/scripts/ 에 있다면 → data/idrac_info - 그 외 위치라도 → (스크립트 상위 폴더)/idrac_info """ here = Path(__file__).resolve().parent # .../data/scripts 또는 다른 폴더 # case 1: .../data/scripts → data/idrac_info if here.name.lower() == "scripts" and here.parent.name.lower() == "data": base = here.parent # data # case 2: .../scripts (상위가 data가 아닐 때도 상위 폴더를 base로 사용) elif here.name.lower() == "scripts": base = here.parent # case 3: 일반적인 경우: 현재 파일의 상위 폴더 else: base = here.parent out = base / "idrac_info" out.mkdir(parents=True, exist_ok=True) return out def parse_gpu_serials_from_hwinventory(hwinv_text: str) -> dict: """ iDRAC hwinventory 전체 텍스트에서 GPU(Video.Slot.*) 블록을 찾아 {FQDD(or InstanceID): SerialNumber} 딕셔너리로 반환. 블록은 빈 줄로 구분되어 있다고 가정. """ results = {} # 빈 줄 기준 블록 분할(여러 개의 개행을 하나의 경계로) blocks = re.split(r"\n\s*\n", hwinv_text, flags=re.MULTILINE) for block in blocks: # GPU(Video) 블록만 처리 if not re.search(r"\[?InstanceID:\s*Video\.Slot\.", block): continue # FQDD 우선, 없으면 [InstanceID: ...]에서 추출 fqdd = None m_fqdd = re.search(r"^FQDD\s*=\s*([^\r\n]+)", block, flags=re.MULTILINE) if m_fqdd: fqdd = m_fqdd.group(1).strip() else: m_hdr = re.search(r"\[InstanceID:\s*(Video\.Slot\.[^\]\r\n]+)\]", block) if m_hdr: fqdd = m_hdr.group(1).strip() if not fqdd: # 안전장치: Description에 FQDD가 암시되는 경우가 있으나 보통 필요 없음 continue # SerialNumber 추출 m_sn = re.search(r"^SerialNumber\s*=\s*([^\r\n]+)", block, flags=re.MULTILINE) serial = m_sn.group(1).strip() if m_sn else "Not Found" results[fqdd] = serial return results def fetch_idrac_info(idrac_ip: str, output_dir: Path) -> None: try: # 서비스 태그 가져오기 (get 제외) cmd_getsysinfo = [ "racadm", "-r", idrac_ip, "-u", IDRAC_USER or "", "-p", IDRAC_PASS or "", "getsysinfo" ] getsysinfo = subprocess.getoutput(" ".join(cmd_getsysinfo)) svc_tag_match = re.search(r"SVC Tag\s*=\s*(\S+)", getsysinfo) svc_tag = svc_tag_match.group(1) if svc_tag_match else None if not svc_tag: logging.error(f"Failed to retrieve SVC Tag for IP: {idrac_ip}") return # 전체 하드웨어 인벤토리 cmd_hwinv = [ "racadm", "-r", idrac_ip, "-u", IDRAC_USER or "", "-p", IDRAC_PASS or "", "hwinventory" ] hwinv_text = subprocess.getoutput(" ".join(cmd_hwinv)) gpu_map = parse_gpu_serials_from_hwinventory(hwinv_text) # 결과 저장 파일 (SVC Tag 기준) output_file = output_dir / f"{svc_tag}.txt" with output_file.open("w", encoding="utf-8", newline="\n") as f: # 헤더: 서비스 태그 f.write(f"{svc_tag}\n") if not gpu_map: f.write("No GPU(Video) inventory found or SerialNumber not present.\n") return # 정렬: Video.Slot.-, 기준 def slot_key(k: str): m = re.search(r"Video\.Slot\.(\d+)-(\d+)", k) if m: return (int(m.group(1)), int(m.group(2))) # 예외: 정규 형태가 아니면 뒤로 return (1_000_000, 1_000_000, k) for fqdd in sorted(gpu_map.keys(), key=slot_key): serial = gpu_map[fqdd] f.write(f"{fqdd}: {serial}\n") # 추가 요약: 시리얼만 세미콜론으로 모아서 한 줄로 serials_only = [gpu_map[k] for k in sorted(gpu_map.keys(), key=slot_key) if gpu_map[k] != "Not Found"] if serials_only: f.write(f"SERIALS: {';'.join(serials_only)}\n") except Exception as e: logging.error(f"Error processing IP {idrac_ip}: {e}") def main(ip_file: str) -> None: ip_path = Path(ip_file) if not ip_path.is_file(): logging.error(f"IP file {ip_file} does not exist.") return output_dir = resolve_output_dir() with ip_path.open("r", encoding="utf-8") as file: ip_addresses = [line.strip() for line in file if line.strip()] # 스레드풀 (동시 100개까지) with ThreadPoolExecutor(max_workers=100) as executor: future_to_ip = {executor.submit(fetch_idrac_info, ip, output_dir): ip for ip in ip_addresses} for future in as_completed(future_to_ip): ip = future_to_ip[future] try: future.result() logging.info(f"✅ Completed: {ip}") except Exception as e: logging.error(f"❌ Error processing {ip}: {e}") if __name__ == "__main__": import sys if len(sys.argv) != 2: logging.error("Usage: python GPU_Serial_v1.py ") sys.exit(1) main(sys.argv[1])