"""Dell iDRAC Redfish API client.

Endpoints may be given either as absolute Redfish paths (starting with
``/redfish/v1``) or as paths relative to the Redfish service root.
"""

import logging
import os
import time
from functools import wraps
from typing import Any, Dict, List, Optional

import requests
import urllib3

# Silence urllib3's InsecureRequestWarning. NOTE: this is a process-wide
# setting applied at import time, regardless of any client's ``verify_ssl``.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

logger = logging.getLogger(__name__)


def retry_on_failure(max_attempts: int = 2, delay: float = 2.0):
    """Build a decorator that retries a call on transient network errors.

    Only ``requests.Timeout`` and ``requests.ConnectionError`` trigger a
    retry; every other exception propagates immediately. The pause before
    retry number ``k`` is ``delay * k`` seconds (linear back-off).

    Args:
        max_attempts: Total number of calls to make, including the first.
        delay: Base pause in seconds between attempts.

    Returns:
        A decorator that wraps the target callable with the retry loop.

    Raises:
        ValueError: If ``max_attempts`` is smaller than 1 (the wrapped
            function could never be called).
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except (requests.Timeout, requests.ConnectionError) as exc:
                    if attempt == max_attempts:
                        # Out of attempts: surface the last network error.
                        raise
                    wait = delay * attempt
                    # Log the pause that is actually used, not the base delay.
                    logger.warning(
                        "Attempt %d failed, retrying in %ss: %s",
                        attempt,
                        wait,
                        exc,
                    )
                    time.sleep(wait)
            # Not reachable: max_attempts >= 1, so the loop either returns
            # or re-raises on its final iteration.

        return wrapper

    return decorator


class RedfishClient:
    """Client for the Redfish REST API exposed by a Dell iDRAC.

    Wraps a ``requests.Session`` that carries the credentials, the TLS
    verification setting and JSON headers. Usable as a context manager;
    leaving the ``with`` block closes the session.
    """

    # Path prefix that marks an endpoint as already absolute.
    _SERVICE_ROOT = "/redfish/v1"

    # Job collection endpoints, relative to the service root.
    _STANDARD_JOBS_PATH = "/Managers/iDRAC.Embedded.1/Jobs"
    _DELL_OEM_JOBS_PATH = "/Managers/iDRAC.Embedded.1/Oem/Dell/Jobs"

    # JobState value -> display label. States missing here pass through as-is.
    _JOB_STATE_LABELS = {
        "New": "Scheduled",
        "Starting": "Starting",
        "Running": "Running",
        "Completed": "Completed",
        "Failed": "Failed",
        "CompletedWithErrors": "Completed with Errors",
        "Pending": "Pending",
        "Paused": "Paused",
        "Stopping": "Stopping",
        "Cancelled": "Cancelled",
        "Cancelling": "Cancelling",
    }

    def __init__(
        self,
        ip: str,
        username: str,
        password: str,
        timeout: int = 15,
        verify_ssl: bool = False,
    ):
        """Create the HTTP session used for all requests to one iDRAC.

        Args:
            ip: Address of the iDRAC; used as the host part of every URL.
            username: Account name placed in the session's auth tuple.
            password: Password placed in the session's auth tuple.
            timeout: Per-request timeout passed to ``session.get``.
            verify_ssl: Whether the session verifies the TLS certificate.
        """
        self.ip = ip
        self.base_url = f"https://{ip}/redfish/v1"
        self.host_url = f"https://{ip}"  # prefix for absolute Redfish paths
        self.timeout = timeout
        self.verify_ssl = verify_ssl

        self.session = requests.Session()
        self.session.auth = (username, password)
        self.session.verify = verify_ssl
        self.session.headers.update({
            "Content-Type": "application/json",
            "Accept": "application/json"
        })

    def _build_url(self, endpoint: str) -> str:
        """Turn an absolute or service-root-relative endpoint into a URL."""
        # A non-empty relative endpoint without a leading slash would
        # otherwise be glued directly onto the base URL
        # (".../redfish/v1Managers/..."). The empty endpoint is left alone so
        # it still addresses the service root exactly as before.
        if endpoint and not endpoint.startswith("/"):
            endpoint = f"/{endpoint}"

        if endpoint.startswith(self._SERVICE_ROOT):
            # Already absolute (e.g. an "@odata.id" value): prepend host only.
            return f"{self.host_url}{endpoint}"
        return f"{self.base_url}{endpoint}"

    @retry_on_failure(max_attempts=2, delay=2.0)
    def get(self, endpoint: str) -> Dict[str, Any]:
        """Issue a GET request and return the decoded JSON body.

        Args:
            endpoint: Either an absolute path starting with ``/redfish/v1``
                or a path relative to the service root.

        Returns:
            The response body parsed by ``response.json()``.

        Raises:
            requests.HTTPError: If the response status is an error
                (raised by ``raise_for_status``).
            requests.Timeout, requests.ConnectionError: If the request
                still fails after the single retry.
        """
        url = self._build_url(endpoint)
        logger.debug("GET %s", url)

        response = self.session.get(url, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def get_jobs(self) -> List[Dict[str, Any]]:
        """Collect jobs from the standard and the Dell OEM job endpoints.

        Each endpoint is queried on a best-effort basis: a failure is logged
        as a warning and the other endpoint is still tried.

        Returns:
            Normalized job dicts, de-duplicated by ``"JID"`` (first
            occurrence wins, jobs with an empty JID are dropped) and sorted
            by ``"JID"``. An empty list if nothing was retrieved.
        """
        sources = (
            ("Standard", self._get_jobs_standard),
            ("Dell OEM", self._get_jobs_dell_oem),
        )

        jobs: List[Dict[str, Any]] = []
        for label, fetch in sources:
            try:
                jobs.extend(fetch())
            except Exception as exc:
                # Deliberate catch-all: one broken endpoint must not hide
                # the jobs reported by the other.
                logger.warning(
                    "%s: %s Jobs endpoint failed: %s", self.ip, label, exc
                )

        if not jobs:
            logger.info("%s: No jobs found", self.ip)
            return []

        # De-duplicate by JID; the dict keeps the first job seen per JID.
        unique_by_jid: Dict[Any, Dict[str, Any]] = {}
        for job in jobs:
            jid = job.get("JID", "")
            if jid and jid not in unique_by_jid:
                unique_by_jid[jid] = job

        logger.info(
            "%s: Retrieved %d unique jobs", self.ip, len(unique_by_jid)
        )
        return sorted(unique_by_jid.values(), key=lambda job: job.get("JID", ""))

    def _collect_jobs(
        self, collection: Dict[str, Any], kind: str
    ) -> List[Dict[str, Any]]:
        """Fetch and normalize every member of a job collection.

        Args:
            collection: Collection resource; its ``"Members"`` entries are
                expected to carry an ``"@odata.id"`` path.
            kind: Wording used in the warning logged when a member cannot
                be fetched (``"job"`` or ``"Dell OEM job"``).

        Returns:
            Normalized jobs for all members that could be retrieved.
        """
        jobs: List[Dict[str, Any]] = []
        # "or []" also covers a collection that reports "Members": null.
        for member in collection.get("Members") or []:
            # Skip malformed members instead of aborting the whole collection.
            job_path = member.get("@odata.id", "") if isinstance(member, dict) else ""
            if not job_path:
                continue
            try:
                jobs.append(self._normalize_job(self.get(job_path)))
            except Exception as exc:
                # Best effort per member: log and move on to the next job.
                logger.warning(
                    "%s: Failed to get %s %s: %s", self.ip, kind, job_path, exc
                )
        return jobs

    def _get_jobs_standard(self) -> List[Dict[str, Any]]:
        """Return the normalized jobs listed by the standard Jobs collection."""
        collection = self.get(self._STANDARD_JOBS_PATH)
        return self._collect_jobs(collection, "job")

    def _get_jobs_dell_oem(self) -> List[Dict[str, Any]]:
        """Return the normalized jobs listed by the Dell OEM Jobs collection.

        A 404 on the collection is treated as "endpoint not available" and
        yields an empty list; any other HTTP error is re-raised.
        """
        try:
            collection = self.get(self._DELL_OEM_JOBS_PATH)
        except requests.HTTPError as exc:
            response = exc.response
            # HTTPError may carry no response object; guard before reading it.
            if response is not None and response.status_code == 404:
                logger.debug("%s: Dell OEM endpoint not available", self.ip)
                return []
            raise

        return self._collect_jobs(collection, "Dell OEM job")

    def _normalize_job(self, job_data: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a raw Redfish job resource into the client's flat format.

        Args:
            job_data: Decoded JSON of a single job resource.

        Returns:
            Dict with the keys ``JID``, ``Name``, ``Status``,
            ``PercentComplete`` (as a string), ``Message``,
            ``ScheduledStartTime``, ``StartTime``, ``EndTime`` and
            ``LastUpdateTime`` (``EndTime`` if set, else ``StartTime``).
        """
        percent = job_data.get("PercentComplete")
        if percent is None:
            percent = 0

        job_state = job_data.get("JobState", "Unknown")
        status = self._JOB_STATE_LABELS.get(job_state, job_state)

        # Prefer the first entry of "Messages"; fall back to a top-level
        # "Message". The trailing 'or ""' keeps the result a string even
        # when the payload holds an explicit null.
        message_text = ""
        messages = job_data.get("Messages", [])
        if messages and isinstance(messages, list):
            first = messages[0]
            if first and isinstance(first, dict):
                message_text = first.get("Message") or ""
        if not message_text:
            message_text = job_data.get("Message") or ""

        return {
            "JID": job_data.get("Id", ""),
            "Name": job_data.get("Name", ""),
            "Status": status,
            "PercentComplete": str(percent),
            "Message": message_text,
            "ScheduledStartTime": job_data.get("ScheduledStartTime", ""),
            "StartTime": job_data.get("StartTime", ""),
            "EndTime": job_data.get("EndTime", ""),
            # Never None: an empty string when neither timestamp is set.
            "LastUpdateTime": (
                job_data.get("EndTime") or job_data.get("StartTime") or ""
            ),
        }

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    def __enter__(self) -> "RedfishClient":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.close()


# Custom exceptions
class AuthenticationError(Exception):
    """Authentication failed."""


class NotSupportedError(Exception):
    """The requested feature is not supported."""