«

IP端口扫描+日志分析工具

‎刘小猪 发布于 阅读:18 教程


IP端口扫描+日志分析工具

import sys
import json
import csv
import socket
import ipaddress
import platform
import subprocess
import threading
import ssl
import struct
import re
import gzip
import pathlib
from urllib.parse import urlparse, parse_qs, unquote_plus
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import List, Optional, Tuple, Dict, Any
from collections import Counter, defaultdict

from PyQt6.QtCore import Qt, QThread, pyqtSignal
from PyQt6.QtWidgets import (
    QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QLabel,
    QLineEdit, QPushButton, QSpinBox, QDoubleSpinBox, QCheckBox, QComboBox,
    QTableWidget, QTableWidgetItem, QFileDialog, QMessageBox, QTextEdit,
    QGroupBox, QProgressBar, QTabWidget, QListWidget, QListWidgetItem
)

# =============================
# 数据结构
# =============================
@dataclass
class ScanResult:
    """One port-scan hit: a single port on a single target, plus what was learned about it."""
    host: str          # target as supplied by the caller (domain name or IP)
    ip: str            # address the probe actually connected to
    proto: str         # tcp/udp
    port: int
    state: str         # open / open|filtered
    service: str       # service name guess (port table or banner inference); may be ""
    banner: str        # sanitized banner text; "" when nothing was grabbed

@dataclass
class LogFinding:
    """One noteworthy log line (or access-log request) reported by the log analyzer."""
    source: str          # file name / path the line came from
    category: str        # Chinese category label emitted by the analyzer, e.g. "系统", "中间件", "数据库", "应用", "安全", "爬虫"
    severity: str        # INFO/WARN/ERROR/CRITICAL
    timestamp: str       # normalized timestamp text, or "" when none was found
    keyword: str         # keyword / evidence that triggered the finding
    message: str         # the log line (possibly truncated) or a request summary
    attack_ip: str = ""      # IP associated with the line, when one could be extracted
    attack_type: str = ""    # attack label when an attack pattern matched, else ""

# =============================
# 端口解析/目标解析
# =============================
def parse_ports(port_text: str) -> List[int]:
    """
    Parse a port specification into a sorted list of unique port numbers.

    Accepted forms (comma separated, surrounding whitespace ignored):
      - "1-65535"
      - "22,80,443"
      - "53,67-69,161"

    Ports outside 1..65535 are silently dropped and a reversed range
    ("90-80") is treated as "80-90". Empty or None input yields [].

    Raises:
        ValueError: if a piece is neither an integer nor an integer range.
    """
    port_text = (port_text or "").strip()
    if not port_text:
        return []

    ports = set()
    for part in port_text.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            lo_text, hi_text = part.split("-", 1)
            lo, hi = int(lo_text.strip()), int(hi_text.strip())
            if lo > hi:
                lo, hi = hi, lo
            # Clamp BEFORE iterating: a typo such as "1-99999999999" must not
            # make us walk billions of values just to discard them.
            lo, hi = max(lo, 1), min(hi, 65535)
            ports.update(range(lo, hi + 1))
        else:
            port = int(part)
            if 1 <= port <= 65535:
                ports.add(port)
    return sorted(ports)

def expand_targets(text: str) -> List[str]:
    """
    Turn free-form target text into an ordered, de-duplicated list of targets.

    Supported entries (separated by newlines and/or commas):
      - IPv4: 192.168.1.10
      - IPv6: 2001:db8::1
      - domain name: example.com
      - CIDR: 192.168.1.0/24, 2001:db8::/120 (expanded to its host addresses)

    An entry containing "/" that is not a valid network is kept verbatim.
    """
    entries = [
        piece.strip()
        for piece in (text or "").replace(",", "\n").splitlines()
        if piece.strip()
    ]

    candidates: List[str] = []
    for entry in entries:
        if "/" not in entry:
            candidates.append(entry)
            continue
        try:
            network = ipaddress.ip_network(entry, strict=False)
            candidates.extend(str(addr) for addr in network.hosts())
        except Exception:
            candidates.append(entry)

    # dict preserves insertion order, so this drops repeats but keeps first-seen order
    return list(dict.fromkeys(candidates))

def resolve_host_to_ip(host: str) -> Tuple[str, str]:
    """
    Resolve a scan target to a single IP address.

    Args:
        host: an IP literal or a DNS name.

    Returns:
        (host, ip). For an IP literal ``ip`` is ``host`` itself. For a name it
        is the first IPv4 address the resolver returns, falling back to the
        first IPv6 address. ``ip`` is "" when resolution fails.
    """
    try:
        # Already an IP literal: nothing to resolve.
        ipaddress.ip_address(host)
        return host, host
    except ValueError:
        pass

    try:
        infos = socket.getaddrinfo(host, None)
    except Exception:
        # Best-effort: any resolver failure means "unresolved".
        return host, ""

    first_v4 = ""
    first_v6 = ""
    for family, _, _, _, sockaddr in infos:
        # Keep the FIRST address of each family (resolver order is the
        # preferred order); later entries must not overwrite it.
        if family == socket.AF_INET and not first_v4:
            first_v4 = sockaddr[0]
        elif family == socket.AF_INET6 and not first_v6:
            first_v6 = sockaddr[0]
    return host, first_v4 or first_v6

def is_domain(host: str) -> bool:
    """Return True when ``host`` is not parseable as an IPv4/IPv6 literal."""
    try:
        ipaddress.ip_address(host)
    except Exception:
        return True
    return False

# =============================
# 探测与banner抓取
# =============================
# Default service-name guess keyed by port number; used when no banner says otherwise.
SERVICE_GUESS = {
    21: "ftp",
    22: "ssh",
    23: "telnet",
    25: "smtp",
    53: "dns",
    80: "http",
    110: "pop3",
    123: "ntp",
    135: "msrpc",
    139: "netbios-ssn",
    143: "imap",
    161: "snmp",
    389: "ldap",
    443: "https",
    445: "smb",
    465: "smtps",
    587: "smtp-submission",
    993: "imaps",
    995: "pop3s",
    1433: "mssql",
    1521: "oracle",
    2049: "nfs",
    2375: "docker",
    3306: "mysql",
    3389: "rdp",
    5432: "postgres",
    5900: "vnc",
    6379: "redis",
    7001: "weblogic",
    8080: "http-alt",
    8443: "https-alt",
    9200: "elasticsearch",
    11211: "memcached",
    27017: "mongodb",
}

# Ports on which banner_grab_tcp attempts a TLS handshake first.
TLS_PORTS = {443, 8443, 9443, 993, 995, 465, 587, 990, 992, 994}

# Ports on which banner_grab_tcp sends an HTTP GET probe.
HTTP_PORTS = {80, 8080, 8000, 8888, 8081, 8181, 7001, 7100, 7501, 9090, 9200, 443, 8443, 9443}

def parse_http_title(body: str) -> str:
    """Extract the first <title> text from an HTML body.

    Whitespace runs are collapsed to single spaces and the result is capped
    at 80 characters. Returns "" when no <title> element is present.
    """
    found = re.search(r"<title[^>]*>(.*?)</title>", body, re.IGNORECASE | re.DOTALL)
    if found is None:
        return ""
    collapsed = re.sub(r"\s+", " ", found.group(1))
    return collapsed.strip()[:80]

