"""IP端口扫描 + 日志分析工具(PyQt6)"""
import sys
import json
import csv
import socket
import ipaddress
import platform
import subprocess
import threading
import ssl
import struct
import re
import gzip
import pathlib
from urllib.parse import urlparse, parse_qs, unquote_plus
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import List, Optional, Tuple, Dict, Any
from collections import Counter, defaultdict
from PyQt6.QtCore import Qt, QThread, pyqtSignal
from PyQt6.QtWidgets import (
QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QLineEdit, QPushButton, QSpinBox, QDoubleSpinBox, QCheckBox, QComboBox,
QTableWidget, QTableWidgetItem, QFileDialog, QMessageBox, QTextEdit,
QGroupBox, QProgressBar, QTabWidget, QListWidget, QListWidgetItem
)
# =============================
# 数据结构
# =============================
@dataclass
class ScanResult:
host: str
ip: str
proto: str # tcp/udp
port: int
state: str # open / open|filtered
service: str
banner: str
@dataclass
class LogFinding:
source: str # 文件名/路径
category: str # 系统/中间件/数据库/应用/安全
severity: str # INFO/WARN/ERROR/CRITICAL
timestamp: str
keyword: str
message: str
attack_ip: str = ""
attack_type: str = ""
# =============================
# 端口解析/目标解析
# =============================
def parse_ports(port_text: str) -> List[int]:
"""
支持:
- "1-65535"
- "22,80,443"
- "53,67-69,161"
"""
port_text = (port_text or "").strip()
if not port_text:
return []
ports = set()
for part in port_text.split(","):
part = part.strip()
if not part:
continue
if "-" in part:
a, b = part.split("-", 1)
a = int(a.strip()); b = int(b.strip())
if a > b:
a, b = b, a
for p in range(a, b + 1):
if 1 <= p <= 65535:
ports.add(p)
else:
p = int(part)
if 1 <= p <= 65535:
ports.add(p)
return sorted(ports)
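# 用法示例(仅作演示,非工具原有逻辑;_demo_* 为补充的示例函数,不会在导入时执行):
# 展示 parse_ports 对区间、逗号混合写法的解析与去重,示例端口串均为任意假设值。
def _demo_parse_ports():
    assert parse_ports("22,80,443") == [22, 80, 443]
    # 区间会展开,结果按升序去重
    assert parse_ports("53,67-69,161") == [53, 67, 68, 69, 161]
    # 越界端口(0、70000)会被静默丢弃
    assert parse_ports("0,70000,8080") == [8080]
    # 区间写反时自动交换上下界
    assert parse_ports("90-80")[:3] == [80, 81, 82]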
def expand_targets(text: str) -> List[str]:
"""
支持:
- IPv4: 192.168.1.10
- IPv6: 2001:db8::1
- 域名: example.com
- CIDR: 192.168.1.0/24, 2001:db8::/120
- 多行/逗号分隔
"""
text = (text or "").replace(",", "\n")
items = [x.strip() for x in text.splitlines() if x.strip()]
expanded = []
for it in items:
if "/" in it:
try:
net = ipaddress.ip_network(it, strict=False)
for ip in net.hosts():
expanded.append(str(ip))
except Exception:
expanded.append(it)
else:
expanded.append(it)
# 去重保序
seen = set()
out = []
for x in expanded:
if x not in seen:
seen.add(x)
out.append(x)
return out
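# 用法示例(仅作演示,非工具原有逻辑,不会在导入时执行):
# 展示 expand_targets 的 CIDR 展开与"去重保序",示例使用文档保留网段/域名。
def _demo_expand_targets():
    # /30 去掉网络地址与广播地址后剩 2 个主机地址
    assert expand_targets("192.0.2.0/30") == ["192.0.2.1", "192.0.2.2"]
    # 逗号/换行混用、重复目标去重并保持出现顺序
    assert expand_targets("example.com, 192.0.2.1\nexample.com") == ["example.com", "192.0.2.1"]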
def resolve_host_to_ip(host: str) -> Tuple[str, str]:
"""
返回 (显示host, 解析到的ip)
- host可能是域名/IP
- 解析优先返回第一个地址
"""
try:
# 如果本身就是IP
ipaddress.ip_address(host)
return host, host
except Exception:
pass
try:
infos = socket.getaddrinfo(host, None)
# 优先 IPv4,然后 IPv6
ipv4 = None
ipv6 = None
for family, _, _, _, sockaddr in infos:
if family == socket.AF_INET:
ipv4 = sockaddr[0]
elif family == socket.AF_INET6:
ipv6 = sockaddr[0]
ip = ipv4 or ipv6
if not ip:
raise RuntimeError("无法解析IP")
return host, ip
except Exception:
return host, ""
def is_domain(host: str) -> bool:
try:
ipaddress.ip_address(host)
return False
except Exception:
return True
# =============================
# 探测与banner抓取
# =============================
SERVICE_GUESS = {
21: "ftp",
22: "ssh",
23: "telnet",
25: "smtp",
53: "dns",
80: "http",
110: "pop3",
123: "ntp",
135: "msrpc",
139: "netbios-ssn",
143: "imap",
161: "snmp",
389: "ldap",
443: "https",
445: "smb",
465: "smtps",
587: "smtp-submission",
993: "imaps",
995: "pop3s",
1433: "mssql",
1521: "oracle",
2049: "nfs",
2375: "docker",
3306: "mysql",
3389: "rdp",
5432: "postgres",
5900: "vnc",
6379: "redis",
7001: "weblogic",
8080: "http-alt",
8443: "https-alt",
9200: "elasticsearch",
11211: "memcached",
27017: "mongodb",
}
TLS_PORTS = {443, 8443, 9443, 993, 995, 465, 587, 990, 992, 994}
HTTP_PORTS = {80, 8080, 8000, 8888, 8081, 8181, 7001, 7100, 7501, 9090, 9200, 443, 8443, 9443}
def parse_http_title(body: str) -> str:
m = re.search(r"<title[^>]*>(.*?)</title>", body, re.IGNORECASE | re.DOTALL)
if not m:
return ""
title = re.sub(r"\s+", " ", m.group(1)).strip()
return title[:80]
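# 用法示例(仅作演示,不会在导入时执行):parse_http_title 忽略大小写、折叠空白并截断到 80 字符。
def _demo_parse_http_title():
    body = "<html><head><TITLE>\n  Welcome   Page \n</TITLE></head><body></body></html>"
    assert parse_http_title(body) == "Welcome Page"
    assert parse_http_title("<p>no title here</p>") == ""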
def tls_handshake_and_cert(ip: str, port: int, timeout: float, server_name: str = "") -> Tuple[bool, str]:
"""返回 (tls_ok, cert_info_snippet)"""
family = socket.AF_INET6 if ":" in ip else socket.AF_INET
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
cert_info = ""
try:
with socket.socket(family, socket.SOCK_STREAM) as sock:
sock.settimeout(timeout)
sock.connect((ip, port))
with ctx.wrap_socket(sock, server_hostname=server_name or None) as ssock:
ssock.settimeout(timeout)
                cert = ssock.getpeercert()
                # 注意:verify_mode=CERT_NONE 时 getpeercert() 通常返回空字典,
                # 因此拿不到证书字段时退回记录协商出的 TLS 版本与加密套件。
                if cert:
                    subject = cert.get("subject", [])
                    issuer = cert.get("issuer", [])
                    not_after = cert.get("notAfter", "")
                    def flatten(x):
                        out = []
                        for item in x:
                            for k, v in item:
                                out.append(f"{k}={v}")
                        return ", ".join(out)
                    cert_info = f"TLS OK; Subject: {flatten(subject)}; Issuer: {flatten(issuer)}; NotAfter: {not_after}"
                else:
                    cipher = ssock.cipher()
                    cert_info = f"TLS OK; version={ssock.version()}; cipher={cipher[0] if cipher else ''}"
return True, sanitize_banner(cert_info)
except Exception:
return False, ""
def http_probe(ip: str, port: int, timeout: float, use_tls: bool, host_header: str = "localhost") -> str:
"""返回 HTTP 响应头 + title(如有)"""
family = socket.AF_INET6 if ":" in ip else socket.AF_INET
req = f"GET / HTTP/1.1\r\nHost: {host_header}\r\nUser-Agent: scanner\r\nConnection: close\r\n\r\n".encode()
try:
with socket.socket(family, socket.SOCK_STREAM) as sock:
sock.settimeout(timeout)
sock.connect((ip, port))
if use_tls:
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
sock = ctx.wrap_socket(sock, server_hostname=host_header)
sock.settimeout(timeout)
sock.sendall(req)
data = b""
while len(data) < 4096:
chunk = sock.recv(1024)
if not chunk:
break
data += chunk
text = safe_decode(data)
if "\r\n\r\n" in text:
header, body = text.split("\r\n\r\n", 1)
else:
header, body = text, ""
title = parse_http_title(body)
if title:
return sanitize_banner(header + "\r\n<title>" + title + "</title>")
return sanitize_banner(header)
except Exception:
return ""
def pg_probe(ip: str, port: int, timeout: float) -> str:
"""PostgreSQL SSLRequest 探测"""
family = socket.AF_INET6 if ":" in ip else socket.AF_INET
try:
with socket.socket(family, socket.SOCK_STREAM) as s:
s.settimeout(timeout)
s.connect((ip, port))
payload = struct.pack("!II", 8, 80877103)
s.sendall(payload)
r = s.recv(1)
if r in (b"S", b"N"):
return "PostgreSQL detected (SSLRequest response: " + r.decode(errors="ignore") + ")"
except Exception:
pass
return ""
def redis_probe(ip: str, port: int, timeout: float) -> str:
family = socket.AF_INET6 if ":" in ip else socket.AF_INET
try:
with socket.socket(family, socket.SOCK_STREAM) as s:
s.settimeout(timeout)
s.connect((ip, port))
s.sendall(b"PING\r\n")
data = s.recv(256)
return sanitize_banner(safe_decode(data))
except Exception:
return ""
def memcached_probe(ip: str, port: int, timeout: float) -> str:
family = socket.AF_INET6 if ":" in ip else socket.AF_INET
try:
with socket.socket(family, socket.SOCK_STREAM) as s:
s.settimeout(timeout)
s.connect((ip, port))
s.sendall(b"version\r\n")
data = s.recv(256)
return sanitize_banner(safe_decode(data))
except Exception:
return ""
def mysql_probe(ip: str, port: int, timeout: float) -> str:
"""MySQL 连接后通常直接返回握手包"""
family = socket.AF_INET6 if ":" in ip else socket.AF_INET
try:
with socket.socket(family, socket.SOCK_STREAM) as s:
s.settimeout(timeout)
s.connect((ip, port))
data = s.recv(512)
return sanitize_banner(safe_decode(data))
except Exception:
return ""
# 简化版 probes(模拟 nmap probes 思路:不同端口/协议发不同探测)
TCP_PROBES: List[Tuple[str, bytes]] = [
("HTTP", b"HEAD / HTTP/1.0\r\nHost: localhost\r\nUser-Agent: scanner\r\n\r\n"),
("HTTP-GET", b"GET / HTTP/1.0\r\nHost: localhost\r\nUser-Agent: scanner\r\n\r\n"),
("SMTP", b"EHLO example.com\r\n"),
("FTP", b"FEAT\r\n"),
("IMAP", b"A1 CAPABILITY\r\n"),
("POP3", b"QUIT\r\n"),
("REDIS", b"PING\r\n"),
("MEMCACHED", b"version\r\n"),
("MYSQL", b"\x00"),
("TELNET", b"\r\n"),
]
def safe_decode(b: bytes) -> str:
try:
return b.decode("utf-8", errors="replace")
except Exception:
return repr(b)
def sanitize_banner(s: str) -> str:
if not s:
return ""
s = s.replace("\r", "\\r").replace("\n", "\\n")
if len(s) > 400:
s = s[:400] + "..."
return s
# =============================
# 日志分析(离线文件分析)
# =============================
SEVERITY_KEYWORDS = {
"CRITICAL": ["panic", "fatal", "critical", "segfault", "core dump", "out of memory", "oom", "kernel panic"],
"ERROR": ["error", "failed", "exception", "traceback", "denied", "refused", "timeout", "unreachable"],
"WARN": ["warn", "warning", "deprecated", "slow", "retry", "throttle", "too many"],
}
SECURITY_KEYWORDS = [
"authentication failure", "invalid user", "failed password", "sshd", "sudo",
"unauthorized", "sql injection", "xss", "csrf", "bruteforce", "brute force",
"waf", "attack", "exploit", "malware", "ransom", "cve-"
]
ATTACK_PATTERNS = [
("SQL注入", [
r"\bunion\s+select\b", r"\binformation_schema\b",
r"\bor\s+1=1\b", r"\bsleep\s*\(", r"\bbenchmark\s*\(",
r"\bextractvalue\s*\(", r"\bupdatexml\s*\(", r"\bload_file\s*\(",
r"\binto\s+outfile\b", r"\bpg_sleep\s*\(", r"\bwaitfor\s+delay\b", r"\bxp_cmdshell\b"
]),
("XSS", [
r"<script\b", r"javascript:", r"onerror\s*=", r"onload\s*=", r"document\.cookie",
r"<img\b[^>]*onerror", r"<svg\b", r"alert\s*\(", r"prompt\s*\("
]),
("路径穿越", [
r"\.\./", r"\.\.\\", r"%2e%2e%2f", r"%2e%2e%5c", r"/etc/passwd", r"boot\.ini"
]),
("命令注入", [
r"\b;\s*cat\b", r"\b;\s*id\b", r"\b\|\s*whoami\b", r"\b\|\s*id\b",
r"\b&&\s*id\b", r"\b&&\s*whoami\b", r"\b`[^`]+`", r"\$\([^\)]+\)"
]),
("SSRF", [
r"169\.254\.169\.254", r"metadata\.google\.internal", r"\bfile://", r"\bgopher://",
r"\bhttp://127\.0\.0\.1", r"\bhttp://localhost"
]),
("扫描/探测", [
r"nmap", r"masscan", r"zmap", r"dirbuster", r"gobuster", r"nikto",
r"\b/wp-admin\b", r"\b/.git/\b", r"\b\.env\b"
]),
("暴力破解", [
r"failed password", r"invalid user", r"authentication failure", r"too many authentication failures",
r"login failed", r"bad password", r"bruteforce", r"brute force"
]),
("WebShell/恶意文件", [
r"c99\.php", r"r57\.php", r"shell\.php", r"webshell",
r"base64_decode\(", r"\beval\s*\(", r"\bassert\s*\("
]),
]
ATTACK_REGEX = [(name, [re.compile(pat, re.IGNORECASE) for pat in pats]) for name, pats in ATTACK_PATTERNS]
def detect_attack_type(line: str) -> str:
for name, regs in ATTACK_REGEX:
for rg in regs:
if rg.search(line):
return name
return ""
# =============================
# Access Log 结构化解析(Nginx/Apache)
# =============================
ACCESS_LOG_PATTERNS = [
# Common Log Format / Combined
re.compile(
r'^(?P<ip>\S+)\s+\S+\s+\S+\s+\[(?P<time>[^\]]+)\]\s+"(?P<method>[A-Z]+)\s+(?P<uri>[^\s]+)\s+(?P<proto>[^"]+)"\s+(?P<status>\d{3})\s+(?P<size>\S+)(\s+"(?P<referer>[^"]*)"\s+"(?P<ua>[^"]*)")?'
),
]
SQLI_STRONG = [
re.compile(p, re.IGNORECASE) for p in [
r"\bunion\s+select\b",
r"\binformation_schema\b",
r"\bselect\b.+\bfrom\b",
r"\b(or|and)\b\s+\d+=\d+",
r"\b(or|and)\b\s+'[^']+'='[^']+'",
r"\bsleep\s*\(",
r"\bbenchmark\s*\(",
r"\bextractvalue\s*\(",
r"\bupdatexml\s*\(",
r"\bpg_sleep\s*\(",
r"\bwaitfor\s+delay\b",
r"\bxp_cmdshell\b",
]
]
XSS_STRONG = [
re.compile(p, re.IGNORECASE) for p in [
r"<script\b",
r"javascript:",
r"onerror\s*=",
r"onload\s*=",
r"<svg\b",
r"document\.cookie",
r"alert\s*\(",
]
]
def parse_access_log_line(line: str) -> Optional[dict]:
for rg in ACCESS_LOG_PATTERNS:
m = rg.match(line)
if m:
d = m.groupdict()
return {
"ip": d.get("ip",""),
"time": d.get("time",""),
"method": d.get("method",""),
"uri": d.get("uri",""),
"status": d.get("status",""),
"referer": d.get("referer","") or "",
"ua": d.get("ua","") or "",
}
return None
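# 用法示例(仅作演示,不会在导入时执行):用一条假设的 combined 格式日志
# 说明 parse_access_log_line 的输出结构,IP/UA 等均为示例值。
def _demo_parse_access_log_line():
    line = '203.0.113.7 - - [02/Feb/2026:03:50:38 +0000] "GET /index.php?id=1 HTTP/1.1" 200 512 "-" "curl/8.0"'
    rec = parse_access_log_line(line)
    assert rec is not None
    assert rec["ip"] == "203.0.113.7"
    assert rec["method"] == "GET"
    assert rec["uri"] == "/index.php?id=1"
    assert rec["status"] == "200"
    assert rec["ua"] == "curl/8.0"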
def analyze_uri_for_attack(uri: str) -> Tuple[str, str]:
"""返回 (attack_type, evidence)"""
try:
uri = unquote_plus(uri)
except Exception:
pass
parsed = urlparse(uri)
qs = parse_qs(parsed.query)
# 先用强规则匹配 query 参数
for k, vals in qs.items():
for v in vals:
s = f"{k}={v}"
for rg in SQLI_STRONG:
if rg.search(s):
return "SQL注入", s[:120]
for rg in XSS_STRONG:
if rg.search(s):
return "XSS", s[:120]
# 再对整个 URI 做弱匹配(兜底)
for rg in SQLI_STRONG:
if rg.search(uri):
return "SQL注入", uri[:120]
for rg in XSS_STRONG:
if rg.search(uri):
return "XSS", uri[:120]
return "", ""
CRAWLER_UA_PATTERNS = [
# 常见正规搜索引擎爬虫
("搜索引擎爬虫", [r"googlebot", r"bingbot", r"baiduspider", r"yandex(bot)?", r"sogou", r"360spider", r"petalbot", r"bytespider"]),
# 常见商业爬虫/采集
("商业爬虫", [r"ahrefsbot", r"semrushbot", r"mj12bot", r"dotbot", r"serpstatbot"]),
# 常见脚本/工具型爬取(更偏“灰”)
("脚本/工具爬虫", [r"python-requests", r"java/", r"go-http-client", r"curl/", r"wget/", r"libwww-perl", r"scrapy", r"httpclient", r"okhttp"]),
]
CRAWLER_REGEX = [(name, [re.compile(p, re.IGNORECASE) for p in pats]) for name, pats in CRAWLER_UA_PATTERNS]
SUSPICIOUS_PATH_PATTERNS = [
re.compile(p, re.IGNORECASE) for p in [
r"/wp-admin", r"/wp-login\.php", r"/xmlrpc\.php",
r"/\.git/", r"/\.env\b", r"/phpmyadmin", r"/manager/html", r"/jenkins",
r"/actuator", r"/swagger", r"/v2/_catalog", r"/api-docs",
r"/etc/passwd", r"\.php\b", r"\.jsp\b", r"\.asp\b"
]
]
def detect_crawler(ua: str) -> Tuple[str, str]:
"""返回 (crawler_type, evidence)"""
if not ua:
return "", ""
for name, regs in CRAWLER_REGEX:
for rg in regs:
if rg.search(ua):
return name, rg.pattern
# 兜底:包含 bot/spider/crawl
if re.search(r"\b(bot|spider|crawl|crawler)\b", ua, re.IGNORECASE):
return "未知爬虫", "bot/spider/crawl"
return "", ""
def risk_score_access(ip: str, uri: str, status: str, ua: str) -> int:
"""非常简化的风险评分(用于 SIEM/蓝队视角汇总)"""
score = 0
try:
st = int(status)
if st in (401, 403):
score += 2
if st == 404:
score += 1
if st >= 500:
score += 1
except Exception:
pass
# 命中敏感路径
for rg in SUSPICIOUS_PATH_PATTERNS:
if rg.search(uri or ""):
score += 3
break
# 工具型 UA 加权
ctype, _ = detect_crawler(ua or "")
if ctype == "脚本/工具爬虫":
score += 3
elif ctype in ("商业爬虫", "未知爬虫"):
score += 2
elif ctype == "搜索引擎爬虫":
score += 0
return score
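# 用法示例(仅作演示,不会在导入时执行):risk_score_access 的权重为经验值,示例输入为假设数据。
def _demo_risk_score_access():
    # 404 (+1) + 敏感路径 /wp-login.php (+3) + 工具型 UA (+3) = 7
    assert risk_score_access("203.0.113.7", "/wp-login.php", "404", "python-requests/2.31.0") == 7
    # 正常页面 + 搜索引擎爬虫不加分
    assert risk_score_access("198.51.100.2", "/index.html", "200", "Googlebot/2.1") == 0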
MIDDLEWARE_HINTS = {
"nginx": ["nginx", "upstream", "client prematurely closed", "connect() failed"],
"apache": ["apache", "httpd", "mod_", "AH0"],
"tomcat": ["catalina", "tomcat", "org.apache.catalina"],
"weblogic": ["weblogic", "<BEA-", "<Error>", "<Warning>"],
"redis": ["redis", "OOM command not allowed", "MISCONF", "Loading the dataset"],
}
DB_HINTS = {
"mysql": ["mysqld", "innodb", "mysql", "error 1045", "error 2002"],
"postgres": ["postgres", "FATAL:", "PANIC:", "could not connect", "connection refused"],
"mongodb": ["mongod", "mongodb", "wiredtiger", "E NETWORK"],
}
APP_HINTS = {
"java": ["exception", "at ", "Caused by:", "NullPointerException"],
"python": ["Traceback (most recent call last)", "Exception:", "ERROR"],
"nodejs": ["UnhandledPromiseRejectionWarning", "TypeError:", "ReferenceError:"],
}
TS_PATTERNS = [
re.compile(r"(?P<ts>\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2})"),
re.compile(r"(?P<ts>\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})"), # syslog: Feb 2 12:30:10
]
def guess_category(line: str) -> str:
low = line.lower()
for name, hints in DB_HINTS.items():
if any(h in low for h in hints):
return "数据库"
for name, hints in MIDDLEWARE_HINTS.items():
if any(h in low for h in hints):
return "中间件"
for name, hints in APP_HINTS.items():
if any(h.lower() in low for h in hints):
return "应用"
if any(k in low for k in SECURITY_KEYWORDS):
return "安全"
return "系统"
def extract_ts(line: str) -> str:
for pat in TS_PATTERNS:
m = pat.search(line)
if m:
return m.group("ts")
return ""
def normalize_ts(ts: str) -> str:
"""统一转换为 YYYY-MM-DD HH:MM:SS,无法解析则返回原字符串"""
ts = (ts or "").strip()
if not ts:
return ""
# 1) already iso
for fmt in ["%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"]:
try:
dt = datetime.strptime(ts[:19], fmt)
return dt.strftime("%Y-%m-%d %H:%M:%S")
except Exception:
pass
# 2) syslog style: Feb 2 12:30:10
try:
dt = datetime.strptime(ts, "%b %d %H:%M:%S")
dt = dt.replace(year=datetime.now().year)
return dt.strftime("%Y-%m-%d %H:%M:%S")
except Exception:
pass
# 3) nginx/apache: 02/Feb/2026:03:50:38 +0000
try:
dt = datetime.strptime(ts.split()[0], "%d/%b/%Y:%H:%M:%S")
return dt.strftime("%Y-%m-%d %H:%M:%S")
except Exception:
pass
return ts
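# 用法示例(仅作演示,不会在导入时执行):normalize_ts 对几种常见格式的归一化结果;
# syslog 格式依赖当前年份,这里只演示不依赖系统时间的情况。
def _demo_normalize_ts():
    assert normalize_ts("2026-02-02T03:50:38.123Z") == "2026-02-02 03:50:38"
    assert normalize_ts("02/Feb/2026:03:50:38 +0000") == "2026-02-02 03:50:38"
    # 无法识别的格式原样返回
    assert normalize_ts("last Tuesday") == "last Tuesday"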
# =============================
# IP 提取(用于日志分析 Attack IP 列)
# =============================
IPV4_RE = re.compile(r"\b(?:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\.){3}(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\b")
IPV6_RE = re.compile(r"\b(?:[0-9a-fA-F]{0,4}:){2,7}[0-9a-fA-F]{0,4}\b")
def extract_ip_from_line(line: str) -> str:
"""从日志行中尽量抽取一个 IP(IPv4/IPv6),并用 ipaddress 做二次校验避免把时间等误判成 IPv6。"""
if not line:
return ""
# 先找 IPv4(更明确)
for m in IPV4_RE.finditer(line):
cand = m.group(0)
try:
ipaddress.ip_address(cand)
return cand
except Exception:
continue
# 再找 IPv6:regex 只负责“可能长得像”,最终以 ipaddress 校验为准
for m in IPV6_RE.finditer(line):
cand = m.group(0)
# 过滤明显的时间戳形态(例如 15:36:49)
if re.fullmatch(r"\d{1,2}:\d{1,2}:\d{1,2}", cand):
continue
try:
ipaddress.ip_address(cand)
return cand
except Exception:
continue
return ""
def detect_severity(line: str) -> Tuple[str, str]:
low = line.lower()
for sev, kws in SEVERITY_KEYWORDS.items():
for kw in kws:
if kw in low:
return sev, kw
    # security keywords fall back to WARN
for kw in SECURITY_KEYWORDS:
if kw in low:
return "WARN", kw
return "INFO", ""
def iter_log_lines(path: str, max_lines: int = 200000):
# 支持 .gz
if path.lower().endswith(".gz"):
with gzip.open(path, "rt", encoding="utf-8", errors="replace") as f:
for i, line in enumerate(f):
if i >= max_lines:
break
yield line.rstrip("\n")
else:
with open(path, "r", encoding="utf-8", errors="replace") as f:
for i, line in enumerate(f):
if i >= max_lines:
break
yield line.rstrip("\n")
def analyze_log_file(path: str, max_findings: int = 5000, max_lines: int = 200000) -> Tuple[List[LogFinding], Dict[str, Any]]:
findings: List[LogFinding] = []
counts = Counter()
sev_counts = Counter()
cat_counts = Counter()
atk_counts = Counter()
# Access Log 统计(SIEM视角)
access_ip_counts = Counter()
access_uri_counts = Counter()
access_ua_counts = Counter()
access_status_counts = Counter()
risk_by_ip = Counter()
    for line in iter_log_lines(path, max_lines=max_lines):
access = parse_access_log_line(line)
if access:
# 统计(所有 access 行都统计)
ip_ = access.get('ip','')
uri_ = access.get('uri','')
ua_ = access.get('ua','')
st_ = access.get('status','')
access_ip_counts[ip_] += 1
access_uri_counts[uri_] += 1
if ua_:
access_ua_counts[ua_] += 1
if st_:
access_status_counts[st_] += 1
risk_by_ip[ip_] += risk_score_access(ip_, uri_, st_, ua_)
# 1) 先做 SQLi/XSS 参数级分析
atk, ev = analyze_uri_for_attack(uri_)
if atk:
# 强制告警
sev = 'WARN'
kw = ev or atk
ts = access.get('time','')
cat = '安全'
findings.append(LogFinding(
source=path,
category=cat,
severity=sev,
timestamp=normalize_ts(ts),
keyword=kw,
message=f"{access.get('ip')} {access.get('method')} {access.get('uri')} status={access.get('status')} referer={access.get('referer')} ua={access.get('ua')}",
attack_ip=access.get('ip',''),
attack_type=atk
))
counts[kw or sev] += 1
sev_counts[sev] += 1
cat_counts[cat] += 1
atk_counts[atk] += 1
if len(findings) >= max_findings:
break
continue
# 2) 再做爬虫识别(不一定是攻击,但可用于蓝队/运营画像)
ctype, cev = detect_crawler(access.get('ua','') or '')
if ctype:
sev2 = 'INFO' if ctype == '搜索引擎爬虫' else 'WARN'
ts = access.get('time','')
cat = '爬虫'
findings.append(LogFinding(
source=path,
category=cat,
severity=sev2,
timestamp=normalize_ts(ts),
keyword=cev,
message=f"{access.get('ip')} {access.get('method')} {access.get('uri')} status={access.get('status')} referer={access.get('referer')} ua={access.get('ua')}",
attack_ip=access.get('ip',''),
attack_type='爬虫'
))
counts[cev or sev2] += 1
sev_counts[sev2] += 1
cat_counts[cat] += 1
atk_counts['爬虫'] += 1
                if len(findings) >= max_findings:
                    break
                continue
sev, kw = detect_severity(line)
if sev == "INFO":
continue
ts = extract_ts(line)
cat = guess_category(line)
attack_type = detect_attack_type(line)
if attack_type:
cat = "安全"
if sev == "INFO":
sev = "WARN"
findings.append(LogFinding(
source=path,
category=cat,
severity=sev,
timestamp=normalize_ts(ts),
keyword=kw,
message=line[:500],
attack_ip=extract_ip_from_line(line),
attack_type=attack_type
))
counts[kw or sev] += 1
sev_counts[sev] += 1
cat_counts[cat] += 1
if attack_type:
atk_counts[attack_type] += 1
if len(findings) >= max_findings:
break
stats = {
"file": path,
"total_findings": len(findings),
"severity": dict(sev_counts),
"category": dict(cat_counts),
"top_keywords": counts.most_common(15),
"top_attack_types": atk_counts.most_common(10),
"access_top_ips": access_ip_counts.most_common(15),
"access_top_uris": access_uri_counts.most_common(15),
"access_top_uas": access_ua_counts.most_common(10),
"access_status": access_status_counts.most_common(10),
"risk_top_ips": risk_by_ip.most_common(15),
}
return findings, stats
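# 用法示例(仅作演示,不会在导入时执行):写一份临时的假设日志跑一遍 analyze_log_file,
# 展示 findings 与 stats 的大致形态;文件内容、IP 均为虚构。
def _demo_analyze_log_file():
    import os
    import tempfile
    lines = [
        '203.0.113.7 - - [02/Feb/2026:03:50:38 +0000] "GET /item.php?id=1+union+select+1,2 HTTP/1.1" 200 512 "-" "curl/8.0"',
        "2026-02-02 03:51:00 sshd[815]: Failed password for invalid user admin from 198.51.100.9",
    ]
    fd, path = tempfile.mkstemp(suffix=".log")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write("\n".join(lines) + "\n")
        findings, stats = analyze_log_file(path)
        # 第一行走 access log 分支命中 SQL注入;第二行走通用分支命中暴力破解类关键字
        assert any(x.attack_type == "SQL注入" for x in findings)
        assert any(x.attack_ip == "198.51.100.9" for x in findings)
        assert stats["total_findings"] == len(findings)
    finally:
        os.remove(path)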
def infer_service_from_banner(banner: str) -> str:
if not banner:
return ""
lower = banner.lower()
if "ssh-" in lower:
return "ssh"
if "http/" in lower or "server:" in lower or "set-cookie:" in lower:
return "http"
if "smtp" in lower or "esmtp" in lower:
return "smtp"
if "imap" in lower:
return "imap"
if "pop3" in lower:
return "pop3"
if "redis" in lower or lower.startswith("+pong"):
return "redis"
if "memcached" in lower:
return "memcached"
if "mysql" in lower:
return "mysql"
if "postgres" in lower:
return "postgres"
if "mongodb" in lower:
return "mongodb"
if "tls ok" in lower or "issuer=" in lower:
return "tls"
if "ftp" in lower:
return "ftp"
return ""
def tcp_connect(host: str, port: int, timeout: float) -> bool:
family = socket.AF_INET6 if ":" in host else socket.AF_INET
try:
with socket.socket(family, socket.SOCK_STREAM) as s:
s.settimeout(timeout)
return s.connect_ex((host, port)) == 0
except Exception:
return False
def banner_grab_tcp(host: str, port: int, timeout: float, sni_name: str = "") -> Tuple[str, str]:
"""更强 probes + TLS 探测"""
family = socket.AF_INET6 if ":" in host else socket.AF_INET
service = SERVICE_GUESS.get(port, "")
banner = ""
# 1) TLS 探测
tls_ok = False
tls_info = ""
if port in TLS_PORTS:
tls_ok, tls_info = tls_handshake_and_cert(host, port, timeout, server_name=sni_name)
if tls_ok:
if not service:
service = "tls"
if port in (443, 8443, 9443):
service = "https"
elif port == 993:
service = "imaps"
elif port == 995:
service = "pop3s"
elif port == 465:
service = "smtps"
elif port == 587:
service = "smtp-submission"
banner = tls_info
# 2) HTTP 探测(header/title)
if port in HTTP_PORTS:
http_banner = http_probe(host, port, timeout, use_tls=tls_ok, host_header=(sni_name or "localhost"))
if http_banner:
banner = http_banner
service = "https" if (tls_ok or port in (443, 8443, 9443)) else "http"
# 3) 特定协议探测
if port == 6379:
b = redis_probe(host, port, timeout)
if b:
banner = b
service = "redis"
if port == 11211:
b = memcached_probe(host, port, timeout)
if b:
banner = b
service = "memcached"
if port == 3306:
b = mysql_probe(host, port, timeout)
if b:
banner = b
service = "mysql"
if port == 5432:
b = pg_probe(host, port, timeout)
if b:
banner = sanitize_banner(b)
service = "postgres"
# 4) 被动 banner + 通用 probes 兜底
if not banner:
try:
with socket.socket(family, socket.SOCK_STREAM) as s:
s.settimeout(timeout)
s.connect((host, port))
try:
data = s.recv(512)
if data:
banner = safe_decode(data)
except Exception:
pass
if not banner:
                    # 所有端口都按同一份 TCP_PROBES 顺序尝试(HTTP 探测已排在最前)
                    probes = TCP_PROBES
for _, payload in probes:
if payload:
try:
s.sendall(payload)
except Exception:
continue
try:
data2 = s.recv(512)
if data2:
banner = safe_decode(data2)
break
except Exception:
continue
except Exception:
pass
banner = sanitize_banner(banner)
if not service:
service = infer_service_from_banner(banner)
if tls_ok and not banner and tls_info:
banner = tls_info
return service, banner
def scan_tcp(host: str, ip: str, port: int, timeout: float, do_banner: bool) -> Optional[ScanResult]:
if not tcp_connect(ip, port, timeout):
return None
service = SERVICE_GUESS.get(port, "")
banner = ""
if do_banner:
service2, banner2 = banner_grab_tcp(ip, port, timeout, sni_name=(host if is_domain(host) else ""))
if service2:
service = service2
banner = banner2
return ScanResult(host=host, ip=ip, proto="tcp", port=port, state="open", service=service, banner=banner)
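# 用法示例(仅作演示,不会在导入时执行):对本机 127.0.0.1 的少量端口做一次 TCP connect 扫描;
# 端口列表为任意示例,结果取决于本机实际监听情况。
def _demo_scan_tcp_localhost(ports=(22, 80, 8080), timeout=0.3):
    results = []
    for p in ports:
        r = scan_tcp("127.0.0.1", "127.0.0.1", p, timeout, do_banner=False)
        if r:
            results.append((r.port, r.state, r.service))
    return results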
# =============================
# 可达性探测
# =============================
def ping_host(host: str, timeout_ms: int = 800) -> bool:
system = platform.system().lower()
try:
if system == "windows":
cmd = ["ping", "-n", "1", "-w", str(timeout_ms), host]
else:
cmd = ["ping", "-c", "1", "-W", "1", host]
res = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=max(1, timeout_ms // 1000 + 1))
return res.returncode == 0
except Exception:
return False
def tcp_reachable_probe(ip: str, timeout: float = 0.6, ports: Tuple[int, ...] = (443, 80, 22)) -> bool:
for p in ports:
try:
family = socket.AF_INET6 if ":" in ip else socket.AF_INET
with socket.socket(family, socket.SOCK_STREAM) as s:
s.settimeout(timeout)
r = s.connect_ex((ip, p))
if r == 0 or r in (111, 61, 10061):
return True
except Exception:
continue
return False
# =============================
# 扫描线程
# =============================
class ScannerThread(QThread):
log = pyqtSignal(str)
progress = pyqtSignal(int, int) # done, total
result = pyqtSignal(object) # ScanResult
finished_scan = pyqtSignal(dict) # stats
def __init__(
self,
targets: List[str],
tcp_ports: List[int],
threads: int,
timeout: float,
do_banner: bool,
skip_unreachable: bool,
reach_method: str, # "tcp" or "ping"
parent=None
):
super().__init__(parent)
self.targets = targets
self.tcp_ports = tcp_ports
self.threads = max(1, threads)
self.timeout = max(0.05, timeout)
self.do_banner = do_banner
self.skip_unreachable = skip_unreachable
self.reach_method = reach_method
self._stop = threading.Event()
self._results: List[ScanResult] = []
def stop(self):
self._stop.set()
@property
def results(self) -> List[ScanResult]:
return list(self._results)
def run(self):
from concurrent.futures import ThreadPoolExecutor, as_completed
# 解析目标 -> (host, ip)
resolved = []
for host in self.targets:
if self._stop.is_set():
self.log.emit("已停止。")
self.finished_scan.emit({})
return
h, ip = resolve_host_to_ip(host)
if not ip:
self.log.emit(f"[解析失败] {host}")
continue
resolved.append((h, ip))
# 可达性探测
live = []
for h, ip in resolved:
if self._stop.is_set():
self.log.emit("已停止。")
self.finished_scan.emit({})
return
if not self.skip_unreachable:
live.append((h, ip))
continue
ok = ping_host(ip) if self.reach_method == "ping" else tcp_reachable_probe(ip, timeout=min(1.0, self.timeout))
if ok:
live.append((h, ip))
self.log.emit(f"[主机在线] {h} ({ip})")
else:
self.log.emit(f"[跳过] {h} ({ip}) 不可达")
# 构建任务
tasks = []
for h, ip in live:
for p in self.tcp_ports:
tasks.append(("tcp", h, ip, p))
total = len(tasks)
done = 0
self.progress.emit(done, total)
self.log.emit(f"开始扫描:目标 {len(live)} 台,任务 {total} 个")
def do_task(proto: str, h: str, ip: str, port: int) -> Optional[ScanResult]:
if self._stop.is_set():
return None
if proto == "tcp":
return scan_tcp(h, ip, port, self.timeout, self.do_banner)
try:
with ThreadPoolExecutor(max_workers=self.threads) as ex:
future_map = {ex.submit(do_task, proto, h, ip, port): (proto, h, ip, port) for proto, h, ip, port in tasks}
for fut in as_completed(future_map):
if self._stop.is_set():
self.log.emit("用户请求停止。")
break
res = None
try:
res = fut.result()
except Exception:
res = None
done += 1
self.progress.emit(done, total)
if res:
self._results.append(res)
self.result.emit(res)
except Exception as e:
self.log.emit(f"[异常] {e}")
stats = self.build_stats(self._results)
self.finished_scan.emit(stats)
def build_stats(self, results: List[ScanResult]) -> dict:
per_host_open = defaultdict(int)
svc_counter = Counter()
proto_counter = Counter()
for r in results:
if r.state.startswith("open"):
per_host_open[r.ip] += 1
if r.service:
svc_counter[r.service] += 1
proto_counter[r.proto] += 1
top_services = svc_counter.most_common(10)
return {
"generated_at": datetime.now().isoformat(),
"total_results": len(results),
"per_host_open_ports": dict(per_host_open),
"top_services": top_services,
"proto_count": dict(proto_counter),
}
# =============================
# UI
# =============================
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("端口扫描器(PyQt6)- TCP + CIDR + Banner + 双栈 + 报告导出")
self.resize(1280, 760)
self.thread: Optional[ScannerThread] = None
self.all_results: List[ScanResult] = []
self.last_stats: Dict[str, Any] = {}
root = QWidget()
self.setCentralWidget(root)
layout = QVBoxLayout(root)
self.tabs = QTabWidget()
layout.addWidget(self.tabs)
scan_tab = QWidget()
scan_layout = QVBoxLayout(scan_tab)
self.tabs.addTab(scan_tab, "端口扫描")
log_tab = QWidget()
log_layout = QVBoxLayout(log_tab)
self.tabs.addTab(log_tab, "日志分析")
# 目标
g_target = QGroupBox("目标(IP/域名/CIDR,支持多行或逗号分隔,支持IPv6)")
gl = QVBoxLayout(g_target)
self.targets_edit = QTextEdit()
self.targets_edit.setPlaceholderText("例如:\n192.168.1.10\n192.168.1.0/24\n2001:db8::1\n2001:db8::/120\nexample.com")
self.targets_edit.setFixedHeight(120)
gl.addWidget(self.targets_edit)
scan_layout.addWidget(g_target)
# 参数
g_opt = QGroupBox("扫描参数")
opt = QHBoxLayout(g_opt)
opt.addWidget(QLabel("TCP端口:"))
self.tcp_ports_edit = QLineEdit("1-65535") # 默认全端口
self.tcp_ports_edit.setToolTip("默认全端口:1-65535;也支持 22,80,443 或 1-1024")
opt.addWidget(self.tcp_ports_edit, 2)
opt.addWidget(QLabel("线程数:"))
self.threads_spin = QSpinBox()
self.threads_spin.setRange(1, 3000)
self.threads_spin.setValue(600)
opt.addWidget(self.threads_spin)
opt.addWidget(QLabel("超时(s):"))
self.timeout_spin = QDoubleSpinBox()
self.timeout_spin.setRange(0.05, 10.0)
self.timeout_spin.setSingleStep(0.1)
self.timeout_spin.setValue(0.6)
opt.addWidget(self.timeout_spin)
self.banner_check = QCheckBox("服务识别(Banner/Probe)")
self.banner_check.setChecked(True)
opt.addWidget(self.banner_check)
self.skip_check = QCheckBox("跳过不可达主机")
self.skip_check.setChecked(True)
opt.addWidget(self.skip_check)
opt.addWidget(QLabel("可达性探测:"))
self.reach_combo = QComboBox()
self.reach_combo.addItems(["TCP探测(推荐)", "Ping(ICMP)"])
opt.addWidget(self.reach_combo)
scan_layout.addWidget(g_opt)
# 按钮
btn_row = QHBoxLayout()
self.start_btn = QPushButton("开始扫描")
self.stop_btn = QPushButton("停止")
self.stop_btn.setEnabled(False)
self.export_json_btn = QPushButton("导出JSON(含统计)")
self.export_csv_btn = QPushButton("导出CSV(含统计)")
self.export_json_btn.setEnabled(False)
self.export_csv_btn.setEnabled(False)
btn_row.addWidget(self.start_btn)
btn_row.addWidget(self.stop_btn)
btn_row.addStretch(1)
btn_row.addWidget(self.export_json_btn)
btn_row.addWidget(self.export_csv_btn)
scan_layout.addLayout(btn_row)
# 进度条
prog_row = QHBoxLayout()
self.status_label = QLabel("状态:空闲")
self.progress_label = QLabel("0/0")
self.progress_bar = QProgressBar()
self.progress_bar.setMinimum(0)
self.progress_bar.setMaximum(100)
self.progress_bar.setValue(0)
prog_row.addWidget(self.status_label, 2)
prog_row.addWidget(self.progress_bar, 4)
prog_row.addWidget(self.progress_label, 1)
scan_layout.addLayout(prog_row)
# 表格
self.table = QTableWidget(0, 7)
self.table.setHorizontalHeaderLabels(["目标", "IP", "协议", "端口", "状态", "服务", "Banner/响应"])
self.table.horizontalHeader().setStretchLastSection(True)
#self.table.setSortingEnabled(True)
scan_layout.addWidget(self.table, 5)
# 日志
self.log_box = QTextEdit()
self.log_box.setReadOnly(True)
self.log_box.setFixedHeight(160)
scan_layout.addWidget(self.log_box)
# =============================
# 日志分析 Tab
# =============================
g_log = QGroupBox("导入日志文件/目录(支持 .log/.txt/.out/.err/.gz 等)")
lg = QVBoxLayout(g_log)
row1 = QHBoxLayout()
self.log_path_edit = QLineEdit()
self.log_path_edit.setPlaceholderText("选择日志文件或目录...")
self.btn_pick_file = QPushButton("选择文件")
self.btn_pick_dir = QPushButton("选择目录")
row1.addWidget(self.log_path_edit, 4)
row1.addWidget(self.btn_pick_file, 1)
row1.addWidget(self.btn_pick_dir, 1)
lg.addLayout(row1)
row2 = QHBoxLayout()
row2.addWidget(QLabel("最大读取行数/文件:"))
self.max_lines_spin = QSpinBox()
self.max_lines_spin.setRange(1000, 2000000)
self.max_lines_spin.setValue(200000)
row2.addWidget(self.max_lines_spin)
row2.addWidget(QLabel("最大告警条数/文件:"))
self.max_findings_spin = QSpinBox()
self.max_findings_spin.setRange(100, 20000)
self.max_findings_spin.setValue(5000)
row2.addWidget(self.max_findings_spin)
self.btn_analyze_logs = QPushButton("开始分析")
row2.addWidget(self.btn_analyze_logs)
row2.addStretch(1)
lg.addLayout(row2)
log_layout.addWidget(g_log)
# 统计输出
self.log_stats_box = QTextEdit()
self.log_stats_box.setReadOnly(True)
self.log_stats_box.setFixedHeight(160)
log_layout.addWidget(self.log_stats_box)
# 日志过滤
g_filter = QGroupBox("日志过滤")
fl = QHBoxLayout(g_filter)
fl.addWidget(QLabel("严重性:"))
self.filter_sev = QComboBox()
self.filter_sev.addItems(["全部", "INFO", "WARN", "ERROR", "CRITICAL"])
fl.addWidget(self.filter_sev)
fl.addWidget(QLabel("类别:"))
self.filter_cat = QComboBox()
self.filter_cat.addItems(["全部", "系统", "中间件", "数据库", "应用", "安全", "爬虫"])
fl.addWidget(self.filter_cat)
fl.addWidget(QLabel("攻击类型:"))
self.filter_atk = QComboBox()
self.filter_atk.addItems(["全部", "SQL注入", "XSS", "路径穿越", "命令注入", "SSRF", "扫描/探测", "暴力破解", "WebShell/恶意文件", "爬虫"])
fl.addWidget(self.filter_atk)
fl.addWidget(QLabel("关键字:"))
self.filter_kw = QLineEdit()
self.filter_kw.setPlaceholderText("包含关键字(支持 IP/URI/UA/Referer)...")
fl.addWidget(self.filter_kw, 3)
self.btn_apply_filter = QPushButton("应用过滤")
self.btn_reset_filter = QPushButton("重置")
fl.addWidget(self.btn_apply_filter)
fl.addWidget(self.btn_reset_filter)
log_layout.addWidget(g_filter)
# Findings 列表
self.findings_table = QTableWidget(0, 8)
self.findings_table.setHorizontalHeaderLabels(["严重性", "类别", "攻击类型", "攻击IP", "时间", "关键字", "来源", "内容"])
self.findings_table.horizontalHeader().setStretchLastSection(True)
log_layout.addWidget(self.findings_table, 5)
# 导出按钮
row3 = QHBoxLayout()
self.btn_export_findings_json = QPushButton("导出发现(JSON)")
self.btn_export_findings_csv = QPushButton("导出发现(CSV)")
self.btn_export_findings_json.setEnabled(False)
self.btn_export_findings_csv.setEnabled(False)
row3.addStretch(1)
row3.addWidget(self.btn_export_findings_json)
row3.addWidget(self.btn_export_findings_csv)
log_layout.addLayout(row3)
self.log_findings: List[LogFinding] = []
self.log_stats: List[dict] = []
# signals
self.start_btn.clicked.connect(self.on_start)
self.stop_btn.clicked.connect(self.on_stop)
self.export_json_btn.clicked.connect(self.on_export_json)
self.export_csv_btn.clicked.connect(self.on_export_csv)
# 日志分析 signals
self.btn_pick_file.clicked.connect(self.pick_log_file)
self.btn_pick_dir.clicked.connect(self.pick_log_dir)
self.btn_analyze_logs.clicked.connect(self.analyze_logs)
self.btn_export_findings_json.clicked.connect(self.export_findings_json)
self.btn_export_findings_csv.clicked.connect(self.export_findings_csv)
self.btn_apply_filter.clicked.connect(self.apply_log_filter)
self.btn_reset_filter.clicked.connect(self.reset_log_filter)
def append_log(self, msg: str):
ts = datetime.now().strftime("%H:%M:%S")
self.log_box.append(f"[{ts}] {msg}")
def on_start(self):
if self.thread and self.thread.isRunning():
QMessageBox.warning(self, "提示", "扫描正在进行中。")
return
targets_raw = self.targets_edit.toPlainText().strip()
if not targets_raw:
QMessageBox.warning(self, "提示", "请输入至少一个目标。")
return
try:
targets = expand_targets(targets_raw)
except Exception as e:
QMessageBox.critical(self, "错误", f"目标解析失败:{e}")
return
try:
tcp_ports = parse_ports(self.tcp_ports_edit.text())
except Exception as e:
QMessageBox.critical(self, "错误", f"端口格式错误:{e}")
return
if not tcp_ports:
QMessageBox.warning(self, "提示", "请填写 TCP")
return
# reset
self.table.setRowCount(0)
self.all_results = []
self.last_stats = {}
self.export_json_btn.setEnabled(False)
self.export_csv_btn.setEnabled(False)
threads = int(self.threads_spin.value())
timeout = float(self.timeout_spin.value())
do_banner = bool(self.banner_check.isChecked())
skip_unreachable = bool(self.skip_check.isChecked())
reach_method = "tcp" if self.reach_combo.currentIndex() == 0 else "ping"
self.status_label.setText("状态:扫描中...")
self.progress_label.setText("0/0")
self.progress_bar.setValue(0)
self.start_btn.setEnabled(False)
self.stop_btn.setEnabled(True)
self.append_log(f"开始:目标={len(targets)} TCP端口={len(tcp_ports)}")
self.thread = ScannerThread(
targets=targets,
tcp_ports=tcp_ports,
threads=threads,
timeout=timeout,
do_banner=do_banner,
skip_unreachable=skip_unreachable,
reach_method=reach_method
)
self.thread.log.connect(self.append_log)
self.thread.progress.connect(self.on_progress)
self.thread.result.connect(self.on_result)
self.thread.finished_scan.connect(self.on_finished)
self.thread.start()
def on_stop(self):
if self.thread and self.thread.isRunning():
self.thread.stop()
self.append_log("用户请求停止...")
def on_progress(self, done: int, total: int):
self.progress_label.setText(f"{done}/{total}")
if total <= 0:
self.progress_bar.setValue(0)
else:
self.progress_bar.setValue(int(done * 100 / total))
def on_result(self, res: ScanResult):
self.all_results.append(res)
row = self.table.rowCount()
self.table.insertRow(row)
self.table.setItem(row, 0, QTableWidgetItem(res.host))
self.table.setItem(row, 1, QTableWidgetItem(res.ip))
self.table.setItem(row, 2, QTableWidgetItem(res.proto))
self.table.setItem(row, 3, QTableWidgetItem(str(res.port)))
self.table.setItem(row, 4, QTableWidgetItem(res.state))
self.table.setItem(row, 5, QTableWidgetItem(res.service))
self.table.setItem(row, 6, QTableWidgetItem(res.banner))
def on_finished(self, stats: dict):
self.last_stats = stats or {}
self.status_label.setText("状态:完成")
self.start_btn.setEnabled(True)
self.stop_btn.setEnabled(False)
if self.all_results:
self.export_json_btn.setEnabled(True)
self.export_csv_btn.setEnabled(True)
self.append_log(f"完成:发现结果 {len(self.all_results)} 条")
else:
self.append_log("完成:未发现开放端口(或全部被过滤)")
# if self.last_stats:
# self.append_log("=== 统计 ===")
# self.append_log(f"总结果数:{self.last_stats.get('total_results', 0)}")
# self.append_log(f"协议统计:{self.last_stats.get('proto_count', {})}")
# self.append_log(f"Top服务:{self.last_stats.get('top_services', [])}")
def on_export_json(self):
if not self.all_results:
QMessageBox.information(self, "提示", "没有可导出的结果。")
return
path, _ = QFileDialog.getSaveFileName(self, "保存JSON", "scan_results.json", "JSON 文件 (*.json)")
if not path:
return
payload = {
"generated_at": datetime.now().isoformat(),
"stats": self.last_stats,
"results": [asdict(r) for r in self.all_results],
}
try:
with open(path, "w", encoding="utf-8") as f:
json.dump(payload, f, ensure_ascii=False, indent=2)
self.append_log(f"已导出JSON:{path}")
except Exception as e:
QMessageBox.critical(self, "错误", f"导出失败:{e}")
def on_export_csv(self):
if not self.all_results:
QMessageBox.information(self, "提示", "没有可导出的结果。")
return
path, _ = QFileDialog.getSaveFileName(self, "保存CSV", "scan_results.csv", "CSV 文件 (*.csv)")
if not path:
return
try:
with open(path, "w", newline="", encoding="utf-8") as f:
w = csv.writer(f)
# # 写统计头
# w.writerow(["# generated_at", datetime.now().isoformat()])
# w.writerow(["# total_results", self.last_stats.get("total_results", 0)])
# w.writerow(["# proto_count", json.dumps(self.last_stats.get("proto_count", {}), ensure_ascii=False)])
# w.writerow(["# top_services", json.dumps(self.last_stats.get("top_services", []), ensure_ascii=False)])
# w.writerow([])
# 写明细
w.writerow(["host", "ip", "proto", "port", "state", "service", "banner"])
for r in self.all_results:
w.writerow([r.host, r.ip, r.proto, r.port, r.state, r.service, r.banner])
self.append_log(f"已导出CSV:{path}")
except Exception as e:
QMessageBox.critical(self, "错误", f"导出失败:{e}")
# =============================
# 日志分析功能
# =============================
def pick_log_file(self):
path, _ = QFileDialog.getOpenFileName(self, "选择日志文件", "", "日志文件 (*.log *.txt *.out *.err *.gz);;所有文件 (*.*)")
if path:
self.log_path_edit.setText(path)
def pick_log_dir(self):
path = QFileDialog.getExistingDirectory(self, "选择日志目录")
if path:
self.log_path_edit.setText(path)
def iter_log_paths(self, root_path: str) -> List[str]:
p = pathlib.Path(root_path)
if p.is_file():
return [str(p)]
if p.is_dir():
exts = {".log", ".txt", ".out", ".err", ".gz"}
files = []
for fp in p.rglob("*"):
if fp.is_file() and (fp.suffix.lower() in exts or fp.name.lower().endswith(".log.gz")):
files.append(str(fp))
return files[:500] # 防止目录过大
return []
def analyze_logs(self):
root_path = self.log_path_edit.text().strip()
if not root_path:
QMessageBox.warning(self, "提示", "请先选择日志文件或目录。")
return
paths = self.iter_log_paths(root_path)
if not paths:
QMessageBox.warning(self, "提示", "未找到可分析的日志文件。")
return
self.log_findings = []
self.log_stats = []
self.findings_table.setRowCount(0)
self.log_stats_box.clear()
max_lines = int(self.max_lines_spin.value())
max_findings = int(self.max_findings_spin.value())
self.log_stats_box.append(f"开始分析:文件数={len(paths)}")
QApplication.processEvents()
for path in paths:
            try:
                # 将界面上的 max_lines / max_findings 直接传入分析函数
                findings, stats = analyze_log_file(path, max_findings=max_findings, max_lines=max_lines)
self.log_stats.append(stats)
self.log_findings.extend(findings)
except Exception as e:
self.log_stats_box.append(f"[失败] {path}: {e}")
# 输出统计汇总
total_files = len(self.log_stats)
total_findings = len(self.log_findings)
sev_sum = Counter()
cat_sum = Counter()
kw_sum = Counter()
for st in self.log_stats:
for k, v in st.get("severity", {}).items():
sev_sum[k] += v
for k, v in st.get("category", {}).items():
cat_sum[k] += v
for k, v in st.get("top_keywords", []):
if k:
kw_sum[k] += v
self.log_stats_box.append("")
self.log_stats_box.append("=== 汇总统计 ===")
self.log_stats_box.append(f"文件数:{total_files}")
self.log_stats_box.append(f"告警条数:{total_findings}")
self.log_stats_box.append(f"严重性统计:{dict(sev_sum)}")
self.log_stats_box.append(f"类别统计:{dict(cat_sum)}")
atk_sum = Counter()
for st in self.log_stats:
for k, v in st.get("top_attack_types", []):
atk_sum[k] += v
# self.log_stats_box.append(f"Top关键字:{kw_sum.most_common(10)}")
# access_ip_sum = Counter()
# access_uri_sum = Counter()
# access_status_sum = Counter()
# risk_ip_sum = Counter()
# for st in self.log_stats:
# for k, v in st.get('access_top_ips', []):
# access_ip_sum[k] += v
# for k, v in st.get('access_top_uris', []):
# access_uri_sum[k] += v
# for k, v in st.get('access_status', []):
# access_status_sum[k] += v
# for k, v in st.get('risk_top_ips', []):
# risk_ip_sum[k] += v
# self.log_stats_box.append(f"Top攻击类型:{atk_sum.most_common(10)}")
# self.log_stats_box.append(f"Top访问IP:{access_ip_sum.most_common(10)}")
# self.log_stats_box.append(f"Top访问URI:{access_uri_sum.most_common(10)}")
# self.log_stats_box.append(f"状态码分布:{access_status_sum.most_common(10)}")
# self.log_stats_box.append(f"Top风险IP:{risk_ip_sum.most_common(10)}")
# 填充表格
self.refresh_findings_table(self.log_findings)
self.btn_export_findings_json.setEnabled(bool(self.log_findings))
self.btn_export_findings_csv.setEnabled(bool(self.log_findings))
def export_findings_json(self):
if not self.log_findings:
return
path, _ = QFileDialog.getSaveFileName(self, "保存JSON", "log_findings.json", "JSON 文件 (*.json)")
if not path:
return
payload = {
"generated_at": datetime.now().isoformat(),
"summary": self.log_stats,
"findings": [asdict(x) for x in self.log_findings],
}
with open(path, "w", encoding="utf-8") as f:
json.dump(payload, f, ensure_ascii=False, indent=2)
QMessageBox.information(self, "完成", "已导出 JSON。")
def export_findings_csv(self):
if not self.log_findings:
return
path, _ = QFileDialog.getSaveFileName(self, "保存CSV", "log_findings.csv", "CSV 文件 (*.csv)")
if not path:
return
with open(path, "w", newline="", encoding="utf-8") as f:
w = csv.writer(f)
w.writerow(["severity", "category", "attack_type", "attack_ip", "timestamp", "keyword", "source", "message"])
for x in self.log_findings:
w.writerow([x.severity, x.category, x.attack_type, getattr(x, "attack_ip", ""), x.timestamp, x.keyword, x.source, x.message])
QMessageBox.information(self, "完成", "已导出 CSV。")
def refresh_findings_table(self, findings: List[LogFinding]):
self.findings_table.setRowCount(0)
for fnd in findings[:20000]:
row = self.findings_table.rowCount()
self.findings_table.insertRow(row)
# 8 columns:
# 0 严重性, 1 类别, 2 攻击类型, 3 攻击IP, 4 时间, 5 关键字, 6 来源, 7 内容
self.findings_table.setItem(row, 0, QTableWidgetItem(fnd.severity))
self.findings_table.setItem(row, 1, QTableWidgetItem(fnd.category))
self.findings_table.setItem(row, 2, QTableWidgetItem(fnd.attack_type))
self.findings_table.setItem(row, 3, QTableWidgetItem(getattr(fnd, "attack_ip", "") or ""))
self.findings_table.setItem(row, 4, QTableWidgetItem(fnd.timestamp))
self.findings_table.setItem(row, 5, QTableWidgetItem(fnd.keyword))
self.findings_table.setItem(row, 6, QTableWidgetItem(fnd.source))
self.findings_table.setItem(row, 7, QTableWidgetItem(fnd.message))
def apply_log_filter(self):
if not self.log_findings:
return
sev = self.filter_sev.currentText()
cat = self.filter_cat.currentText()
atk = self.filter_atk.currentText()
kw = self.filter_kw.text().strip().lower()
out = []
for f in self.log_findings:
if sev != "全部" and f.severity != sev:
continue
if cat != "全部" and f.category != cat:
continue
if atk != "全部" and (f.attack_type or "") != atk:
continue
if kw:
hay = " ".join([f.keyword or "", f.message or "", f.source or "", f.timestamp or "", getattr(f, "attack_ip", "") or ""]).lower()
if kw not in hay:
continue
out.append(f)
self.refresh_findings_table(out)
self.log_stats_box.append(f"[过滤] 命中 {len(out)}/{len(self.log_findings)} 条")
def reset_log_filter(self):
self.filter_sev.setCurrentIndex(0)
self.filter_cat.setCurrentIndex(0)
self.filter_atk.setCurrentIndex(0)
self.filter_kw.clear()
self.refresh_findings_table(self.log_findings)
def main():
app = QApplication(sys.argv)
win = MainWindow()
win.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()