Update 2025-12-19 19:18:16

This commit is contained in:
unknown
2025-12-19 19:18:16 +09:00
parent b18412ecb2
commit b37c43ab86
19 changed files with 7629 additions and 89 deletions

View File

@@ -301,68 +301,72 @@ def scan_network():
지정된 IP 범위(Start ~ End)에 대해 Ping 테스트를 수행하고
응답이 있는 IP 목록을 반환합니다.
"""
import ipaddress
import platform
import concurrent.futures
data = request.get_json()
start_ip_str = data.get('start_ip')
end_ip_str = data.get('end_ip')
if not start_ip_str or not end_ip_str:
return jsonify({"success": False, "error": "시작 IP와 종료 IP를 모두 입력해주세요."}), 400
try:
start_ip = ipaddress.IPv4Address(start_ip_str)
end_ip = ipaddress.IPv4Address(end_ip_str)
if start_ip > end_ip:
return jsonify({"success": False, "error": "시작 IP가 종료 IP보다 큽니다."}), 400
# IP 개수 제한 (너무 많은 스캔 방지, 예: C클래스 2개 분량 512개)
if int(end_ip) - int(start_ip) > 512:
return jsonify({"success": False, "error": "스캔 범위가 너무 넓습니다. (최대 512개)"}), 400
import ipaddress
import platform
import concurrent.futures
except ValueError:
return jsonify({"success": False, "error": "유효하지 않은 IP 주소 형식입니다."}), 400
data = request.get_json(force=True, silent=True) or {}
start_ip_str = data.get('start_ip')
end_ip_str = data.get('end_ip')
if not start_ip_str or not end_ip_str:
return jsonify({"success": False, "error": "시작 IP와 종료 IP를 모두 입력해주세요."}), 400
# Ping 함수 정의
def ping_ip(ip_obj):
ip = str(ip_obj)
param = '-n' if platform.system().lower() == 'windows' else '-c'
timeout_param = '-w' if platform.system().lower() == 'windows' else '-W'
# Windows: -w 200 (ms), Linux: -W 1 (s)
timeout_val = '200' if platform.system().lower() == 'windows' else '1'
command = ['ping', param, '1', timeout_param, timeout_val, ip]
try:
# shell=False로 보안 강화, stdout/stderr 무시
res = subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return ip if res.returncode == 0 else None
except Exception:
return None
start_ip = ipaddress.IPv4Address(start_ip_str)
end_ip = ipaddress.IPv4Address(end_ip_str)
if start_ip > end_ip:
return jsonify({"success": False, "error": "시작 IP가 종료 IP보다 큽니다."}), 400
# IP 개수 제한 (너무 많은 스캔 방지, 예: C클래스 2개 분량 512개)
if int(end_ip) - int(start_ip) > 512:
return jsonify({"success": False, "error": "스캔 범위가 너무 넓습니다. (최대 512개)"}), 400
active_ips = []
# IP 리스트 생성
# ipaddress 모듈을 사용하여 범위 내 IP 생성
target_ips = []
temp_ip = start_ip
while temp_ip <= end_ip:
target_ips.append(temp_ip)
temp_ip += 1
except ValueError:
return jsonify({"success": False, "error": "유효하지 않은 IP 주소 형식입니다."}), 400
# 병렬 처리 (최대 50 쓰레드)
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
results = executor.map(ping_ip, target_ips)
# 결과 수집 (None 제외)
active_ips = [ip for ip in results if ip is not None]
# Ping 함수 정의
def ping_ip(ip_obj):
ip = str(ip_obj)
param = '-n' if platform.system().lower() == 'windows' else '-c'
timeout_param = '-w' if platform.system().lower() == 'windows' else '-W'
# Windows: -w 200 (ms), Linux: -W 1 (s)
timeout_val = '200' if platform.system().lower() == 'windows' else '1'
command = ['ping', param, '1', timeout_param, timeout_val, ip]
try:
# shell=False로 보안 강화, stdout/stderr 무시
res = subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return ip if res.returncode == 0 else None
except Exception:
return None
return jsonify({
"success": True,
"active_ips": active_ips,
"count": len(active_ips),
"message": f"스캔 완료: {len(active_ips)}개의 활성 IP 발견"
})
active_ips = []
# IP 리스트 생성
target_ips = []
temp_ip = start_ip
while temp_ip <= end_ip:
target_ips.append(temp_ip)
temp_ip += 1
# 병렬 처리 (최대 50 쓰레드)
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
results = executor.map(ping_ip, target_ips)
# 결과 수집 (None 제외)
active_ips = [ip for ip in results if ip is not None]
return jsonify({
"success": True,
"active_ips": active_ips,
"count": len(active_ips),
"message": f"스캔 완료: {len(active_ips)}개의 활성 IP 발견"
})
except Exception as e:
logging.error(f"Scan network fatal error: {e}")
return jsonify({"success": False, "error": f"서버 내부 오류: {str(e)}"}), 500