def tls_handshake_and_cert(ip: str, port: int, timeout: float, server_name: str = "") -> Tuple[bool, str]:
    """
    Attempt a TLS handshake and summarise what the server presented.

    Certificate verification is disabled (the goal is fingerprinting, not
    trust), so self-signed and expired certificates still handshake.

    Args:
        ip: address to connect to; a ":" in it selects IPv6.
        port: TCP port.
        timeout: socket timeout in seconds for connect and handshake.
        server_name: SNI name to send; "" sends no SNI.

    Returns:
        (tls_ok, info). ``info`` is a sanitized one-line summary starting with
        "TLS OK". On any failure the result is (False, "").
    """
    family = socket.AF_INET6 if ":" in ip else socket.AF_INET
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE

    def flatten(rdns) -> str:
        """Render a getpeercert() subject/issuer structure as "k=v, k=v"."""
        return ", ".join(f"{k}={v}" for rdn in rdns for k, v in rdn)

    try:
        with socket.socket(family, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            sock.connect((ip, port))
            with ctx.wrap_socket(sock, server_hostname=server_name or None) as ssock:
                ssock.settimeout(timeout)
                cert = ssock.getpeercert()
                if cert:
                    subject = cert.get("subject", [])
                    issuer = cert.get("issuer", [])
                    not_after = cert.get("notAfter", "")
                    cert_info = f"TLS OK; Subject: {flatten(subject)}; Issuer: {flatten(issuer)}; NotAfter: {not_after}"
                else:
                    # With verify_mode=CERT_NONE the ssl module does not decode
                    # the peer certificate, so getpeercert() returns an empty
                    # dict. Report the negotiated protocol and cipher instead,
                    # so the banner still carries useful information.
                    cipher = ssock.cipher()
                    cert_info = (
                        "TLS OK; no cert info"
                        f"; Protocol: {ssock.version() or 'unknown'}"
                        f"; Cipher: {cipher[0] if cipher else 'unknown'}"
                    )
                return True, sanitize_banner(cert_info)
    except Exception:
        # Best-effort probe: refused, timed out, not TLS, etc. all mean "no TLS".
        return False, ""

def http_probe(ip: str, port: int, timeout: float, use_tls: bool, host_header: str = "localhost") -> str:
    """
    Send ``GET /`` and return the response headers, plus the page title if any.

    Args:
        ip: address to connect to; a ":" in it selects IPv6.
        port: TCP port.
        timeout: socket timeout in seconds.
        use_tls: wrap the connection in TLS (verification disabled) first.
        host_header: value of the Host header, also used as the SNI name.

    Returns:
        A sanitized banner string, or "" when the probe fails or nothing
        was received.
    """
    family = socket.AF_INET6 if ":" in ip else socket.AF_INET
    req = f"GET / HTTP/1.1\r\nHost: {host_header}\r\nUser-Agent: scanner\r\nConnection: close\r\n\r\n".encode()

    data = b""
    try:
        with socket.socket(family, socket.SOCK_STREAM) as raw_sock:
            raw_sock.settimeout(timeout)
            raw_sock.connect((ip, port))

            conn = raw_sock
            if use_tls:
                ctx = ssl.create_default_context()
                ctx.check_hostname = False
                ctx.verify_mode = ssl.CERT_NONE
                conn = ctx.wrap_socket(raw_sock, server_hostname=host_header)

            try:
                conn.settimeout(timeout)
                conn.sendall(req)

                while len(data) < 4096:
                    try:
                        chunk = conn.recv(1024)
                    except OSError:
                        # Timeout / reset while reading: keep what already
                        # arrived instead of throwing the whole response away.
                        break
                    if not chunk:
                        break
                    data += chunk
            finally:
                # The ``with`` above only closes the raw socket object; the
                # TLS wrapper owns the descriptor after wrap_socket(), so it
                # has to be closed explicitly.
                if conn is not raw_sock:
                    conn.close()

        text = safe_decode(data)
        header, _, body = text.partition("\r\n\r\n")

        title = parse_http_title(body)
        if title:
            return sanitize_banner(header + "\r\n<title>" + title + "</title>")
        return sanitize_banner(header)
    except Exception:
        # Best-effort probe: connect/handshake/send failures mean "no banner".
        return ""

def pg_probe(ip: str, port: int, timeout: float) -> str:
    """Probe for PostgreSQL by sending an SSLRequest packet.

    Returns a description string when the one-byte reply is "S" or "N",
    otherwise "" (including on any connection error).
    """
    addr_family = socket.AF_INET if ":" not in ip else socket.AF_INET6
    ssl_request = struct.pack("!II", 8, 80877103)
    try:
        with socket.socket(addr_family, socket.SOCK_STREAM) as conn:
            conn.settimeout(timeout)
            conn.connect((ip, port))
            conn.sendall(ssl_request)
            answer = conn.recv(1)
    except Exception:
        return ""
    if answer not in (b"S", b"N"):
        return ""
    return f"PostgreSQL detected (SSLRequest response: {answer.decode(errors='ignore')})"

def redis_probe(ip: str, port: int, timeout: float) -> str:
    """Send an inline ``PING`` and return the sanitized reply ("" on failure)."""
    addr_family = socket.AF_INET if ":" not in ip else socket.AF_INET6
    try:
        with socket.socket(addr_family, socket.SOCK_STREAM) as conn:
            conn.settimeout(timeout)
            conn.connect((ip, port))
            conn.sendall(b"PING\r\n")
            reply = conn.recv(256)
    except Exception:
        return ""
    return sanitize_banner(safe_decode(reply))

def memcached_probe(ip: str, port: int, timeout: float) -> str:
    """Send the ``version`` command and return the sanitized reply ("" on failure)."""
    addr_family = socket.AF_INET if ":" not in ip else socket.AF_INET6
    try:
        with socket.socket(addr_family, socket.SOCK_STREAM) as conn:
            conn.settimeout(timeout)
            conn.connect((ip, port))
            conn.sendall(b"version\r\n")
            reply = conn.recv(256)
    except Exception:
        return ""
    return sanitize_banner(safe_decode(reply))

def mysql_probe(ip: str, port: int, timeout: float) -> str:
    """Connect and read whatever the server sends first.

    A MySQL server normally sends its handshake packet right after the
    connection is accepted, so nothing is written. Returns the sanitized
    bytes, or "" on failure.
    """
    addr_family = socket.AF_INET if ":" not in ip else socket.AF_INET6
    try:
        with socket.socket(addr_family, socket.SOCK_STREAM) as conn:
            conn.settimeout(timeout)
            conn.connect((ip, port))
            greeting = conn.recv(512)
    except Exception:
        return ""
    return sanitize_banner(safe_decode(greeting))

# Simplified probe table (same idea as nmap probes: different payloads for
# different protocols). Each entry is (label, payload). banner_grab_tcp sends
# the payloads in order on one connection until some data comes back.
TCP_PROBES: List[Tuple[str, bytes]] = [
    ("HTTP", b"HEAD / HTTP/1.0\r\nHost: localhost\r\nUser-Agent: scanner\r\n\r\n"),
    ("HTTP-GET", b"GET / HTTP/1.0\r\nHost: localhost\r\nUser-Agent: scanner\r\n\r\n"),
    ("SMTP", b"EHLO example.com\r\n"),
    ("FTP", b"FEAT\r\n"),
    ("IMAP", b"A1 CAPABILITY\r\n"),
    ("POP3", b"QUIT\r\n"),
    ("REDIS", b"PING\r\n"),
    ("MEMCACHED", b"version\r\n"),
    ("MYSQL", b"\x00"),
    ("TELNET", b"\r\n"),
]

def safe_decode(b: bytes) -> str:
    """Decode bytes as UTF-8, replacing undecodable bytes.

    If decoding itself fails (e.g. the argument is not bytes-like), the
    ``repr`` of the argument is returned instead of raising.
    """
    try:
        text = b.decode("utf-8", errors="replace")
    except Exception:
        text = repr(b)
    return text

def sanitize_banner(s: str) -> str:
    """Make a banner single-line and bounded in length.

    CR and LF become the literal two-character sequences ``\\r`` and ``\\n``.
    Anything longer than 400 characters is cut to 400 and suffixed with "...".
    Falsy input yields "".
    """
    if not s:
        return ""
    escaped = s.translate(str.maketrans({"\r": "\\r", "\n": "\\n"}))
    return escaped if len(escaped) <= 400 else escaped[:400] + "..."

# =============================
# 日志分析(离线文件分析)
# =============================
# Severity -> lowercase substrings looked up in the lowercased log line.
# detect_severity walks this dict in insertion order, so CRITICAL wins over
# ERROR, which wins over WARN.
SEVERITY_KEYWORDS = {
    "CRITICAL": ["panic", "fatal", "critical", "segfault", "core dump", "out of memory", "oom", "kernel panic"],
    "ERROR": ["error", "failed", "exception", "traceback", "denied", "refused", "timeout", "unreachable"],
    "WARN": ["warn", "warning", "deprecated", "slow", "retry", "throttle", "too many"],
}

# Lowercase substrings that mark a line as security-relevant; checked by
# detect_severity (reported as WARN) and by guess_category.
SECURITY_KEYWORDS = [
    "authentication failure", "invalid user", "failed password", "sshd", "sudo",
    "unauthorized", "sql injection", "xss", "csrf", "bruteforce", "brute force",
    "waf", "attack", "exploit", "malware", "ransom", "cve-"
]

# (attack label, regex sources). Labels are emitted verbatim in findings.
# Order matters: detect_attack_type returns the first label with a matching pattern.
#
# A leading ``\b`` is used only in front of patterns that START with a word
# character. In front of punctuation (";", "|", "&&", "`", "/", ".") it would
# require a word character immediately before the punctuation, which silently
# excludes common payloads such as "GET /.env" or "ls ; cat x".
ATTACK_PATTERNS = [
    ("SQL注入", [
        r"\bunion\s+select\b", r"\binformation_schema\b",
        r"\bor\s+1=1\b", r"\bsleep\s*\(", r"\bbenchmark\s*\(",
        r"\bextractvalue\s*\(", r"\bupdatexml\s*\(", r"\bload_file\s*\(",
        r"\binto\s+outfile\b", r"\bpg_sleep\s*\(", r"\bwaitfor\s+delay\b", r"\bxp_cmdshell\b"
    ]),
    ("XSS", [
        r"<script\b", r"javascript:", r"onerror\s*=", r"onload\s*=", r"document\.cookie",
        r"<img\b[^>]*onerror", r"<svg\b", r"alert\s*\(", r"prompt\s*\("
    ]),
    ("路径穿越", [
        r"\.\./", r"\.\.\\", r"%2e%2e%2f", r"%2e%2e%5c", r"/etc/passwd", r"boot\.ini"
    ]),
    ("命令注入", [
        r";\s*cat\b", r";\s*id\b", r"\|\s*whoami\b", r"\|\s*id\b",
        r"&&\s*id\b", r"&&\s*whoami\b", r"`[^`]+`", r"\$\([^\)]+\)"
    ]),
    ("SSRF", [
        r"169\.254\.169\.254", r"metadata\.google\.internal", r"\bfile://", r"\bgopher://",
        r"\bhttp://127\.0\.0\.1", r"\bhttp://localhost"
    ]),
    ("扫描/探测", [
        r"nmap", r"masscan", r"zmap", r"dirbuster", r"gobuster", r"nikto",
        r"/wp-admin\b", r"/\.git/", r"\.env\b"
    ]),
    ("暴力破解", [
        r"failed password", r"invalid user", r"authentication failure", r"too many authentication failures",
        r"login failed", r"bad password", r"bruteforce", r"brute force"
    ]),
    ("WebShell/恶意文件", [
        r"c99\.php", r"r57\.php", r"shell\.php", r"webshell",
        r"base64_decode\(", r"\beval\s*\(", r"\bassert\s*\("
    ]),
]

# Same table with every pattern compiled case-insensitively.
ATTACK_REGEX = [(name, [re.compile(pat, re.IGNORECASE) for pat in pats]) for name, pats in ATTACK_PATTERNS]

def detect_attack_type(line: str) -> str:
    """Return the label of the first attack family whose pattern matches ``line``, else ""."""
    for name, regs in ATTACK_REGEX:
        if any(rg.search(line) for rg in regs):
            return name
    return ""

# =============================
# Access Log 结构化解析(Nginx/Apache)
# =============================
# Regexes tried in order by parse_access_log_line. Named groups: ip, time,
# method, uri, proto, status, size and (optional) referer / ua.
ACCESS_LOG_PATTERNS = [
    # Common Log Format / Combined
    re.compile(
        r'^(?P<ip>\S+)\s+\S+\s+\S+\s+\[(?P<time>[^\]]+)\]\s+"(?P<method>[A-Z]+)\s+(?P<uri>[^\s]+)\s+(?P<proto>[^"]+)"\s+(?P<status>\d{3})\s+(?P<size>\S+)(\s+"(?P<referer>[^"]*)"\s+"(?P<ua>[^"]*)")?'
    ),
]

# Case-insensitive SQL-injection indicators used by analyze_uri_for_attack.
SQLI_STRONG = [
    re.compile(p, re.IGNORECASE) for p in [
        r"\bunion\s+select\b",
        r"\binformation_schema\b",
        r"\bselect\b.+\bfrom\b",
        r"\b(or|and)\b\s+\d+=\d+",
        r"\b(or|and)\b\s+'[^']+'='[^']+'",
        r"\bsleep\s*\(",
        r"\bbenchmark\s*\(",
        r"\bextractvalue\s*\(",
        r"\bupdatexml\s*\(",
        r"\bpg_sleep\s*\(",
        r"\bwaitfor\s+delay\b",
        r"\bxp_cmdshell\b",
    ]
]

# Case-insensitive XSS indicators used by analyze_uri_for_attack.
XSS_STRONG = [
    re.compile(p, re.IGNORECASE) for p in [
        r"<script\b",
        r"javascript:",
        r"onerror\s*=",
        r"onload\s*=",
        r"<svg\b",
        r"document\.cookie",
        r"alert\s*\(",
    ]
]

def parse_access_log_line(line: str) -> Optional[dict]:
    """Parse one access-log line with the first pattern that matches.

    Returns a dict with the string fields ip, time, method, uri, status,
    referer and ua (referer / ua are "" when absent), or None when no
    pattern in ACCESS_LOG_PATTERNS matches.
    """
    hit = next((m for m in (rg.match(line) for rg in ACCESS_LOG_PATTERNS) if m), None)
    if hit is None:
        return None

    groups = hit.groupdict()
    record = {key: groups.get(key, "") for key in ("ip", "time", "method", "uri", "status")}
    # optional groups come back as None when the combined-format tail is missing
    record["referer"] = groups.get("referer", "") or ""
    record["ua"] = groups.get("ua", "") or ""
    return record

def analyze_uri_for_attack(uri: str) -> Tuple[str, str]:
    """
    Check a request URI for SQL-injection / XSS indicators.

    The URI is URL-decoded first. Each query parameter ("key=value") is then
    tested, and finally the whole decoded URI as a fallback. SQLi rules are
    tried before XSS rules at each step.

    Returns:
        (attack_type, evidence): attack_type is "SQL注入", "XSS" or "";
        evidence is the matching text cut to 120 characters.
    """
    try:
        uri = unquote_plus(uri)
    except Exception:
        pass

    def classify(text: str) -> str:
        """Return the attack label for ``text`` ("" when nothing matches)."""
        if any(rg.search(text) for rg in SQLI_STRONG):
            return "SQL注入"
        if any(rg.search(text) for rg in XSS_STRONG):
            return "XSS"
        return ""

    try:
        params = parse_qs(urlparse(uri).query)
    except ValueError:
        # URIs come from untrusted logs; urlparse rejects some malformed input
        # (e.g. "//[x" -> "Invalid IPv6 URL"). Skip the per-parameter pass and
        # rely on the whole-URI check instead of aborting the analysis.
        params = {}

    # Per-parameter pass first: gives the most specific evidence.
    for key, values in params.items():
        for value in values:
            pair = f"{key}={value}"
            kind = classify(pair)
            if kind:
                return kind, pair[:120]

    # Fallback: same rules against the whole decoded URI.
    kind = classify(uri)
    if kind:
        return kind, uri[:120]

    return "", ""

# (crawler label, User-Agent regex sources). Labels are emitted verbatim.
# Order matters: detect_crawler returns the first label with a matching pattern.
CRAWLER_UA_PATTERNS = [
    # well-known search-engine crawlers
    ("搜索引擎爬虫", [r"googlebot", r"bingbot", r"baiduspider", r"yandex(bot)?", r"sogou", r"360spider", r"petalbot", r"bytespider"]),
    # common commercial / SEO crawlers
    ("商业爬虫", [r"ahrefsbot", r"semrushbot", r"mj12bot", r"dotbot", r"serpstatbot"]),
    # scripts and HTTP client libraries (greyer traffic)
    ("脚本/工具爬虫", [r"python-requests", r"java/", r"go-http-client", r"curl/", r"wget/", r"libwww-perl", r"scrapy", r"httpclient", r"okhttp"]),
]

CRAWLER_REGEX = [(name, [re.compile(p, re.IGNORECASE) for p in pats]) for name, pats in CRAWLER_UA_PATTERNS]

# Request paths commonly probed by scanners; used by risk_score_access.
SUSPICIOUS_PATH_PATTERNS = [
    re.compile(p, re.IGNORECASE) for p in [
        r"/wp-admin", r"/wp-login\.php", r"/xmlrpc\.php",
        r"/\.git/", r"/\.env\b", r"/phpmyadmin", r"/manager/html", r"/jenkins",
        r"/actuator", r"/swagger", r"/v2/_catalog", r"/api-docs",
        r"/etc/passwd", r"\.php\b", r"\.jsp\b", r"\.asp\b"
    ]
]

# Generic fallback: the UA merely CONTAINS bot/spider/crawl. No word
# boundaries on purpose, because crawler names are usually one token
# ("Applebot", "DuckDuckBot", "Sitecrawler") and "\bbot\b" never matches them.
_GENERIC_CRAWLER_RE = re.compile(r"bot|spider|crawl", re.IGNORECASE)

def detect_crawler(ua: str) -> Tuple[str, str]:
    """
    Classify a User-Agent string as a crawler.

    Returns:
        (crawler_type, evidence). crawler_type is one of the labels in
        CRAWLER_UA_PATTERNS, "未知爬虫" for the generic fallback, or "" when
        the UA does not look like a crawler. evidence is the matching regex
        source, or "bot/spider/crawl" for the fallback.
    """
    if not ua:
        return "", ""
    for name, regs in CRAWLER_REGEX:
        for rg in regs:
            if rg.search(ua):
                return name, rg.pattern
    if _GENERIC_CRAWLER_RE.search(ua):
        return "未知爬虫", "bot/spider/crawl"
    return "", ""

def risk_score_access(ip: str, uri: str, status: str, ua: str) -> int:
    """Compute a deliberately simple risk score for one access-log request.

    Scoring:
      - status 401/403: +2, 404: +1, >= 500: +1 (unparseable status adds nothing)
      - URI matches any SUSPICIOUS_PATH_PATTERNS entry: +3 (counted once)
      - User-Agent class: script/tool +3, commercial or unknown crawler +2,
        search engine +0

    ``ip`` is accepted for interface symmetry and is not used in the score.
    """
    total = 0

    try:
        code = int(status)
    except Exception:
        code = None
    if code is not None:
        if code in (401, 403):
            total += 2
        if code == 404:
            total += 1
        if code >= 500:
            total += 1

    # sensitive path hit counts once, however many patterns match
    path = uri or ""
    if any(rg.search(path) for rg in SUSPICIOUS_PATH_PATTERNS):
        total += 3

    # weight by User-Agent class
    crawler_kind, _ = detect_crawler(ua or "")
    ua_weight = {"脚本/工具爬虫": 3, "商业爬虫": 2, "未知爬虫": 2}
    total += ua_weight.get(crawler_kind, 0)

    return total

# Product -> substrings hinting that a log line comes from that middleware.
# Matching is case-insensitive (see guess_category).
MIDDLEWARE_HINTS = {
    "nginx": ["nginx", "upstream", "client prematurely closed", "connect() failed"],
    "apache": ["apache", "httpd", "mod_", "AH0"],
    "tomcat": ["catalina", "tomcat", "org.apache.catalina"],
    "weblogic": ["weblogic", "<BEA-", "<Error>", "<Warning>"],
    "redis": ["redis", "OOM command not allowed", "MISCONF", "Loading the dataset"],
}

# Product -> substrings hinting at a database log line.
DB_HINTS = {
    "mysql": ["mysqld", "innodb", "mysql", "error 1045", "error 2002"],
    "postgres": ["postgres", "FATAL:", "PANIC:", "could not connect", "connection refused"],
    "mongodb": ["mongod", "mongodb", "wiredtiger", "E NETWORK"],
}

# Runtime -> substrings hinting at an application log line.
APP_HINTS = {
    "java": ["exception", "at ", "Caused by:", "NullPointerException"],
    "python": ["Traceback (most recent call last)", "Exception:", "ERROR"],
    "nodejs": ["UnhandledPromiseRejectionWarning", "TypeError:", "ReferenceError:"],
}

# Timestamp shapes recognised by extract_ts, tried in order.
TS_PATTERNS = [
    re.compile(r"(?P<ts>\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2})"),
    re.compile(r"(?P<ts>\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})"),  # syslog: Feb  2 12:30:10
]

def guess_category(line: str) -> str:
    """
    Guess which layer a log line belongs to.

    Checks, in order: database hints, middleware hints, application hints,
    security keywords. The first group with a hit decides; the default is
    "系统".

    Returns:
        One of "数据库", "中间件", "应用", "安全", "系统".
    """
    low = line.lower()

    def mentions(table: dict) -> bool:
        # Hints are written in mixed case ("FATAL:", "<BEA-", "AH0") while the
        # line is lowercased, so every hint must be lowercased too; otherwise
        # those hints can never match.
        return any(h.lower() in low for hints in table.values() for h in hints)

    if mentions(DB_HINTS):
        return "数据库"
    if mentions(MIDDLEWARE_HINTS):
        return "中间件"
    if mentions(APP_HINTS):
        return "应用"
    if any(k in low for k in SECURITY_KEYWORDS):
        return "安全"
    return "系统"

def extract_ts(line: str) -> str:
    """Return the first timestamp found in ``line`` using TS_PATTERNS in order, else ""."""
    hits = (pat.search(line) for pat in TS_PATTERNS)
    first = next((m for m in hits if m), None)
    return first.group("ts") if first else ""

def normalize_ts(ts: str) -> str:
    """
    Normalize a timestamp to ``YYYY-MM-DD HH:MM:SS``.

    Recognised inputs:
      1. ISO-like: "2026-02-02 03:50:38" / "2026-02-02T03:50:38" (extra
         trailing text such as fractions is ignored)
      2. syslog: "Feb  2 12:30:10" (no year; the current year is assumed)
      3. nginx/apache: "02/Feb/2026:03:50:38 +0000" (the offset is dropped,
         not applied)

    Returns the input unchanged (stripped) when it cannot be parsed, and ""
    for empty/None input.
    """
    ts = (ts or "").strip()
    if not ts:
        return ""
    out_fmt = "%Y-%m-%d %H:%M:%S"

    # 1) already ISO
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
        try:
            return datetime.strptime(ts[:19], fmt).strftime(out_fmt)
        except ValueError:
            pass

    # 2) syslog style. Parse WITH the year: a yearless strptime defaults to
    #    1900 (not a leap year), so "Feb 29 ..." could never be parsed.
    try:
        year = datetime.now().year
        return datetime.strptime(f"{year} {ts}", "%Y %b %d %H:%M:%S").strftime(out_fmt)
    except ValueError:
        pass

    # 3) nginx/apache
    try:
        return datetime.strptime(ts.split()[0], "%d/%b/%Y:%H:%M:%S").strftime(out_fmt)
    except ValueError:
        pass

    return ts

# =============================
# IP 提取(用于日志分析 Attack IP 列)
# =============================
# Candidate matchers only; extract_ip_from_line re-validates every hit with ipaddress.
IPV4_RE = re.compile(r"\b(?:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\.){3}(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\b")
IPV6_RE = re.compile(r"\b(?:[0-9a-fA-F]{0,4}:){2,7}[0-9a-fA-F]{0,4}\b")

def extract_ip_from_line(line: str) -> str:
    """Pull one IP address out of a log line.

    IPv4 candidates are tried first (they are less ambiguous), then IPv6.
    Every candidate is validated with ``ipaddress`` so that look-alikes such
    as clock times are not reported as IPv6. Returns "" when nothing valid
    is found.
    """
    if not line:
        return ""

    def is_valid(text: str) -> bool:
        try:
            ipaddress.ip_address(text)
        except Exception:
            return False
        return True

    for hit in IPV4_RE.finditer(line):
        token = hit.group(0)
        if is_valid(token):
            return token

    for hit in IPV6_RE.finditer(line):
        token = hit.group(0)
        # skip plain HH:MM:SS shapes (e.g. 15:36:49) before validating
        if re.fullmatch(r"\d{1,2}:\d{1,2}:\d{1,2}", token):
            continue
        if is_valid(token):
            return token

    return ""

def detect_severity(line: str) -> Tuple[str, str]:
    """Classify a log line by keyword.

    Returns (severity, keyword). Severities in SEVERITY_KEYWORDS are tried in
    the dict's order; failing that, any SECURITY_KEYWORDS hit is reported as
    "WARN". Lines with no hit are ("INFO", "").
    """
    lowered = line.lower()

    for level, keywords in SEVERITY_KEYWORDS.items():
        hit = next((kw for kw in keywords if kw in lowered), None)
        if hit is not None:
            return level, hit

    # security keywords count as WARN
    hit = next((kw for kw in SECURITY_KEYWORDS if kw in lowered), None)
    if hit is not None:
        return "WARN", hit

    return "INFO", ""

def iter_log_lines(path: str, max_lines: int = 200000):
    """Yield up to ``max_lines`` lines from a log file, without trailing "\\n".

    Files whose name ends in ".gz" (case-insensitive) are read through gzip.
    Content is decoded as UTF-8 with undecodable bytes replaced.
    """
    is_gzip = path.lower().endswith(".gz")
    opener = gzip.open if is_gzip else open
    mode = "rt" if is_gzip else "r"

    with opener(path, mode, encoding="utf-8", errors="replace") as handle:
        for index, raw in enumerate(handle):
            if index >= max_lines:
                break
            yield raw.rstrip("\n")

def analyze_log_file(path: str, max_findings: int = 5000) -> Tuple[List[LogFinding], Dict[str, Any]]:
    """
    Scan one log file and collect findings plus summary statistics.

    Each line is handled as follows:
      * If it parses as an access-log line, per-IP / URI / UA / status
        counters and the per-IP risk score are updated. Then:
          1. a URI-level SQLi/XSS hit produces a "安全" WARN finding and the
             line is done;
          2. otherwise a crawler User-Agent produces a "爬虫" finding, and
             the line still continues to the generic checks below (as before).
      * Generic checks: severity keywords and attack patterns. A line is
        reported when it has a non-INFO severity OR matches an attack
        pattern. Attack matches are categorised "安全" and promoted from
        INFO to WARN.

    Scanning stops once ``max_findings`` findings have been collected.

    Returns:
        (findings, stats) where stats holds totals, per-severity and
        per-category counts, top keywords / attack types, and the access-log
        top lists.
    """
    findings: List[LogFinding] = []
    counts = Counter()
    sev_counts = Counter()
    cat_counts = Counter()
    atk_counts = Counter()

    # access-log statistics (SIEM-style view)
    access_ip_counts = Counter()
    access_uri_counts = Counter()
    access_ua_counts = Counter()
    access_status_counts = Counter()
    risk_by_ip = Counter()

    def record(finding: LogFinding) -> bool:
        """Store a finding, update all tallies, and return True once the cap is reached."""
        findings.append(finding)
        counts[finding.keyword or finding.severity] += 1
        sev_counts[finding.severity] += 1
        cat_counts[finding.category] += 1
        if finding.attack_type:
            atk_counts[finding.attack_type] += 1
        return len(findings) >= max_findings

    for line in iter_log_lines(path):
        access = parse_access_log_line(line)
        if access:
            # every access line feeds the statistics, finding or not
            ip_ = access.get('ip', '')
            uri_ = access.get('uri', '')
            ua_ = access.get('ua', '')
            st_ = access.get('status', '')
            access_ip_counts[ip_] += 1
            access_uri_counts[uri_] += 1
            if ua_:
                access_ua_counts[ua_] += 1
            if st_:
                access_status_counts[st_] += 1
            risk_by_ip[ip_] += risk_score_access(ip_, uri_, st_, ua_)

            request_summary = (
                f"{ip_} {access.get('method')} {uri_} status={st_} "
                f"referer={access.get('referer')} ua={ua_}"
            )

            # 1) parameter-level SQLi / XSS analysis: always reported
            atk, ev = analyze_uri_for_attack(uri_)
            if atk:
                full = record(LogFinding(
                    source=path,
                    category='安全',
                    severity='WARN',
                    timestamp=normalize_ts(access.get('time', '')),
                    keyword=ev or atk,
                    message=request_summary,
                    attack_ip=ip_,
                    attack_type=atk
                ))
                if full:
                    break
                continue

            # 2) crawler identification (not necessarily an attack)
            ctype, cev = detect_crawler(ua_ or '')
            if ctype:
                full = record(LogFinding(
                    source=path,
                    category='爬虫',
                    severity='INFO' if ctype == '搜索引擎爬虫' else 'WARN',
                    timestamp=normalize_ts(access.get('time', '')),
                    keyword=cev,
                    message=request_summary,
                    attack_ip=ip_,
                    attack_type='爬虫'
                ))
                if full:
                    break
                # no ``continue``: crawler lines also go through the generic checks

        sev, kw = detect_severity(line)
        # Attack detection must run BEFORE the INFO filter: a line can carry an
        # attack pattern without containing any severity keyword.
        attack_type = detect_attack_type(line)
        if attack_type:
            cat = "安全"
            if sev == "INFO":
                sev = "WARN"
        elif sev == "INFO":
            continue
        else:
            cat = guess_category(line)

        full = record(LogFinding(
            source=path,
            category=cat,
            severity=sev,
            timestamp=normalize_ts(extract_ts(line)),
            keyword=kw,
            message=line[:500],
            attack_ip=extract_ip_from_line(line),
            attack_type=attack_type
        ))
        if full:
            break

    stats = {
        "file": path,
        "total_findings": len(findings),
        "severity": dict(sev_counts),
        "category": dict(cat_counts),
        "top_keywords": counts.most_common(15),
        "top_attack_types": atk_counts.most_common(10),
        "access_top_ips": access_ip_counts.most_common(15),
        "access_top_uris": access_uri_counts.most_common(15),
        "access_top_uas": access_ua_counts.most_common(10),
        "access_status": access_status_counts.most_common(10),
        "risk_top_ips": risk_by_ip.most_common(15),
    }
    return findings, stats

def infer_service_from_banner(banner: str) -> str:
    """
    Guess a service name from banner text (case-insensitive).

    Checks run in a fixed order and the first hit wins, so e.g. an HTTP
    response mentioning "ftp" is still reported as "http". Returns "" when
    the banner is empty or nothing matches.
    """
    if not banner:
        return ""
    lower = banner.lower()

    if "ssh-" in lower:
        return "ssh"
    if "http/" in lower or "server:" in lower or "set-cookie:" in lower:
        return "http"
    if "smtp" in lower or "esmtp" in lower:
        return "smtp"
    if "imap" in lower:
        return "imap"
    if "pop3" in lower:
        return "pop3"
    if "redis" in lower or lower.startswith("+pong"):
        return "redis"
    if "memcached" in lower:
        return "memcached"
    if "mysql" in lower:
        return "mysql"
    if "postgres" in lower:
        return "postgres"
    if "mongodb" in lower:
        return "mongodb"
    if "tls ok" in lower or "issuer=" in lower:
        return "tls"
    if "ftp" in lower:
        return "ftp"
    return ""

def tcp_connect(host: str, port: int, timeout: float) -> bool:
    """Return True when a TCP connection to (host, port) succeeds within ``timeout`` seconds.

    A ":" in ``host`` selects IPv6. Any error counts as "not open".
    """
    addr_family = socket.AF_INET if ":" not in host else socket.AF_INET6
    try:
        with socket.socket(addr_family, socket.SOCK_STREAM) as conn:
            conn.settimeout(timeout)
            status = conn.connect_ex((host, port))
    except Exception:
        return False
    return status == 0

def banner_grab_tcp(host: str, port: int, timeout: float, sni_name: str = "") -> Tuple[str, str]:
    """
    Identify the service on an open TCP port and grab a banner for it.

    Stages, in order (each stage opens its own connection):
      1. TLS handshake on ports listed in TLS_PORTS.
      2. HTTP GET on ports listed in HTTP_PORTS (over TLS if stage 1 succeeded).
      3. Protocol-specific probes for redis / memcached / mysql / postgres ports.
      4. If there is still no banner: read passively, then send the generic
         TCP_PROBES payloads until something answers.

    Args:
        host: address to connect to; a ":" in it selects IPv6.
        port: TCP port.
        timeout: per-socket timeout in seconds.
        sni_name: name used for TLS SNI and the HTTP Host header ("" = none).

    Returns:
        (service, banner); either may be "".
    """
    family = socket.AF_INET6 if ":" in host else socket.AF_INET
    service = SERVICE_GUESS.get(port, "")
    banner = ""

    # 1) TLS probe
    tls_ok = False
    tls_info = ""
    if port in TLS_PORTS:
        tls_ok, tls_info = tls_handshake_and_cert(host, port, timeout, server_name=sni_name)
        if tls_ok:
            if not service:
                service = "tls"
            if port in (443, 8443, 9443):
                service = "https"
            elif port == 993:
                service = "imaps"
            elif port == 995:
                service = "pop3s"
            elif port == 465:
                service = "smtps"
            elif port == 587:
                service = "smtp-submission"
            banner = tls_info

    # 2) HTTP probe (headers / title); a successful probe replaces the TLS banner
    if port in HTTP_PORTS:
        http_banner = http_probe(host, port, timeout, use_tls=tls_ok, host_header=(sni_name or "localhost"))
        if http_banner:
            banner = http_banner
            service = "https" if (tls_ok or port in (443, 8443, 9443)) else "http"

    # 3) protocol-specific probes
    if port == 6379:
        b = redis_probe(host, port, timeout)
        if b:
            banner = b
            service = "redis"

    if port == 11211:
        b = memcached_probe(host, port, timeout)
        if b:
            banner = b
            service = "memcached"

    if port == 3306:
        b = mysql_probe(host, port, timeout)
        if b:
            banner = b
            service = "mysql"

    if port == 5432:
        b = pg_probe(host, port, timeout)
        if b:
            banner = sanitize_banner(b)
            service = "postgres"

    # 4) fallback: passive banner, then generic probes
    if not banner:
        try:
            with socket.socket(family, socket.SOCK_STREAM) as s:
                s.settimeout(timeout)
                s.connect((host, port))

                # passive read first: some services greet as soon as the connection opens
                try:
                    data = s.recv(512)
                    if data:
                        banner = safe_decode(data)
                except Exception:
                    pass

                if not banner:
                    probes = TCP_PROBES
                    # NOTE(review): this rebuilds a list equal to TCP_PROBES, so
                    # the probe order is the same for these ports — confirm intent.
                    if port in (80, 8080, 8000, 8443, 443):
                        probes = [TCP_PROBES[0], TCP_PROBES[1]] + TCP_PROBES[2:]

                    # all payloads go over the SAME connection; stop at the first reply
                    for _, payload in probes:
                        if payload:
                            try:
                                s.sendall(payload)
                            except Exception:
                                continue
                        try:
                            data2 = s.recv(512)
                            if data2:
                                banner = safe_decode(data2)
                                break
                        except Exception:
                            continue

        except Exception:
            pass

    banner = sanitize_banner(banner)

    if not service:
        service = infer_service_from_banner(banner)

    if tls_ok and not banner and tls_info:
        banner = tls_info

    return service, banner

def scan_tcp(host: str, ip: str, port: int, timeout: float, do_banner: bool) -> Optional[ScanResult]:
    """Probe one TCP port and describe it.

    Returns None when the port does not accept a connection. Otherwise
    returns a ScanResult with state "open"; when ``do_banner`` is set the
    service/banner come from banner_grab_tcp, with the port-table guess kept
    if banner grabbing yields no service name.
    """
    if not tcp_connect(ip, port, timeout):
        return None

    service = SERVICE_GUESS.get(port, "")
    banner = ""
    if do_banner:
        # only send SNI when the target was given as a name, not an IP
        sni = host if is_domain(host) else ""
        grabbed_service, banner = banner_grab_tcp(ip, port, timeout, sni_name=sni)
        service = grabbed_service or service

    return ScanResult(
        host=host,
        ip=ip,
        proto="tcp",
        port=port,
        state="open",
        service=service,
        banner=banner,
    )

# =============================
# 可达性探测
# =============================
def ping_host(host: str, timeout_ms: int = 800) -> bool:
    """
    Send one ICMP echo via the system ``ping`` command.

    Args:
        host: address or name to ping. Empty values and values starting with
            "-" are rejected so they cannot be read as ping options.
        timeout_ms: how long to wait for the reply, in milliseconds.

    Returns:
        True when ping exits with status 0; False on any failure, including
        a missing ``ping`` binary or a timeout.
    """
    if not host or host.startswith("-"):
        return False

    try:
        system = platform.system().lower()
        # whole seconds, rounded up, never below 1
        wait_s = max(1, int(-(-timeout_ms // 1000)))
        if system == "windows":
            # -w: reply timeout in milliseconds
            cmd = ["ping", "-n", "1", "-w", str(timeout_ms), host]
        elif system in ("darwin", "freebsd"):
            # BSD-derived ping: -W is in MILLISECONDS
            cmd = ["ping", "-c", "1", "-W", str(max(1, int(timeout_ms))), host]
        else:
            # iputils ping (Linux): -W is in seconds
            cmd = ["ping", "-c", "1", "-W", str(wait_s), host]
        # One extra second so ping's own timeout fires before ours does.
        res = subprocess.run(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=wait_s + 1,
        )
        return res.returncode == 0
    except Exception:
        return False

def tcp_reachable_probe(ip: str, timeout: float = 0.6, ports: Tuple[int, ...] = (443, 80, 22)) -> bool:
    """Guess whether a host is up by attempting TCP connects to a few ports.

    A host counts as reachable as soon as one port either accepts the
    connection or answers with "connection refused" -- both prove that
    something replied.

    Args:
        ip: IPv4 or IPv6 address (a ``:`` in the text selects IPv6).
        timeout: Per-port socket timeout in seconds.
        ports: Ports to try, in order.

    Returns:
        True on the first accepting/refusing port, False when none responds.
    """
    # "Connection refused" as reported on Linux (111), macOS/BSD (61), Windows (10061).
    refused_codes = (111, 61, 10061)

    for port in ports:
        try:
            address_family = socket.AF_INET6 if ":" in ip else socket.AF_INET
            with socket.socket(address_family, socket.SOCK_STREAM) as sock:
                sock.settimeout(timeout)
                code = sock.connect_ex((ip, port))
        except Exception:
            # Any socket-level problem just means "try the next port".
            continue
        if code == 0 or code in refused_codes:
            return True
    return False

# =============================
# 扫描线程
# =============================
class ScannerThread(QThread):
    """Background worker that runs a complete TCP scan off the UI thread.

    The run is split into three phases: resolve every target to an address,
    optionally drop hosts that fail a reachability probe, then scan every
    (host, port) pair on a thread pool.

    Signals:
        log(str): human-readable progress or diagnostic message.
        progress(int, int): number of finished tasks and total task count.
        result(object): a ``ScanResult`` for each port task that returned one.
        finished_scan(dict): statistics from ``build_stats``; an empty dict
            when the scan is stopped before the port tasks are built.
    """
    log = pyqtSignal(str)
    progress = pyqtSignal(int, int)  # done, total
    result = pyqtSignal(object)      # ScanResult
    finished_scan = pyqtSignal(dict) # stats

    def __init__(
        self,
        targets: List[str],
        tcp_ports: List[int],
        threads: int,
        timeout: float,
        do_banner: bool,
        skip_unreachable: bool,
        reach_method: str,  # "tcp" or "ping"
        parent=None
    ):
        """Store the scan configuration.

        Args:
            targets: Host names / addresses to scan.
            tcp_ports: Ports scanned on every live host.
            threads: Worker count for the pool (clamped to at least 1).
            timeout: Per-probe timeout in seconds (clamped to at least 0.05).
            do_banner: Whether open ports are probed for a banner.
            skip_unreachable: Whether hosts failing the reachability probe
                are left out of the scan.
            reach_method: ``"ping"`` selects ``ping_host``; anything else
                selects ``tcp_reachable_probe``.
            parent: Optional Qt parent object.
        """
        super().__init__(parent)
        self.targets = targets
        self.tcp_ports = tcp_ports
        self.threads = max(1, threads)
        self.timeout = max(0.05, timeout)
        self.do_banner = do_banner
        self.skip_unreachable = skip_unreachable
        self.reach_method = reach_method
        self._stop = threading.Event()
        self._results: List[ScanResult] = []

    def stop(self):
        """Ask the running scan to stop; the worker checks the flag cooperatively."""
        self._stop.set()

    @property
    def results(self) -> List[ScanResult]:
        """Return a copy of the results collected so far."""
        return list(self._results)

    def _abort_before_scan(self):
        """Report a stop request that arrived before any port task was started."""
        self.log.emit("已停止。")
        self.finished_scan.emit({})

    def run(self):
        """Thread entry point: resolve, filter by reachability, scan, emit stats."""
        from concurrent.futures import ThreadPoolExecutor, as_completed

        # Phase 1: resolve each target to a (host, ip) pair.
        resolved = []
        for host in self.targets:
            if self._stop.is_set():
                self._abort_before_scan()
                return

            h, ip = resolve_host_to_ip(host)
            if not ip:
                self.log.emit(f"[解析失败] {host}")
                continue
            resolved.append((h, ip))

        # Phase 2: optional reachability filter.
        live = []
        for h, ip in resolved:
            if self._stop.is_set():
                self._abort_before_scan()
                return

            if not self.skip_unreachable:
                live.append((h, ip))
                continue

            if self.reach_method == "ping":
                ok = ping_host(ip)
            else:
                ok = tcp_reachable_probe(ip, timeout=min(1.0, self.timeout))

            if ok:
                live.append((h, ip))
                self.log.emit(f"[主机在线] {h} ({ip})")
            else:
                self.log.emit(f"[跳过] {h} ({ip}) 不可达")

        # Phase 3: one task per (live host, port).
        tasks = [("tcp", h, ip, p) for h, ip in live for p in self.tcp_ports]

        total = len(tasks)
        done = 0
        self.progress.emit(done, total)
        self.log.emit(f"开始扫描:目标 {len(live)} 台,任务 {total} 个")

        def do_task(proto: str, h: str, ip: str, port: int) -> Optional[ScanResult]:
            # Tasks that start after a stop request return immediately.
            if self._stop.is_set():
                return None
            if proto == "tcp":
                return scan_tcp(h, ip, port, self.timeout, self.do_banner)
            return None

        try:
            with ThreadPoolExecutor(max_workers=self.threads) as ex:
                futures = [ex.submit(do_task, *task) for task in tasks]
                for fut in as_completed(futures):
                    if self._stop.is_set():
                        self.log.emit("用户请求停止。")
                        # Cancel everything still queued so that leaving the
                        # ``with`` block only waits for probes already running.
                        for pending in futures:
                            pending.cancel()
                        break

                    try:
                        res = fut.result()
                    except Exception:
                        # A probe that raised is treated as "nothing found".
                        res = None

                    done += 1
                    self.progress.emit(done, total)

                    if res:
                        self._results.append(res)
                        self.result.emit(res)

        except Exception as e:
            self.log.emit(f"[异常] {e}")

        stats = self.build_stats(self._results)
        self.finished_scan.emit(stats)

    def build_stats(self, results: List[ScanResult]) -> dict:
        """Summarise scan results.

        Only results whose state starts with ``"open"`` are counted.

        Returns:
            A dict with ``generated_at`` (ISO timestamp), ``total_results``,
            ``per_host_open_ports`` (ip -> count), ``top_services`` (up to
            ten ``(service, count)`` pairs) and ``proto_count``.
        """
        per_host_open = defaultdict(int)
        svc_counter = Counter()
        proto_counter = Counter()

        for r in results:
            if not r.state.startswith("open"):
                continue
            per_host_open[r.ip] += 1
            if r.service:
                svc_counter[r.service] += 1
            proto_counter[r.proto] += 1

        return {
            "generated_at": datetime.now().isoformat(),
            "total_results": len(results),
            "per_host_open_ports": dict(per_host_open),
            "top_services": svc_counter.most_common(10),
            "proto_count": dict(proto_counter),
        }

# =============================
# UI
# =============================
class MainWindow(QMainWindow):
    def __init__(self):
        """Build the main window: a port-scan tab and a log-analysis tab.

        All widgets are created here, added to their layouts in display
        order, and finally wired to their slot methods.
        """
        super().__init__()
        self.setWindowTitle("端口扫描器(PyQt6)- TCP + CIDR + Banner + 双栈 + 报告导出")
        self.resize(1280, 760)

        # Scan state shared between the slots below.
        # NOTE: the attribute name ``thread`` shadows QObject.thread() on this instance.
        self.thread: Optional[ScannerThread] = None
        self.all_results: List[ScanResult] = []
        self.last_stats: Dict[str, Any] = {}

        root = QWidget()
        self.setCentralWidget(root)
        layout = QVBoxLayout(root)

        self.tabs = QTabWidget()
        layout.addWidget(self.tabs)

        scan_tab = QWidget()
        scan_layout = QVBoxLayout(scan_tab)
        self.tabs.addTab(scan_tab, "端口扫描")

        log_tab = QWidget()
        log_layout = QVBoxLayout(log_tab)
        self.tabs.addTab(log_tab, "日志分析")

        # Targets input
        g_target = QGroupBox("目标(IP/域名/CIDR,支持多行或逗号分隔,支持IPv6)")
        gl = QVBoxLayout(g_target)
        self.targets_edit = QTextEdit()
        self.targets_edit.setPlaceholderText("例如:\n192.168.1.10\n192.168.1.0/24\n2001:db8::1\n2001:db8::/120\nexample.com")
        self.targets_edit.setFixedHeight(120)
        gl.addWidget(self.targets_edit)
        scan_layout.addWidget(g_target)

        # Scan options
        g_opt = QGroupBox("扫描参数")
        opt = QHBoxLayout(g_opt)

        opt.addWidget(QLabel("TCP端口:"))
        self.tcp_ports_edit = QLineEdit("1-65535")  # all ports by default
        self.tcp_ports_edit.setToolTip("默认全端口:1-65535;也支持 22,80,443 或 1-1024")
        opt.addWidget(self.tcp_ports_edit, 2)

        opt.addWidget(QLabel("线程数:"))
        self.threads_spin = QSpinBox()
        self.threads_spin.setRange(1, 3000)
        self.threads_spin.setValue(600)
        opt.addWidget(self.threads_spin)

        opt.addWidget(QLabel("超时(s):"))
        self.timeout_spin = QDoubleSpinBox()
        self.timeout_spin.setRange(0.05, 10.0)
        self.timeout_spin.setSingleStep(0.1)
        self.timeout_spin.setValue(0.6)
        opt.addWidget(self.timeout_spin)

        self.banner_check = QCheckBox("服务识别(Banner/Probe)")
        self.banner_check.setChecked(True)
        opt.addWidget(self.banner_check)

        self.skip_check = QCheckBox("跳过不可达主机")
        self.skip_check.setChecked(True)
        opt.addWidget(self.skip_check)

        # Index 0 = TCP probe, index 1 = ICMP ping (on_start maps the index to "tcp"/"ping").
        opt.addWidget(QLabel("可达性探测:"))
        self.reach_combo = QComboBox()
        self.reach_combo.addItems(["TCP探测(推荐)", "Ping(ICMP)"])
        opt.addWidget(self.reach_combo)

        scan_layout.addWidget(g_opt)

        # Buttons
        btn_row = QHBoxLayout()
        self.start_btn = QPushButton("开始扫描")
        self.stop_btn = QPushButton("停止")
        self.stop_btn.setEnabled(False)

        # Export buttons stay disabled until a scan has produced results.
        self.export_json_btn = QPushButton("导出JSON(含统计)")
        self.export_csv_btn = QPushButton("导出CSV(含统计)")
        self.export_json_btn.setEnabled(False)
        self.export_csv_btn.setEnabled(False)

        btn_row.addWidget(self.start_btn)
        btn_row.addWidget(self.stop_btn)
        btn_row.addStretch(1)
        btn_row.addWidget(self.export_json_btn)
        btn_row.addWidget(self.export_csv_btn)
        scan_layout.addLayout(btn_row)

        # Progress row: status text, percentage bar, "done/total" label
        prog_row = QHBoxLayout()
        self.status_label = QLabel("状态:空闲")
        self.progress_label = QLabel("0/0")
        self.progress_bar = QProgressBar()
        self.progress_bar.setMinimum(0)
        self.progress_bar.setMaximum(100)
        self.progress_bar.setValue(0)
        prog_row.addWidget(self.status_label, 2)
        prog_row.addWidget(self.progress_bar, 4)
        prog_row.addWidget(self.progress_label, 1)
        scan_layout.addLayout(prog_row)

        # Results table (one row per ScanResult, filled by on_result)
        self.table = QTableWidget(0, 7)
        self.table.setHorizontalHeaderLabels(["目标", "IP", "协议", "端口", "状态", "服务", "Banner/响应"])
        self.table.horizontalHeader().setStretchLastSection(True)
        # Sorting is currently disabled -- presumably because on_result fills
        # rows cell by cell; confirm before re-enabling:
        #self.table.setSortingEnabled(True)
        scan_layout.addWidget(self.table, 5)

        # Scan log
        self.log_box = QTextEdit()
        self.log_box.setReadOnly(True)
        self.log_box.setFixedHeight(160)
        scan_layout.addWidget(self.log_box)

        # =============================
        # Log-analysis tab
        # =============================
        g_log = QGroupBox("导入日志文件/目录(支持 .log/.txt/.out/.err/.gz 等)")
        lg = QVBoxLayout(g_log)

        # Row 1: path field plus file / directory pickers
        row1 = QHBoxLayout()
        self.log_path_edit = QLineEdit()
        self.log_path_edit.setPlaceholderText("选择日志文件或目录...")
        self.btn_pick_file = QPushButton("选择文件")
        self.btn_pick_dir = QPushButton("选择目录")
        row1.addWidget(self.log_path_edit, 4)
        row1.addWidget(self.btn_pick_file, 1)
        row1.addWidget(self.btn_pick_dir, 1)
        lg.addLayout(row1)

        # Row 2: per-file limits and the "analyze" button
        row2 = QHBoxLayout()
        row2.addWidget(QLabel("最大读取行数/文件:"))
        self.max_lines_spin = QSpinBox()
        self.max_lines_spin.setRange(1000, 2000000)
        self.max_lines_spin.setValue(200000)
        row2.addWidget(self.max_lines_spin)

        row2.addWidget(QLabel("最大告警条数/文件:"))
        self.max_findings_spin = QSpinBox()
        self.max_findings_spin.setRange(100, 20000)
        self.max_findings_spin.setValue(5000)
        row2.addWidget(self.max_findings_spin)

        self.btn_analyze_logs = QPushButton("开始分析")
        row2.addWidget(self.btn_analyze_logs)
        row2.addStretch(1)
        lg.addLayout(row2)

        log_layout.addWidget(g_log)

        # Statistics output
        self.log_stats_box = QTextEdit()
        self.log_stats_box.setReadOnly(True)
        self.log_stats_box.setFixedHeight(160)
        log_layout.addWidget(self.log_stats_box)

        # Findings filter; the first entry of every combo box means "no filter"
        g_filter = QGroupBox("日志过滤")
        fl = QHBoxLayout(g_filter)

        fl.addWidget(QLabel("严重性:"))
        self.filter_sev = QComboBox()
        self.filter_sev.addItems(["全部", "INFO", "WARN", "ERROR", "CRITICAL"])
        fl.addWidget(self.filter_sev)

        fl.addWidget(QLabel("类别:"))
        self.filter_cat = QComboBox()
        self.filter_cat.addItems(["全部", "系统", "中间件", "数据库", "应用", "安全", "爬虫"])
        fl.addWidget(self.filter_cat)

        fl.addWidget(QLabel("攻击类型:"))
        self.filter_atk = QComboBox()
        self.filter_atk.addItems(["全部", "SQL注入", "XSS", "路径穿越", "命令注入", "SSRF", "扫描/探测", "暴力破解", "WebShell/恶意文件", "爬虫"])
        fl.addWidget(self.filter_atk)

        fl.addWidget(QLabel("关键字:"))
        self.filter_kw = QLineEdit()
        self.filter_kw.setPlaceholderText("包含关键字(支持 IP/URI/UA/Referer)...")
        fl.addWidget(self.filter_kw, 3)

        self.btn_apply_filter = QPushButton("应用过滤")
        self.btn_reset_filter = QPushButton("重置")
        fl.addWidget(self.btn_apply_filter)
        fl.addWidget(self.btn_reset_filter)

        log_layout.addWidget(g_filter)

        # Findings table (column order must match refresh_findings_table)
        self.findings_table = QTableWidget(0, 8)
        self.findings_table.setHorizontalHeaderLabels(["严重性", "类别", "攻击类型", "攻击IP", "时间", "关键字", "来源", "内容"])
        self.findings_table.horizontalHeader().setStretchLastSection(True)
        log_layout.addWidget(self.findings_table, 5)

        # Export buttons for findings; enabled once an analysis yields findings
        row3 = QHBoxLayout()
        self.btn_export_findings_json = QPushButton("导出发现(JSON)")
        self.btn_export_findings_csv = QPushButton("导出发现(CSV)")
        self.btn_export_findings_json.setEnabled(False)
        self.btn_export_findings_csv.setEnabled(False)
        row3.addStretch(1)
        row3.addWidget(self.btn_export_findings_json)
        row3.addWidget(self.btn_export_findings_csv)
        log_layout.addLayout(row3)

        # Log-analysis state: all findings and one stats dict per analyzed file.
        self.log_findings: List[LogFinding] = []
        self.log_stats: List[dict] = []

        # Scan-tab signal wiring
        self.start_btn.clicked.connect(self.on_start)
        self.stop_btn.clicked.connect(self.on_stop)
        self.export_json_btn.clicked.connect(self.on_export_json)
        self.export_csv_btn.clicked.connect(self.on_export_csv)

        # Log-analysis signal wiring
        self.btn_pick_file.clicked.connect(self.pick_log_file)
        self.btn_pick_dir.clicked.connect(self.pick_log_dir)
        self.btn_analyze_logs.clicked.connect(self.analyze_logs)
        self.btn_export_findings_json.clicked.connect(self.export_findings_json)
        self.btn_export_findings_csv.clicked.connect(self.export_findings_csv)
        self.btn_apply_filter.clicked.connect(self.apply_log_filter)
        self.btn_reset_filter.clicked.connect(self.reset_log_filter)

    def append_log(self, msg: str):
        """Append ``msg`` to the scan log box, prefixed with the current HH:MM:SS time."""
        stamp = datetime.now().strftime("%H:%M:%S")
        line = f"[{stamp}] {msg}"
        self.log_box.append(line)

    def on_start(self):
        """Validate the form, reset the previous run's state and launch a scan thread.

        Shows a message box and returns without starting anything when a scan
        is already running, no target is entered, or the targets / ports
        cannot be parsed.
        """
        if self.thread and self.thread.isRunning():
            QMessageBox.warning(self, "提示", "扫描正在进行中。")
            return

        raw_targets = self.targets_edit.toPlainText().strip()
        if not raw_targets:
            QMessageBox.warning(self, "提示", "请输入至少一个目标。")
            return

        try:
            targets = expand_targets(raw_targets)
        except Exception as e:
            QMessageBox.critical(self, "错误", f"目标解析失败:{e}")
            return

        try:
            tcp_ports = parse_ports(self.tcp_ports_edit.text())
        except Exception as e:
            QMessageBox.critical(self, "错误", f"端口格式错误:{e}")
            return

        if not tcp_ports:
            QMessageBox.warning(self, "提示", "请填写 TCP")
            return

        # Clear everything left over from the previous scan.
        self.table.setRowCount(0)
        self.all_results = []
        self.last_stats = {}
        for export_btn in (self.export_json_btn, self.export_csv_btn):
            export_btn.setEnabled(False)

        # Switch the UI into "scanning" mode.
        self.status_label.setText("状态:扫描中...")
        self.progress_label.setText("0/0")
        self.progress_bar.setValue(0)
        self.start_btn.setEnabled(False)
        self.stop_btn.setEnabled(True)

        self.append_log(f"开始:目标={len(targets)} TCP端口={len(tcp_ports)}")

        # Combo index 0 is the TCP probe; any other entry selects ICMP ping.
        worker = ScannerThread(
            targets=targets,
            tcp_ports=tcp_ports,
            threads=int(self.threads_spin.value()),
            timeout=float(self.timeout_spin.value()),
            do_banner=bool(self.banner_check.isChecked()),
            skip_unreachable=bool(self.skip_check.isChecked()),
            reach_method="tcp" if self.reach_combo.currentIndex() == 0 else "ping",
        )
        self.thread = worker

        worker.log.connect(self.append_log)
        worker.progress.connect(self.on_progress)
        worker.result.connect(self.on_result)
        worker.finished_scan.connect(self.on_finished)
        worker.start()

    def on_stop(self):
        """Forward a stop request to the scan thread if one is currently running."""
        worker = self.thread
        if not worker or not worker.isRunning():
            return
        worker.stop()
        self.append_log("用户请求停止...")

    def on_progress(self, done: int, total: int):
        """Show ``done/total`` in the label and the matching percentage on the bar."""
        self.progress_label.setText(f"{done}/{total}")
        # With no tasks there is nothing to divide by; keep the bar at zero.
        percent = int(done * 100 / total) if total > 0 else 0
        self.progress_bar.setValue(percent)

    def on_result(self, res: ScanResult):
        """Remember one scan result and append it as a new row of the results table."""
        self.all_results.append(res)

        new_row = self.table.rowCount()
        self.table.insertRow(new_row)

        # Cell order matches the table's header labels.
        cells = (res.host, res.ip, res.proto, str(res.port), res.state, res.service, res.banner)
        for column, text in enumerate(cells):
            self.table.setItem(new_row, column, QTableWidgetItem(text))

    def on_finished(self, stats: dict):
        """Store the final statistics and put the scan controls back into idle state."""
        self.last_stats = stats or {}
        self.status_label.setText("状态:完成")
        self.start_btn.setEnabled(True)
        self.stop_btn.setEnabled(False)

        if not self.all_results:
            self.append_log("完成:未发现开放端口(或全部被过滤)")
        else:
            # Exporting only makes sense once there is something to export.
            for export_btn in (self.export_json_btn, self.export_csv_btn):
                export_btn.setEnabled(True)
            self.append_log(f"完成:发现结果 {len(self.all_results)} 条")

        # if self.last_stats:
        #     self.append_log("=== 统计 ===")
        #     self.append_log(f"总结果数:{self.last_stats.get('total_results', 0)}")
        #     self.append_log(f"协议统计:{self.last_stats.get('proto_count', {})}")
        #     self.append_log(f"Top服务:{self.last_stats.get('top_services', [])}")

    def on_export_json(self):
        """Ask for a file name and write the scan results plus statistics as JSON."""
        if not self.all_results:
            QMessageBox.information(self, "提示", "没有可导出的结果。")
            return

        path, _selected_filter = QFileDialog.getSaveFileName(
            self, "保存JSON", "scan_results.json", "JSON 文件 (*.json)"
        )
        if not path:
            # Dialog was cancelled.
            return

        document = {
            "generated_at": datetime.now().isoformat(),
            "stats": self.last_stats,
            "results": list(map(asdict, self.all_results)),
        }
        try:
            with open(path, "w", encoding="utf-8") as out:
                json.dump(document, out, ensure_ascii=False, indent=2)
            self.append_log(f"已导出JSON:{path}")
        except Exception as e:
            QMessageBox.critical(self, "错误", f"导出失败:{e}")

    def on_export_csv(self):
        """Ask for a file name and write the scan results as CSV.

        Only the header row and one detail row per result are written; the
        statistics are not included in the CSV output.
        """
        if not self.all_results:
            QMessageBox.information(self, "提示", "没有可导出的结果。")
            return

        path, _selected_filter = QFileDialog.getSaveFileName(
            self, "保存CSV", "scan_results.csv", "CSV 文件 (*.csv)"
        )
        if not path:
            # Dialog was cancelled.
            return

        header = ["host", "ip", "proto", "port", "state", "service", "banner"]
        try:
            with open(path, "w", newline="", encoding="utf-8") as out:
                writer = csv.writer(out)
                writer.writerow(header)
                writer.writerows(
                    [r.host, r.ip, r.proto, r.port, r.state, r.service, r.banner]
                    for r in self.all_results
                )

            self.append_log(f"已导出CSV:{path}")
        except Exception as e:
            QMessageBox.critical(self, "错误", f"导出失败:{e}")

    # =============================
    # 日志分析功能
    # =============================
    def pick_log_file(self):
        """Let the user choose a single log file and put its path into the path field."""
        name_filter = "日志文件 (*.log *.txt *.out *.err *.gz);;所有文件 (*.*)"
        chosen, _selected_filter = QFileDialog.getOpenFileName(self, "选择日志文件", "", name_filter)
        if not chosen:
            return
        self.log_path_edit.setText(chosen)

    def pick_log_dir(self):
        """Let the user choose a log directory and put its path into the path field."""
        chosen = QFileDialog.getExistingDirectory(self, "选择日志目录")
        if not chosen:
            return
        self.log_path_edit.setText(chosen)

    def iter_log_paths(self, root_path: str, max_files: int = 500) -> List[str]:
        """Collect the log files to analyze for a user-selected path.

        Args:
            root_path: A file or directory path.
            max_files: Upper bound on files returned for a directory, so that
                a huge tree cannot stall the UI.

        Returns:
            ``[root_path]`` when it is a file (whatever its extension); for a
            directory, up to ``max_files`` files found recursively whose
            suffix is a known log extension; an empty list otherwise.
        """
        root = pathlib.Path(root_path)
        if root.is_file():
            return [str(root)]
        if not root.is_dir():
            return []

        # Names such as "x.log.gz" are already matched through their ".gz" suffix.
        log_suffixes = {".log", ".txt", ".out", ".err", ".gz"}
        found: List[str] = []
        if max_files <= 0:
            return found

        for entry in root.rglob("*"):
            # Cheap suffix test first; is_file() has to touch the filesystem.
            if entry.suffix.lower() in log_suffixes and entry.is_file():
                found.append(str(entry))
                if len(found) >= max_files:
                    # Stop walking as soon as the cap is reached.
                    break
        return found

    def analyze_logs(self):
        """Analyze the selected log file or directory and display the findings.

        Resets the previous analysis, runs ``analyze_log_file`` on every path
        returned by ``iter_log_paths``, prints a summary into the statistics
        box, fills the findings table and enables the export buttons when
        findings exist. A file that fails to analyze is reported in the
        statistics box and skipped.
        """
        # Declared global so the module-level line reader can be wrapped below.
        global iter_log_lines

        root_path = self.log_path_edit.text().strip()
        if not root_path:
            QMessageBox.warning(self, "提示", "请先选择日志文件或目录。")
            return

        paths = self.iter_log_paths(root_path)
        if not paths:
            QMessageBox.warning(self, "提示", "未找到可分析的日志文件。")
            return

        self.log_findings = []
        self.log_stats = []
        self.findings_table.setRowCount(0)
        self.log_stats_box.clear()

        max_lines = int(self.max_lines_spin.value())
        max_findings = int(self.max_findings_spin.value())

        self.log_stats_box.append(f"开始分析:文件数={len(paths)}")
        QApplication.processEvents()

        # Apply the "max lines per file" setting by temporarily wrapping the
        # module-level line reader.
        # NOTE(review): this assumes analyze_log_file reads its input through
        # the global iter_log_lines -- confirm; if it does not, the wrapper is
        # a harmless no-op and the limit still has to be plumbed through.
        original_iter = globals().get("iter_log_lines")

        def _limited_iter(path_in, *args, **kwargs):
            # Only force the limit when it cannot clash with a positional argument.
            if not args:
                kwargs["max_lines"] = max_lines
            return original_iter(path_in, *args, **kwargs)

        if original_iter is not None:
            iter_log_lines = _limited_iter
        try:
            for path in paths:
                try:
                    findings, stats = analyze_log_file(path, max_findings=max_findings)
                    self.log_stats.append(stats)
                    self.log_findings.extend(findings)
                except Exception as e:
                    self.log_stats_box.append(f"[失败] {path}: {e}")
        finally:
            # Always restore the real reader, even if something unexpected escapes.
            if original_iter is not None:
                iter_log_lines = original_iter

        # Aggregate the per-file counters into one summary.
        sev_sum = Counter()
        cat_sum = Counter()
        for st in self.log_stats:
            for k, v in st.get("severity", {}).items():
                sev_sum[k] += v
            for k, v in st.get("category", {}).items():
                cat_sum[k] += v

        self.log_stats_box.append("")
        self.log_stats_box.append("=== 汇总统计 ===")
        self.log_stats_box.append(f"文件数:{len(self.log_stats)}")
        self.log_stats_box.append(f"告警条数:{len(self.log_findings)}")
        self.log_stats_box.append(f"严重性统计:{dict(sev_sum)}")
        self.log_stats_box.append(f"类别统计:{dict(cat_sum)}")

        # Fill the table and enable exports only when there is something to export.
        self.refresh_findings_table(self.log_findings)
        has_findings = bool(self.log_findings)
        self.btn_export_findings_json.setEnabled(has_findings)
        self.btn_export_findings_csv.setEnabled(has_findings)

    def export_findings_json(self):
        """Ask for a file name and write the log findings plus per-file stats as JSON.

        Does nothing when there are no findings or the dialog is cancelled.
        A failed write is reported in an error dialog instead of escaping
        from the slot.
        """
        if not self.log_findings:
            return
        path, _ = QFileDialog.getSaveFileName(self, "保存JSON", "log_findings.json", "JSON 文件 (*.json)")
        if not path:
            return
        payload = {
            "generated_at": datetime.now().isoformat(),
            "summary": self.log_stats,
            "findings": [asdict(x) for x in self.log_findings],
        }
        try:
            with open(path, "w", encoding="utf-8") as f:
                json.dump(payload, f, ensure_ascii=False, indent=2)
        except Exception as e:
            # Same reporting style as on_export_json: show the error, keep the app alive.
            QMessageBox.critical(self, "错误", f"导出失败:{e}")
            return
        QMessageBox.information(self, "完成", "已导出 JSON。")

    def export_findings_csv(self):
        """Ask for a file name and write the log findings as CSV.

        Does nothing when there are no findings or the dialog is cancelled.
        A failed write is reported in an error dialog instead of escaping
        from the slot.
        """
        if not self.log_findings:
            return
        path, _ = QFileDialog.getSaveFileName(self, "保存CSV", "log_findings.csv", "CSV 文件 (*.csv)")
        if not path:
            return
        header = ["severity", "category", "attack_type", "attack_ip", "timestamp", "keyword", "source", "message"]
        try:
            with open(path, "w", newline="", encoding="utf-8") as f:
                w = csv.writer(f)
                w.writerow(header)
                for x in self.log_findings:
                    w.writerow([
                        x.severity, x.category, x.attack_type, getattr(x, "attack_ip", ""),
                        x.timestamp, x.keyword, x.source, x.message,
                    ])
        except Exception as e:
            # Same reporting style as on_export_csv: show the error, keep the app alive.
            QMessageBox.critical(self, "错误", f"导出失败:{e}")
            return
        QMessageBox.information(self, "完成", "已导出 CSV。")

    def refresh_findings_table(self, findings: List[LogFinding]):
        """Replace the findings table content with ``findings`` (first 20000 at most).

        Column order: severity, category, attack type, attack IP, time,
        keyword, source, message.
        """
        table = self.findings_table
        table.setRowCount(0)

        for row_index, item in enumerate(findings[:20000]):
            table.insertRow(row_index)
            cells = (
                item.severity,
                item.category,
                item.attack_type,
                getattr(item, "attack_ip", "") or "",
                item.timestamp,
                item.keyword,
                item.source,
                item.message,
            )
            for column, text in enumerate(cells):
                table.setItem(row_index, column, QTableWidgetItem(text))

    def apply_log_filter(self):
        """Show only the findings matching the severity/category/attack/keyword filters.

        A combo box left on its first entry does not restrict anything. The
        keyword is matched case-insensitively against keyword, message,
        source, timestamp and attack IP. The hit count is appended to the
        statistics box.
        """
        if not self.log_findings:
            return

        no_filter = "全部"
        wanted_sev = self.filter_sev.currentText()
        wanted_cat = self.filter_cat.currentText()
        wanted_atk = self.filter_atk.currentText()
        needle = self.filter_kw.text().strip().lower()

        def matches(item) -> bool:
            if wanted_sev != no_filter and item.severity != wanted_sev:
                return False
            if wanted_cat != no_filter and item.category != wanted_cat:
                return False
            if wanted_atk != no_filter and (item.attack_type or "") != wanted_atk:
                return False
            if not needle:
                return True
            haystack = " ".join([
                item.keyword or "",
                item.message or "",
                item.source or "",
                item.timestamp or "",
                getattr(item, "attack_ip", "") or "",
            ]).lower()
            return needle in haystack

        matched = [item for item in self.log_findings if matches(item)]

        self.refresh_findings_table(matched)
        self.log_stats_box.append(f"[过滤] 命中 {len(matched)}/{len(self.log_findings)} 条")

    def reset_log_filter(self):
        """Put every filter control back to its default and show all findings again."""
        for combo in (self.filter_sev, self.filter_cat, self.filter_atk):
            combo.setCurrentIndex(0)
        self.filter_kw.clear()
        self.refresh_findings_table(self.log_findings)

def main():
    """Create the Qt application, show the main window and run the event loop."""
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    # Hand the event loop's return code back to the shell.
    exit_code = app.exec()
    sys.exit(exit_code)

if __name__ == "__main__":
    main()

Python