<?xml version="1.0" encoding="utf-8"?>
<rss version="2.0"
xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:atom="http://www.w3.org/2005/Atom"
>
<channel>
<title><![CDATA[刘小猪--博客]]></title> 
<atom:link href="https://blog.liuxiaozhu.cn/rss.php" rel="self" type="application/rss+xml" />
<description><![CDATA[刘小猪--博客]]></description>
<link>https://blog.liuxiaozhu.cn/</link>
<language>zh-cn</language>
<generator>emlog</generator>

<item>
    <title>别只拿 AI 聊天了：普通人把 AI 变成“干活工具”的 3 个步骤</title>
    <link>https://blog.liuxiaozhu.cn/?post=14</link>
    <description><![CDATA[<h1>别只拿 AI 聊天了：普通人把 AI 变成“干活工具”的 3 个步骤</h1>
<p>很多人对 AI 的使用，还停留在一个阶段：</p>
<ul>
<li>问它一个问题</li>
<li>看它回一段话</li>
<li>觉得“还挺聪明”</li>
<li>然后就没有然后了</li>
</ul>
<p>于是 AI 变成了一个“高级聊天搭子”。</p>
<p>这并不是 AI 不够强，而是大多数人还没有把它从<strong>聊天工具</strong>，升级成<strong>干活工具</strong>。</p>
<p>这两者的差别非常大。</p>
<hr />
<h2>一、为什么很多人用了很久 AI，还是觉得“不太有用”？</h2>
<p>因为他用 AI 的方式，本质上还是：</p>
<blockquote>
<p>我问一句，它答一句。</p>
</blockquote>
<p>这种用法当然没问题，但它只能解决非常轻量的问题，比如：</p>
<ul>
<li>帮你润色一句话</li>
<li>解释一个概念</li>
<li>给一个简单建议</li>
<li>生成一段文案</li>
</ul>
<p>问题在于，真实生活和工作中的麻烦，往往不是“问一句就结束”的。</p>
<p>比如你真正想要的是：</p>
<ul>
<li>把一堆零散资料整理成一份文档</li>
<li>从多篇文章里提炼关键信息</li>
<li>根据会议记录输出待办事项</li>
<li>把复杂任务拆成步骤，并持续执行</li>
<li>批量处理重复工作</li>
</ul>
<p>这时候，如果你还只是把 AI 当聊天机器人用，就会很容易产生一种感觉：</p>
<p><strong>“它挺聪明，但好像也没帮我省多少事。”</strong></p>
<p>说白了，不是 AI 不行，而是你还没把它放到正确的位置上。</p>
<hr />
<h2>二、聊天工具和干活工具，区别到底在哪？</h2>
<p>一句话概括：</p>
<ul>
<li><strong>聊天工具型 AI</strong>：负责回答</li>
<li><strong>干活工具型 AI</strong>：负责推进任务</li>
</ul>
<p>前者解决的是“信息问答”；<br />
后者解决的是“任务完成”。</p>
<p>举个最直观的例子。</p>
<h3>场景 1：聊天工具用法</h3>
<p>你对 AI 说：</p>
<blockquote>
<p>帮我写一篇关于时间管理的文章</p>
</blockquote>
<p>它给你一篇。<br />
你看完觉得还行，但不够像你的风格。<br />
于是你继续改、继续问、继续补。</p>
<p>这还是“对话式辅助”。</p>
<h3>场景 2：干活工具用法</h3>
<p>你对 AI 说：</p>
<blockquote>
<p>我准备写一篇关于时间管理的公众号文章。目标读者是上班族，风格要通俗、有案例、有行动建议。请先帮我做 3 件事：<br />
1）给出 5 个选题角度<br />
2）从中选一个最适合公众号传播的<br />
3）输出标题、结构和正文初稿</p>
</blockquote>
<p>这时，AI 不再只是回答你一个问题，<br />
而是在帮你<strong>推进完整任务</strong>。</p>
<p>这就是区别。</p>
<hr />
<h2>三、普通人把 AI 变成“干活工具”，只要 3 个步骤</h2>
<p>很多人以为要学工作流、学自动化、学编程，才能真正用好 AI。</p>
<p>其实没那么复杂。</p>
<p>对于绝大多数普通人来说，只要先完成下面 3 步，就已经能把 AI 的使用效率拉开一大截。</p>
<hr />
<h1>第 1 步：别只提问题，要先说清楚“任务”</h1>
<p>这是最关键的一步。</p>
<p>很多人跟 AI 说话，像在搜索引擎里输关键词：</p>
<ul>
<li>“帮我总结一下”</li>
<li>“写个标题”</li>
<li>“这个怎么做”</li>
<li>“给我一篇文章”</li>
</ul>
<p>问题不是不能这么问，<br />
而是这种输入方式，天然会让 AI 输出非常泛。</p>
<p>因为它不知道你真正要完成什么。</p>
<p>所以更有效的方式不是“提问题”，而是<strong>下任务</strong>。</p>
<h2>错误示范</h2>
<blockquote>
<p>帮我写一篇文章</p>
</blockquote>
<h2>更好的说法</h2>
<blockquote>
<p>我要写一篇公众号文章，读者是刚开始接触 AI 的普通上班族，目标是让他们理解“AI 不只是聊天工具”，文章风格要通俗、少术语、有案例。请先给我 3 个选题方向，并推荐一个最适合传播的。</p>
</blockquote>
<p>你会发现，一旦任务更明确，AI 的输出质量会立刻提升。</p>
<h3>一个简单公式</h3>
<p>你可以记住这个万能模板：</p>
<blockquote>
<p><strong>我要做什么 + 给谁看 + 想达到什么效果 + 输出成什么形式</strong></p>
</blockquote>
<p>比如：</p>
<ul>
<li>我要做一个汇报 PPT，给老板看，希望突出重点，输出成 5 页提纲</li>
<li>我要写公众号文章，给新手读者看，希望通俗易懂，输出成标题+大纲+初稿</li>
<li>我要整理会议记录，给团队同步，希望清晰可执行，输出成待办清单</li>
</ul>
<p>当你开始这样用 AI，它就不再只是“陪你聊天”，而是开始理解你的工作目标。</p>
<hr />
<h1>第 2 步：把一个大问题，拆成 3 到 5 个小步骤</h1>
<p>AI 很强，但它也有一个很现实的问题：</p>
<p><strong>一次性让它处理太大的任务，往往容易跑偏。</strong></p>
<p>比如你直接说：</p>
<blockquote>
<p>帮我做一份完整的行业分析报告</p>
</blockquote>
<p>它大概率会给你一份看上去完整、实际上比较空泛的内容。</p>
<p>为什么？</p>
<p>因为任务太大、太抽象。</p>
<p>更有效的方法，是把大任务拆成小任务，让 AI 一步一步干。</p>
<h2>举个例子：做一篇公众号文章</h2>
<p>不要一上来就说：</p>
<blockquote>
<p>帮我写一篇爆款文章</p>
</blockquote>
<p>而是拆成：</p>
<h3>第一步：定选题</h3>
<blockquote>
<p>结合最近热榜，给我 5 个适合“AI科普及教程”账号的选题</p>
</blockquote>
<h3>第二步：定角度</h3>
<blockquote>
<p>从这 5 个里选一个最容易传播的，并说明理由</p>
</blockquote>
<h3>第三步：定结构</h3>
<blockquote>
<p>按“问题提出—原因解释—方法步骤—结尾总结”的结构列大纲</p>
</blockquote>
<h3>第四步：写初稿</h3>
<blockquote>
<p>按上面的结构写一篇 1500 字初稿，语言通俗一点</p>
</blockquote>
<h3>第五步：再优化</h3>
<blockquote>
<p>把开头改得更抓人，结尾加一个适合引导转发收藏的收束</p>
</blockquote>
<p>这样一来，AI 的表现通常会稳定很多。</p>
<p>因为它最擅长的，不一定是“一口气完成所有事情”，<br />
而是<strong>在清晰步骤下持续配合你完成事情</strong>。</p>
<p>这就是很多人忽略的一点：</p>
<blockquote>
<p>AI 的价值，不是替你一键完成全部工作，<br />
而是帮你把复杂工作拆解并加速推进。</p>
</blockquote>
<hr />
<h1>第 3 步：固定 3 个高频场景，让 AI 真正进入你的日常</h1>
<p>很多人用 AI 效果不好，还有一个原因：</p>
<p><strong>没有固定使用场景。</strong></p>
<p>今天拿它写文案，明天拿它翻译，后天又闲着不用。<br />
这种“偶尔想起来才用一下”的方式，很难形成真实效率提升。</p>
<p>更好的做法是：</p>
<blockquote>
<p>先固定 3 个你最常用、最容易重复的场景，把 AI 嵌进去。</p>
</blockquote>
<p>对于普通人，我最推荐这 3 类。</p>
<hr />
<h2>场景 1：信息整理</h2>
<p>适合人群：</p>
<ul>
<li>经常看资料、做笔记、读文章的人</li>
<li>自媒体、运营、产品、学生、研究者</li>
</ul>
<p>你可以让 AI 帮你：</p>
<ul>
<li>总结文章重点</li>
<li>提炼关键信息</li>
<li>对比多个观点</li>
<li>从一堆内容中提取行动建议</li>
</ul>
<h3>示例提示词</h3>
<blockquote>
<p>我会给你 3 段资料，请你帮我提炼出：<br />
1）核心观点<br />
2）共同点和差异点<br />
3）最适合写成公众号内容的 3 个角度</p>
</blockquote>
<p>这类工作原本特别耗脑力，<br />
但 AI 很适合做第一轮整理。</p>
<hr />
<h2>场景 2：写作辅助</h2>
<p>适合人群：</p>
<ul>
<li>写公众号</li>
<li>写短视频脚本</li>
<li>写汇报</li>
<li>写邮件</li>
<li>写方案</li>
</ul>
<p>你可以让 AI 帮你：</p>
<ul>
<li>想标题</li>
<li>列大纲</li>
<li>写初稿</li>
<li>改风格</li>
<li>缩短冗余内容</li>
<li>补充案例和过渡句</li>
</ul>
<p>重点不是“让 AI 替你写完”，<br />
而是让它承担最费时间的起步阶段。</p>
<p>很多人真正卡住的，不是写不出来，<br />
而是<strong>不知道从哪开始写</strong>。</p>
<p>AI 在这件事上特别有帮助。</p>
<hr />
<h2>场景 3：重复任务处理</h2>
<p>这类是最容易感受到“AI 真能干活”的地方。</p>
<p>比如：</p>
<ul>
<li>把会议记录整理成待办事项</li>
<li>把用户反馈分类</li>
<li>批量优化文案</li>
<li>把一堆零散想法整理成结构化内容</li>
<li>把长文本压缩成适合汇报的版本</li>
</ul>
<h3>示例提示词</h3>
<blockquote>
<p>下面是会议纪要，请帮我整理成：<br />
1）已确认事项<br />
2）待办事项<br />
3）责任人<br />
4）截止时间<br />
如果原文没写清楚，请标注“待确认”。</p>
</blockquote>
<p>这类任务的特点是：</p>
<ul>
<li>重复</li>
<li>枯燥</li>
<li>耗时间</li>
<li>但不一定需要你每次都亲自从头整理</li>
</ul>
<p>而这，恰恰是 AI 最应该发力的地方。</p>
<hr />
<h2>四、一个很多人没意识到的真相：AI 最值钱的，不是“聪明”，而是“省力”</h2>
<p>很多人评价 AI，第一反应是：</p>
<ul>
<li>它聪不聪明？</li>
<li>它回答得准不准？</li>
<li>它像不像真人？</li>
</ul>
<p>但从实际使用角度看，AI 最值钱的地方其实不是这些。</p>
<p>而是：</p>
<blockquote>
<p>它能不能帮你减少重复劳动，降低启动门槛，缩短完成任务的时间。</p>
</blockquote>
<p>你不需要它像天才，<br />
你需要它像一个<strong>可靠的助手</strong>。</p>
<p>比如：</p>
<ul>
<li>帮你把混乱的信息先理顺</li>
<li>帮你把空白页先填起来</li>
<li>帮你把任务往前推一步</li>
<li>帮你减少“我懒得开始”的心理负担</li>
</ul>
<p>很多时候，AI 真正创造的价值不是 100 分答案，<br />
而是让你从 0 分快速到 60 分，再由你自己拉到 90 分。</p>
<p>这才是最现实、也最强的用法。</p>
<hr />
<h2>五、普通人现在就能开始的最小行动</h2>
<p>如果你今天看完这篇文章，只记住一句话，我希望是：</p>
<blockquote>
<p><strong>别再只是问 AI 一个问题，而是给它一个任务。</strong></p>
</blockquote>
<p>你不用一上来就研究复杂工作流，<br />
也不用先学会自动化、脚本、Agent。</p>
<p>你只需要从今天开始，做 3 个改变：</p>
<h3>1）每次都先说清任务目标</h3>
<p>不是“帮我写”，而是“我要做什么，给谁看，达到什么效果”。</p>
<h3>2）把大任务拆成小步骤</h3>
<p>不要一口气让 AI 做完全部，分步骤推进。</p>
<h3>3）固定 3 个高频场景长期使用</h3>
<p>让 AI 真正进入你的写作、整理、复盘、汇报流程。</p>
<p>只要你做到这 3 点，<br />
AI 就会从“偶尔聊两句的工具”，<br />
变成你每天都能用上的“干活工具”。</p>
<hr />
<h2>结语</h2>
<p>AI 最大的误解之一，就是很多人把它当成“更聪明的搜索框”。</p>
<p>但它真正的潜力，不在于陪你聊天，<br />
而在于帮你把事情做完。</p>
<p>从聊天，到执行；<br />
从回答问题，到推进任务；<br />
这中间的差别，决定了你究竟是在“体验 AI”，还是在“用 AI 提效”。</p>
<p>所以，别只拿 AI 聊天了。</p>
<p>从今天开始，试着把它当成一个真正能帮你干活的助手。</p>
<p>你会发现，很多以前拖着不想做的事，终于能动起来了。</p>]]></description>
    <pubDate>Tue, 14 Apr 2026 15:41:35 +0800</pubDate>
    <dc:creator>‎刘小猪</dc:creator>
    <guid>https://blog.liuxiaozhu.cn/?post=14</guid>
</item>
<item>
    <title>OpenClaw + Skywork 技能套件：你的AI办公全家桶</title>
    <link>https://blog.liuxiaozhu.cn/?post=12</link>
    <description><![CDATA[<h1>🎯 OpenClaw + Skywork 技能套件：你的AI办公全家桶</h1>
<blockquote>
<p>还在为制作PPT、Excel报表、设计图片、写文档而发愁？OpenClaw 联合 Skywork 推出的五大技能插件，让你用自然语言就能完成专业级办公任务。</p>
</blockquote>
<hr />
<h2>📦 技能总览</h2>
<table>
<thead>
<tr>
<th>技能</th>
<th>核心定位</th>
<th>典型场景</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>skywork-ppt</strong></td>
<td>AI PPT生成与编辑</td>
<td>主题演讲、工作汇报、模板模仿</td>
</tr>
<tr>
<td><strong>skywork-design</strong></td>
<td>AI图像生成与编辑</td>
<td>海报、Logo、电商主图</td>
</tr>
<tr>
<td><strong>skywork-excel</strong></td>
<td>AI数据分析与报表</td>
<td>财务报表、销售分析、数据可视化</td>
</tr>
<tr>
<td><strong>skywork-document</strong></td>
<td>AI文档生成</td>
<td>报告、合同、简历、论文</td>
</tr>
<tr>
<td><strong>skywork-search</strong></td>
<td>AI实时搜索</td>
<td>行业调研、热点追踪、数据汇总</td>
</tr>
</tbody>
</table>
<hr />
<h2>1️⃣ skywork-ppt — AI演示文稿神器</h2>
<h3>🎯 核心定位</h3>
<p>一句话生成专业PPT，支持<strong>主题生成、模板模仿、编辑修改、本地操作</strong>四种模式。</p>
<h3>✨ 功能亮点</h3>
<ul>
<li>📝 <strong>主题生成</strong>：输入主题，自动生成完整PPT</li>
<li>🎨 <strong>模板模仿</strong>：上传现有PPT，AI学习风格生成新内容</li>
<li>✏️ <strong>智能编辑</strong>：自然语言修改页面、调整布局</li>
<li>🔧 <strong>本地操作</strong>：删除/重排/合并幻灯片（无需联网）</li>
</ul>
<h3>📥 安装步骤</h3>
<pre><code class="language-bash"># 方式一：ClawHub 安装（推荐）
clawhub install skywork-ppt

# 方式二：手动安装
git clone &lt;skywork-ppt-repo&gt; ~/.openclaw/workspace/skills/skywork-ppt</code></pre>
<hr />
<h2>2️⃣ skywork-design — AI图像创作</h2>
<h3>🎯 核心定位</h3>
<p>通过文字描述生成高质量图片，或对现有图片进行AI编辑。</p>
<h3>✨ 功能亮点</h3>
<ul>
<li>🖼️ <strong>文生图</strong>：文字描述→专业图片</li>
<li>🎭 <strong>图生图</strong>：参考图片→风格迁移/修改</li>
<li>📐 <strong>多尺寸支持</strong>：1:1、3:4、16:9、9:16等多种比例</li>
<li>🖥️ <strong>多分辨率</strong>：1K/2K/4K高清输出</li>
</ul>
<h3>📥 安装步骤</h3>
<pre><code class="language-bash"># 方式一：ClawHub 安装（推荐）
clawhub install skywork-design

# 方式二：手动安装
git clone &lt;skywork-design-repo&gt; ~/.openclaw/workspace/skills/skywork-design</code></pre>
<hr />
<h2>3️⃣ skywork-excel — AI数据分析专家</h2>
<h3>🎯 核心定位</h3>
<p>最强Excel AI助手，支持<strong>创建、分析、可视化</strong>一体化处理，内置实时网络搜索。</p>
<h3>✨ 功能亮点</h3>
<ul>
<li>📊 <strong>智能建表</strong>：一句话创建带公式、图表的专业报表</li>
<li>🔍 <strong>数据分析</strong>：自动分析Excel/CSV/PDF，生成洞察报告</li>
<li>📈 <strong>图表生成</strong>：自动匹配最佳可视化方案</li>
<li>🌐 <strong>实时搜索</strong>：内置搜索功能，获取股票、行情等实时数据</li>
</ul>
<h3>📥 安装步骤</h3>
<pre><code class="language-bash"># 方式一：ClawHub 安装（推荐）
clawhub install skywork-excel</code></pre>
<hr />
<h2>4️⃣ skywork-document — AI文档生成</h2>
<h3>🎯 核心定位</h3>
<p>专业文档生成引擎，支持 <strong>docx、pdf、html、markdown</strong> 多种格式。</p>
<h3>✨ 功能亮点</h3>
<ul>
<li>📄 <strong>全类型文档</strong>：报告、提案、博客、论文、合同、简历...</li>
<li>📑 <strong>参考写作</strong>：基于已有素材生成新文档</li>
<li>🌐 <strong>自动搜素</strong>：需要实时信息时自动联网获取</li>
<li>🎨 <strong>专业排版</strong>：输出即用的精美格式</li>
</ul>
<h3>📥 安装步骤</h3>
<pre><code class="language-bash"># 方式一：ClawHub 安装（推荐）
clawhub install skywork-document</code></pre>
<hr />
<h2>5️⃣ skywork-search — AI实时搜索</h2>
<h3>🎯 核心定位</h3>
<p>Skywork官方搜索API，一次调用最多支持3个查询，返回结构化结果。</p>
<h3>✨ 功能亮点</h3>
<ul>
<li>🔎 <strong>实时信息</strong>：获取最新行业资讯、数据统计</li>
<li>📊 <strong>结构化结果</strong>：返回标题、摘要、来源URL</li>
<li>⚡ <strong>高效批量</strong>：单次最多3个查询</li>
</ul>
<h3>📥 安装步骤</h3>
<pre><code class="language-bash"># 方式一：ClawHub 安装（推荐）
clawhub install skywork-search</code></pre>
<hr />
<h2>📌 统一命令汇总</h2>
<table>
<thead>
<tr>
<th>操作</th>
<th>命令</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>搜索技能</strong></td>
<td><code>clawhub search skywork</code></td>
</tr>
<tr>
<td><strong>安装技能</strong></td>
<td><code>clawhub install skywork-ppt</code></td>
</tr>
<tr>
<td><strong>查看已安装</strong></td>
<td><code>clawhub list</code></td>
</tr>
<tr>
<td><strong>更新技能</strong></td>
<td><code>clawhub update skywork-ppt</code></td>
</tr>
<tr>
<td><strong>更新全部</strong></td>
<td><code>clawhub update --all</code></td>
</tr>
</tbody>
</table>
<hr />
<h2>📖 总结</h2>
<p>这五个Skywork技能构成了完整的AI办公生态：</p>
<ul>
<li>📊 <strong>PPT + Excel + 文档</strong> → 覆盖所有办公文档需求</li>
<li>🎨 <strong>Design</strong> → 满足设计视觉需求</li>
<li>🔍 <strong>Search</strong> → 为所有技能提供实时数据支撑</li>
</ul>
<p>学会这五个技能，你的AI办公效率将提升10倍以上！</p>
<hr />
<p><em>本文部分功能需要Skywork会员支持，具体请访问官方平台了解。</em></p>]]></description>
    <pubDate>Tue, 24 Mar 2026 16:59:23 +0800</pubDate>
    <dc:creator>‎刘小猪</dc:creator>
    <guid>https://blog.liuxiaozhu.cn/?post=12</guid>
</item>
<item>
    <title>强烈推荐！OpenClaw 这13个超实用技能，让你的AI助手火力全开</title>
    <link>https://blog.liuxiaozhu.cn/?post=11</link>
    <description><![CDATA[<h1>还在抱怨AI助手不够强？那是没用对技能！今天给大家盘点OpenClaw的13个神级技能，每一个都相当实用，建议先收藏再看！</h1>
<h2>01、skill-creator</h2>
<p>创建和管理智能体技能<br />
这是给开发者用的神器！如果你想自己动手丰衣足食，创建专属的AI技能，选它就对了。<br />
优点：<br />
• 从零开始创建新技能<br />
• 优化和改进现有技能<br />
• 自动审核代码质量<br />
• 规范技能目录结构</p>
<h2>02、github</h2>
<p>GitHub深度集成<br />
程序员必备！直接在AI对话中操作GitHub。<br />
优点：<br />
• 随时查看Issue和PR状态<br />
• 提交代码和创建分支<br />
• 查看CI/CD运行结果<br />
• 用自然语言查询代码库</p>
<h2>03、weather</h2>
<p>天气查询小助手<br />
查天气还要打开APP？直接问AI就行！<br />
优点：<br />
• 支持全球城市天气查询<br />
• 获取未来天气预报<br />
• 无需API密钥，完全免费<br />
• 数据来源可靠</p>
<h2>04、brave-search</h2>
<p>Brave搜索API<br />
AI版百度/Google，关键时刻救大命！<br />
优点：<br />
• 快速搜索网页内容<br />
• 获取最新资讯<br />
• 无需浏览器即可抓取网页<br />
• 适合实时热点查询</p>
<h2>05、multi-search-engine</h2>
<p>多搜索引擎聚合<br />
一个技能同时调用17个搜索引擎！<br />
优点：<br />
• 8个中文引擎 + 9个国际引擎<br />
• 支持高级搜索语法<br />
• 时间筛选和站点搜索<br />
• 隐私搜索模式</p>
<h2>06、find-skills</h2>
<p>技能发现器<br />
不知道该用什么技能？问它！<br />
优点：<br />
• 智能推荐适合的技能<br />
• 帮你发现新技能<br />
• 类似&quot;如何做XXX&quot;的场景推荐</p>
<h2>07、healthcheck</h2>
<p>安全健康检查<br />
保护你的服务器不被hack！<br />
优点：<br />
• SSH和防火墙加固<br />
• 系统更新检查<br />
• 风险评估报告<br />
• 定时安全巡检</p>
<h2>08、mcporter</h2>
<p>MCP服务器管理器<br />
MCP是AI连接外部工具的桥梁，而这个技能帮你管理它们。<br />
优点：<br />
• 一键列出可用MCP服务器<br />
• 配置认证信息<br />
• 直接调用工具API<br />
• 支持HTTP和stdio模式</p>
<h2>09、openai-whisper</h2>
<p>本地语音转文字<br />
不用联网也能把语音变成文字！<br />
优点：<br />
• 完全本地运行，保护隐私<br />
• 无需API Key<br />
• 支持多语言<br />
• 离线转写能力</p>
<h2>10、node-connect</h2>
<p>节点连接诊断<br />
手机连不上AI终端？用这个技能排查问题！<br />
优点：<br />
• 诊断配对失败原因<br />
• 支持iOS/Android/macOS<br />
• 排查Tailscale网络问题<br />
• 解决各种连接异常</p>
<h2>11、clawhub</h2>
<p>技能商店<br />
官方技能商店，想装什么装什么！<br />
优点：<br />
• 搜索安装热门技能<br />
• 一键更新到最新版本<br />
• 发布自己的技能<br />
• 社区技能发现</p>
<h2>12、skill-vetting</h2>
<p>技能安全审核<br />
安装第三方技能前先验货！<br />
优点：<br />
• 检测恶意代码<br />
• 评估技能实用性<br />
• 安全风险提示<br />
• 帮你避坑</p>
<h2>13、browser</h2>
<p>无头浏览器自动化<br />
用AI直接&quot;看&quot;网页！<br />
优点：<br />
• 无需手动抓取网页内容<br />
• 自动渲染JavaScript页面<br />
• 可点击按钮、填写表单<br />
• 提取干净可读的正文内容<br />
• 适合抓取动态网页数据</p>
<h2>看完这篇文章，你知道该装什么技能了吗？</h2>
<h2>如果你也有好用的技能推荐，欢迎在评论区分享！</h2>]]></description>
    <pubDate>Mon, 23 Mar 2026 09:41:37 +0800</pubDate>
    <dc:creator>‎刘小猪</dc:creator>
    <guid>https://blog.liuxiaozhu.cn/?post=11</guid>
</item>
<item>
    <title>OpenClaw接入华为鸿蒙小艺</title>
    <link>https://blog.liuxiaozhu.cn/?post=10</link>
    <description><![CDATA[<h1>一、<a href="https://developer.huawei.com/consumer/cn/hag/hagindex.html#/" title="打开小艺开放平台">打开小艺开放平台</a>（<a href="https://developer.huawei.com/consumer/cn/hag/hagindex.html">https://developer.huawei.com/consumer/cn/hag/hagindex.html</a>#/） 点击立即接入会跳转到登录界面，登录后会进入首页，点击立即体验</h1>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/04371773300287.png" alt="" /><br />
<img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/c1eb1773300478.png" alt="" /></p>
<h3>进入到个人中心后，点击创建智能体</h3>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/44451773300588.png" alt="" /></p>
<h3>在弹出的窗口选择OpenClaw模式，然后输入相关的信息以及支持的设备</h3>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/71ed1773300671.png" alt="" /></p>
<h3>创建完成后会进入调试模式，点击新建凭证，会打开新的页面创建key</h3>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/41c71773300804.png" alt="" /><br />
<img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/dd8c1773300851.png" alt="" /></p>
<h3>将生成好的凭证复制保存下来</h3>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/af0a1773300891.png" alt="" /></p>
<h1>二、OpenClaw安装小艺技能</h1>
<pre><code>在openclaw机器执行openclaw plugins install @ynhcj/xiaoyi@latest</code></pre>
<h3>安装完成截图</h3>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/b9151773301052.png" alt="" /></p>
<h3>修改OpenClaw配置文件</h3>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/24cf1773301343.png" alt="" /></p>
<pre><code>"channels": {
    "xiaoyi": {
        "enabled": true,
        "ak": "小艺开放平台凭证ak",
        "sk": "小艺开放平台凭证sk",
        "agentId": "agent93fb8425bf264070b08e658d927f2bf9"
      }
  }</code></pre>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/5cbc1773301311.png" alt="" /></p>
<h3>修改完配置文件后，重启OpenClaw网关</h3>
<pre><code>docker compose restart openclaw-gateway</code></pre>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/990c1773301468.png" alt="" /></p>
<h3>OpenClaw网关重启完成后，再次进入小艺开放平台进行配置白名单</h3>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/69911773302214.png" alt="" /></p>
<h3>添加用户组以及添加用户</h3>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/41aa1773302259.png" alt="" /><br />
<img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/5bec1773302279.png" alt="" /></p>
<h3>添加完成后上架小艺Claw</h3>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/fea21773302315.png" alt="" /></p>
<h3>上架后等待审核通过就可以在手机、PC、平板的小艺技能中心看到对应的智能体了</h3>]]></description>
    <pubDate>Thu, 12 Mar 2026 15:21:51 +0800</pubDate>
    <dc:creator>‎刘小猪</dc:creator>
    <guid>https://blog.liuxiaozhu.cn/?post=10</guid>
</item>
<item>
    <title>OpenClaw接入企业微信智能机器人</title>
    <link>https://blog.liuxiaozhu.cn/?post=9</link>
    <description><![CDATA[<h1>第一步：以长连接方式创建智能机器人，获取Bot ID和Secret</h1>
<h2>1、可在客户端-工作台，点击-智能机器人-创建机器人，选择API模式创建。</h2>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/85d31773280266.png" alt="" /><br />
<img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/170a1773280293.png" alt="" /></p>
<h2>2、选择以「长连接」方式创建，并获取Bot ID 和 Secret</h2>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/96a61773280318.png" alt="" /></p>
<h1>第二步：关联企微机器人与OpenClaw</h1>
<h2>1、以本地部署为例，在本地终端，输入以下命令，安装企微插件。</h2>
<pre><code>openclaw plugins install @wecom/wecom-openclaw-plugin</code></pre>
<h2>2、安装成功提示如图。</h2>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/b2d21773280399.png" alt="" /></p>
<h2>3、重启OpenClaw。</h2>
<pre><code>openclaw gateway start</code></pre>
<h2>4、在终端中，输入以下命令，添加渠道。</h2>
<pre><code>openclaw channels add</code></pre>
<h2>5、在select channel步骤，选择 channel 为企业微信。</h2>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/39041773280551.png" alt="" /></p>
<h2>6、输入企业微信机器人Bot ID、Secret。</h2>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/27631773280580.png" alt="" /></p>
<h2>7、选择 finish。</h2>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/75bd1773280603.png" alt="" /></p>
<h2>8、选择配对方式，选择 Pairing。</h2>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/2b181773280627.png" alt="" /></p>
<h2>9、完成后续配置，并可看到配置渠道成功。</h2>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/408f1773280650.png" alt="" /></p>
<h2>10、在企业微信中，保存机器人，并跟他发消息。会收到一个配置密钥。</h2>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/6b9d1773280671.png" alt="" /></p>
<h2>11、复制此信息最后一行，并输入在终端中，完成配对。</h2>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/eb841773280697.png" alt="" /></p>
<h2>12、此时可在企业微信中正常对话。</h2>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/39ff1773280717.png" alt="" /></p>]]></description>
    <pubDate>Thu, 12 Mar 2026 09:49:37 +0800</pubDate>
    <dc:creator>‎刘小猪</dc:creator>
    <guid>https://blog.liuxiaozhu.cn/?post=9</guid>
</item>
<item>
    <title>Docker容器化部署安装OpenClaw</title>
    <link>https://blog.liuxiaozhu.cn/?post=8</link>
    <description><![CDATA[<h1>一、准备Docker模板（docker-compose.yml）</h1>
<pre><code>services:
  openclaw-gateway:
    image: openeuler/openclaw:latest
    environment:
      HOME: /home/node
      TERM: xterm-256color
    volumes:
      - /root/openclaw/config:/home/node/.openclaw
      - /root/openclaw/workspace:/home/node/.openclaw/workspace
    ports:
      - "18789:18789"
      - "18790:18790"
    init: true
    restart: unless-stopped
    command:
      [
        "gateway",
        "--bind",
        "lan",
        "--port",
        "18789",
        "--verbose"
      ]
    healthcheck:
      test:
        [
          "CMD",
          "sh",
          "-lc",
          "wget -qO- http://127.0.0.1:18789/healthz &gt;/dev/null 2&gt;&amp;1 || exit 1"
        ]
      interval: 30s
      timeout: 5s
      retries: 5
      start_period: 20s

  openclaw-cli:
    image: openeuler/openclaw:latest
    profiles: ["cli"]
    cap_drop:
      - NET_RAW
      - NET_ADMIN
    security_opt:
      - no-new-privileges:true
    environment:
      HOME: /home/node
      TERM: xterm-256color
      BROWSER: echo
      OPENCLAW_GATEWAY_URL: "ws://openclaw-gateway:18789"
    volumes:
      - /root/openclaw/config:/home/node/.openclaw
      - /root/openclaw/workspace:/home/node/.openclaw/workspace
    stdin_open: true
    tty: true
    init: true
    depends_on:
      - openclaw-gateway
    entrypoint: ["sh", "-lc"]
    command: ["sleep infinity"]</code></pre>
<h1>二、运行网关容器</h1>
<pre><code>docker compose up -d openclaw-gateway</code></pre>
<h1>三、运行客户端容器</h1>
<pre><code>docker compose --profile cli up -d openclaw-cli

##进入客户端容器
docker compose exec openclaw-cli sh

##初始化配置openclaw
openclaw onboard</code></pre>
<h1>四、修改网关配置文件（openclaw.json）</h1>
<h2>1、完成初始化配置后会生成一个配置文件openclaw.json，路径在宿主机目录映射下/root/openclaw/config/openclaw.json，需要修改该文件才能在其他设备上正常访问openclaw UI，打开文件找到&quot;gateway&quot;的配置内容，整段改成如下信息：</h2>
<pre><code>  "gateway": {
    "port": 18789,
    "mode": "local",
    "bind": "lan",
    "controlUi": {
      "allowedOrigins": [
        "http://容器所在的宿主机IP:18789"
      ],
      "allowInsecureAuth": true,
      "dangerouslyDisableDeviceAuth": true
    },
    "auth": {
      "mode": "token",
      "token": "123456"  #网关连接的认证秘钥
    },
    "tailscale": {
      "mode": "off",
      "resetOnExit": false
    },
    "nodes": {
      "denyCommands": [
        "camera.snap",
        "camera.clip",
        "screen.record",
        "contacts.add",
        "calendar.add",
        "reminders.add",
        "sms.send"
      ]
    }
  }</code></pre>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/c8c71773130258.png" alt="" /></p>
<h1>五、重启网关容器和客户端容器</h1>
<pre><code>docker compose restart openclaw-gateway
docker compose restart openclaw-cli</code></pre>
<h1>六、重启完成后，浏览器就可以正常访问openclaw服务了（<a href="http://容器宿主机IP:18789/#token=前面设置的网关秘钥">http://容器宿主机IP:18789/#token=前面设置的网关秘钥</a>）</h1>
<p><img src="https://blog.liuxiaozhu.cn/content/uploadfile/202603/fff41773130806.png" alt="" /></p>]]></description>
    <pubDate>Tue, 10 Mar 2026 15:49:37 +0800</pubDate>
    <dc:creator>‎刘小猪</dc:creator>
    <guid>https://blog.liuxiaozhu.cn/?post=8</guid>
</item>
<item>
    <title>IP端口扫描+日志分析工具</title>
    <link>https://blog.liuxiaozhu.cn/?post=7</link>
    <description><![CDATA[<h2>IP端口扫描+日志分析工具</h2>
<pre><code class="language-python">import sys
import json
import csv
import socket
import ipaddress
import platform
import subprocess
import threading
import ssl
import struct
import re
import gzip
import pathlib
from urllib.parse import urlparse, parse_qs, unquote_plus
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import List, Optional, Tuple, Dict, Any
from collections import Counter, defaultdict

from PyQt6.QtCore import Qt, QThread, pyqtSignal
from PyQt6.QtWidgets import (
    QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QLabel,
    QLineEdit, QPushButton, QSpinBox, QDoubleSpinBox, QCheckBox, QComboBox,
    QTableWidget, QTableWidgetItem, QFileDialog, QMessageBox, QTextEdit,
    QGroupBox, QProgressBar, QTabWidget, QListWidget, QListWidgetItem
)

# =============================
# 数据结构
# =============================
@dataclass
class ScanResult:
    host: str
    ip: str
    proto: str         # tcp/udp
    port: int
    state: str         # open / open|filtered
    service: str
    banner: str

@dataclass
class LogFinding:
    source: str          # 文件名/路径
    category: str        # 系统/中间件/数据库/应用/安全
    severity: str        # INFO/WARN/ERROR/CRITICAL
    timestamp: str
    keyword: str
    message: str
    attack_ip: str = ""
    attack_type: str = ""

# =============================
# 端口解析/目标解析
# =============================
def parse_ports(port_text: str) -&gt; List[int]:
    """
    支持:
      - "1-65535"
      - "22,80,443"
      - "53,67-69,161"
    """
    port_text = (port_text or "").strip()
    if not port_text:
        return []
    ports = set()
    for part in port_text.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            a, b = part.split("-", 1)
            a = int(a.strip()); b = int(b.strip())
            if a &gt; b:
                a, b = b, a
            for p in range(a, b + 1):
                if 1 &lt;= p &lt;= 65535:
                    ports.add(p)
        else:
            p = int(part)
            if 1 &lt;= p &lt;= 65535:
                ports.add(p)
    return sorted(ports)

def expand_targets(text: str) -&gt; List[str]:
    """
    支持：
      - IPv4: 192.168.1.10
      - IPv6: 2001:db8::1
      - 域名: example.com
      - CIDR: 192.168.1.0/24, 2001:db8::/120
      - 多行/逗号分隔
    """
    text = (text or "").replace(",", "\n")
    items = [x.strip() for x in text.splitlines() if x.strip()]

    expanded = []
    for it in items:
        if "/" in it:
            try:
                net = ipaddress.ip_network(it, strict=False)
                for ip in net.hosts():
                    expanded.append(str(ip))
            except Exception:
                expanded.append(it)
        else:
            expanded.append(it)

    # 去重保序
    seen = set()
    out = []
    for x in expanded:
        if x not in seen:
            seen.add(x)
            out.append(x)
    return out

def resolve_host_to_ip(host: str) -&gt; Tuple[str, str]:
    """
    返回 (显示host, 解析到的ip)
    - host可能是域名/IP
    - 解析优先返回第一个地址
    """
    try:
        # 如果本身就是IP
        ipaddress.ip_address(host)
        return host, host
    except Exception:
        pass

    try:
        infos = socket.getaddrinfo(host, None)
        # 优先 IPv4，然后 IPv6
        ipv4 = None
        ipv6 = None
        for family, _, _, _, sockaddr in infos:
            if family == socket.AF_INET:
                ipv4 = sockaddr[0]
            elif family == socket.AF_INET6:
                ipv6 = sockaddr[0]
        ip = ipv4 or ipv6
        if not ip:
            raise RuntimeError("无法解析IP")
        return host, ip
    except Exception:
        return host, ""

def is_domain(host: str) -&gt; bool:
    try:
        ipaddress.ip_address(host)
        return False
    except Exception:
        return True

# =============================
# 探测与banner抓取
# =============================
SERVICE_GUESS = {
    21: "ftp",
    22: "ssh",
    23: "telnet",
    25: "smtp",
    53: "dns",
    80: "http",
    110: "pop3",
    123: "ntp",
    135: "msrpc",
    139: "netbios-ssn",
    143: "imap",
    161: "snmp",
    389: "ldap",
    443: "https",
    445: "smb",
    465: "smtps",
    587: "smtp-submission",
    993: "imaps",
    995: "pop3s",
    1433: "mssql",
    1521: "oracle",
    2049: "nfs",
    2375: "docker",
    3306: "mysql",
    3389: "rdp",
    5432: "postgres",
    5900: "vnc",
    6379: "redis",
    7001: "weblogic",
    8080: "http-alt",
    8443: "https-alt",
    9200: "elasticsearch",
    11211: "memcached",
    27017: "mongodb",
}

TLS_PORTS = {443, 8443, 9443, 993, 995, 465, 587, 990, 992, 994}

HTTP_PORTS = {80, 8080, 8000, 8888, 8081, 8181, 7001, 7100, 7501, 9090, 9200, 443, 8443, 9443}

def parse_http_title(body: str) -&gt; str:
    m = re.search(r"&lt;title[^&gt;]*&gt;(.*?)&lt;/title&gt;", body, re.IGNORECASE | re.DOTALL)
    if not m:
        return ""
    title = re.sub(r"\s+", " ", m.group(1)).strip()
    return title[:80]

def tls_handshake_and_cert(ip: str, port: int, timeout: float, server_name: str = "") -&gt; Tuple[bool, str]:
    """返回 (tls_ok, cert_info_snippet)"""
    family = socket.AF_INET6 if ":" in ip else socket.AF_INET
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE

    cert_info = ""
    try:
        with socket.socket(family, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            sock.connect((ip, port))
            with ctx.wrap_socket(sock, server_hostname=server_name or None) as ssock:
                ssock.settimeout(timeout)
                cert = ssock.getpeercert()
                if cert:
                    subject = cert.get("subject", [])
                    issuer = cert.get("issuer", [])
                    not_after = cert.get("notAfter", "")

                    def flatten(x):
                        out = []
                        for item in x:
                            for k, v in item:
                                out.append(f"{k}={v}")
                        return ", ".join(out)

                    cert_info = f"TLS OK; Subject: {flatten(subject)}; Issuer: {flatten(issuer)}; NotAfter: {not_after}"
                else:
                    cert_info = "TLS OK; no cert info"
                return True, sanitize_banner(cert_info)
    except Exception:
        return False, ""

def http_probe(ip: str, port: int, timeout: float, use_tls: bool, host_header: str = "localhost") -&gt; str:
    """返回 HTTP 响应头 + title（如有）"""
    family = socket.AF_INET6 if ":" in ip else socket.AF_INET
    req = f"GET / HTTP/1.1\r\nHost: {host_header}\r\nUser-Agent: scanner\r\nConnection: close\r\n\r\n".encode()

    try:
        with socket.socket(family, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            sock.connect((ip, port))

            if use_tls:
                ctx = ssl.create_default_context()
                ctx.check_hostname = False
                ctx.verify_mode = ssl.CERT_NONE
                sock = ctx.wrap_socket(sock, server_hostname=host_header)

            sock.settimeout(timeout)
            sock.sendall(req)

            data = b""
            while len(data) &lt; 4096:
                chunk = sock.recv(1024)
                if not chunk:
                    break
                data += chunk

        text = safe_decode(data)
        if "\r\n\r\n" in text:
            header, body = text.split("\r\n\r\n", 1)
        else:
            header, body = text, ""

        title = parse_http_title(body)
        if title:
            return sanitize_banner(header + "\r\n&lt;title&gt;" + title + "&lt;/title&gt;")
        return sanitize_banner(header)
    except Exception:
        return ""

def pg_probe(ip: str, port: int, timeout: float) -&gt; str:
    """PostgreSQL SSLRequest 探测"""
    family = socket.AF_INET6 if ":" in ip else socket.AF_INET
    try:
        with socket.socket(family, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            s.connect((ip, port))
            payload = struct.pack("!II", 8, 80877103)
            s.sendall(payload)
            r = s.recv(1)
            if r in (b"S", b"N"):
                return "PostgreSQL detected (SSLRequest response: " + r.decode(errors="ignore") + ")"
    except Exception:
        pass
    return ""

def redis_probe(ip: str, port: int, timeout: float) -&gt; str:
    family = socket.AF_INET6 if ":" in ip else socket.AF_INET
    try:
        with socket.socket(family, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            s.connect((ip, port))
            s.sendall(b"PING\r\n")
            data = s.recv(256)
            return sanitize_banner(safe_decode(data))
    except Exception:
        return ""

def memcached_probe(ip: str, port: int, timeout: float) -&gt; str:
    family = socket.AF_INET6 if ":" in ip else socket.AF_INET
    try:
        with socket.socket(family, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            s.connect((ip, port))
            s.sendall(b"version\r\n")
            data = s.recv(256)
            return sanitize_banner(safe_decode(data))
    except Exception:
        return ""

def mysql_probe(ip: str, port: int, timeout: float) -&gt; str:
    """MySQL 连接后通常直接返回握手包"""
    family = socket.AF_INET6 if ":" in ip else socket.AF_INET
    try:
        with socket.socket(family, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            s.connect((ip, port))
            data = s.recv(512)
            return sanitize_banner(safe_decode(data))
    except Exception:
        return ""

# 简化版 probes（模拟 nmap probes 思路：不同端口/协议发不同探测）
TCP_PROBES: List[Tuple[str, bytes]] = [
    ("HTTP", b"HEAD / HTTP/1.0\r\nHost: localhost\r\nUser-Agent: scanner\r\n\r\n"),
    ("HTTP-GET", b"GET / HTTP/1.0\r\nHost: localhost\r\nUser-Agent: scanner\r\n\r\n"),
    ("SMTP", b"EHLO example.com\r\n"),
    ("FTP", b"FEAT\r\n"),
    ("IMAP", b"A1 CAPABILITY\r\n"),
    ("POP3", b"QUIT\r\n"),
    ("REDIS", b"PING\r\n"),
    ("MEMCACHED", b"version\r\n"),
    ("MYSQL", b"\x00"),
    ("TELNET", b"\r\n"),
]

def safe_decode(b: bytes) -&gt; str:
    try:
        return b.decode("utf-8", errors="replace")
    except Exception:
        return repr(b)

def sanitize_banner(s: str) -&gt; str:
    if not s:
        return ""
    s = s.replace("\r", "\\r").replace("\n", "\\n")
    if len(s) &gt; 400:
        s = s[:400] + "..."
    return s

# =============================
# 日志分析（离线文件分析）
# =============================
SEVERITY_KEYWORDS = {
    "CRITICAL": ["panic", "fatal", "critical", "segfault", "core dump", "out of memory", "oom", "kernel panic"],
    "ERROR": ["error", "failed", "exception", "traceback", "denied", "refused", "timeout", "unreachable"],
    "WARN": ["warn", "warning", "deprecated", "slow", "retry", "throttle", "too many"],
}

SECURITY_KEYWORDS = [
    "authentication failure", "invalid user", "failed password", "sshd", "sudo",
    "unauthorized", "sql injection", "xss", "csrf", "bruteforce", "brute force",
    "waf", "attack", "exploit", "malware", "ransom", "cve-"
]

ATTACK_PATTERNS = [
    ("SQL注入", [
        r"\bunion\s+select\b", r"\binformation_schema\b",
        r"\bor\s+1=1\b", r"\bsleep\s*\(", r"\bbenchmark\s*\(",
        r"\bextractvalue\s*\(", r"\bupdatexml\s*\(", r"\bload_file\s*\(",
        r"\binto\s+outfile\b", r"\bpg_sleep\s*\(", r"\bwaitfor\s+delay\b", r"\bxp_cmdshell\b"
    ]),
    ("XSS", [
        r"&lt;script\b", r"javascript:", r"onerror\s*=", r"onload\s*=", r"document\.cookie",
        r"&lt;img\b[^&gt;]*onerror", r"&lt;svg\b", r"alert\s*\(", r"prompt\s*\("
    ]),
    ("路径穿越", [
        r"\.\./", r"\.\.\\", r"%2e%2e%2f", r"%2e%2e%5c", r"/etc/passwd", r"boot\.ini"
    ]),
    ("命令注入", [
        r"\b;\s*cat\b", r"\b;\s*id\b", r"\b\|\s*whoami\b", r"\b\|\s*id\b",
        r"\b&amp;&amp;\s*id\b", r"\b&amp;&amp;\s*whoami\b", r"\b`[^`]+`", r"\$\([^\)]+\)"
    ]),
    ("SSRF", [
        r"169\.254\.169\.254", r"metadata\.google\.internal", r"\bfile://", r"\bgopher://",
        r"\bhttp://127\.0\.0\.1", r"\bhttp://localhost"
    ]),
    ("扫描/探测", [
        r"nmap", r"masscan", r"zmap", r"dirbuster", r"gobuster", r"nikto",
        r"\b/wp-admin\b", r"\b/.git/\b", r"\b\.env\b"
    ]),
    ("暴力破解", [
        r"failed password", r"invalid user", r"authentication failure", r"too many authentication failures",
        r"login failed", r"bad password", r"bruteforce", r"brute force"
    ]),
    ("WebShell/恶意文件", [
        r"c99\.php", r"r57\.php", r"shell\.php", r"webshell",
        r"base64_decode\(", r"\beval\s*\(", r"\bassert\s*\("
    ]),
]

ATTACK_REGEX = [(name, [re.compile(pat, re.IGNORECASE) for pat in pats]) for name, pats in ATTACK_PATTERNS]

def detect_attack_type(line: str) -&gt; str:
    for name, regs in ATTACK_REGEX:
        for rg in regs:
            if rg.search(line):
                return name
    return ""

# =============================
# Access Log 结构化解析（Nginx/Apache）
# =============================
ACCESS_LOG_PATTERNS = [
    # Common Log Format / Combined
    re.compile(
        r'^(?P&lt;ip&gt;\S+)\s+\S+\s+\S+\s+\[(?P&lt;time&gt;[^\]]+)\]\s+"(?P&lt;method&gt;[A-Z]+)\s+(?P&lt;uri&gt;[^\s]+)\s+(?P&lt;proto&gt;[^"]+)"\s+(?P&lt;status&gt;\d{3})\s+(?P&lt;size&gt;\S+)(\s+"(?P&lt;referer&gt;[^"]*)"\s+"(?P&lt;ua&gt;[^"]*)")?'
    ),
]

SQLI_STRONG = [
    re.compile(p, re.IGNORECASE) for p in [
        r"\bunion\s+select\b",
        r"\binformation_schema\b",
        r"\bselect\b.+\bfrom\b",
        r"\b(or|and)\b\s+\d+=\d+",
        r"\b(or|and)\b\s+'[^']+'='[^']+'",
        r"\bsleep\s*\(",
        r"\bbenchmark\s*\(",
        r"\bextractvalue\s*\(",
        r"\bupdatexml\s*\(",
        r"\bpg_sleep\s*\(",
        r"\bwaitfor\s+delay\b",
        r"\bxp_cmdshell\b",
    ]
]

XSS_STRONG = [
    re.compile(p, re.IGNORECASE) for p in [
        r"&lt;script\b",
        r"javascript:",
        r"onerror\s*=",
        r"onload\s*=",
        r"&lt;svg\b",
        r"document\.cookie",
        r"alert\s*\(",
    ]
]

def parse_access_log_line(line: str) -&gt; Optional[dict]:
    for rg in ACCESS_LOG_PATTERNS:
        m = rg.match(line)
        if m:
            d = m.groupdict()
            return {
                "ip": d.get("ip",""),
                "time": d.get("time",""),
                "method": d.get("method",""),
                "uri": d.get("uri",""),
                "status": d.get("status",""),
                "referer": d.get("referer","") or "",
                "ua": d.get("ua","") or "",
            }
    return None

def analyze_uri_for_attack(uri: str) -&gt; Tuple[str, str]:
    """返回 (attack_type, evidence)"""
    try:
        uri = unquote_plus(uri)
    except Exception:
        pass

    parsed = urlparse(uri)
    qs = parse_qs(parsed.query)

    # 先用强规则匹配 query 参数
    for k, vals in qs.items():
        for v in vals:
            s = f"{k}={v}"
            for rg in SQLI_STRONG:
                if rg.search(s):
                    return "SQL注入", s[:120]
            for rg in XSS_STRONG:
                if rg.search(s):
                    return "XSS", s[:120]

    # 再对整个 URI 做弱匹配（兜底）
    for rg in SQLI_STRONG:
        if rg.search(uri):
            return "SQL注入", uri[:120]
    for rg in XSS_STRONG:
        if rg.search(uri):
            return "XSS", uri[:120]

    return "", ""

CRAWLER_UA_PATTERNS = [
    # 常见正规搜索引擎爬虫
    ("搜索引擎爬虫", [r"googlebot", r"bingbot", r"baiduspider", r"yandex(bot)?", r"sogou", r"360spider", r"petalbot", r"bytespider"]),
    # 常见商业爬虫/采集
    ("商业爬虫", [r"ahrefsbot", r"semrushbot", r"mj12bot", r"dotbot", r"serpstatbot"]),
    # 常见脚本/工具型爬取（更偏“灰”）
    ("脚本/工具爬虫", [r"python-requests", r"java/", r"go-http-client", r"curl/", r"wget/", r"libwww-perl", r"scrapy", r"httpclient", r"okhttp"]),
]

CRAWLER_REGEX = [(name, [re.compile(p, re.IGNORECASE) for p in pats]) for name, pats in CRAWLER_UA_PATTERNS]

SUSPICIOUS_PATH_PATTERNS = [
    re.compile(p, re.IGNORECASE) for p in [
        r"/wp-admin", r"/wp-login\.php", r"/xmlrpc\.php",
        r"/\.git/", r"/\.env\b", r"/phpmyadmin", r"/manager/html", r"/jenkins",
        r"/actuator", r"/swagger", r"/v2/_catalog", r"/api-docs",
        r"/etc/passwd", r"\.php\b", r"\.jsp\b", r"\.asp\b"
    ]
]

def detect_crawler(ua: str) -&gt; Tuple[str, str]:
    """返回 (crawler_type, evidence)"""
    if not ua:
        return "", ""
    for name, regs in CRAWLER_REGEX:
        for rg in regs:
            if rg.search(ua):
                return name, rg.pattern
    # 兜底：包含 bot/spider/crawl
    if re.search(r"\b(bot|spider|crawl|crawler)\b", ua, re.IGNORECASE):
        return "未知爬虫", "bot/spider/crawl"
    return "", ""

def risk_score_access(ip: str, uri: str, status: str, ua: str) -&gt; int:
    """非常简化的风险评分（用于 SIEM/蓝队视角汇总）"""
    score = 0
    try:
        st = int(status)
        if st in (401, 403):
            score += 2
        if st == 404:
            score += 1
        if st &gt;= 500:
            score += 1
    except Exception:
        pass

    # 命中敏感路径
    for rg in SUSPICIOUS_PATH_PATTERNS:
        if rg.search(uri or ""):
            score += 3
            break

    # 工具型 UA 加权
    ctype, _ = detect_crawler(ua or "")
    if ctype == "脚本/工具爬虫":
        score += 3
    elif ctype in ("商业爬虫", "未知爬虫"):
        score += 2
    elif ctype == "搜索引擎爬虫":
        score += 0

    return score

MIDDLEWARE_HINTS = {
    "nginx": ["nginx", "upstream", "client prematurely closed", "connect() failed"],
    "apache": ["apache", "httpd", "mod_", "AH0"],
    "tomcat": ["catalina", "tomcat", "org.apache.catalina"],
    "weblogic": ["weblogic", "&lt;BEA-", "&lt;Error&gt;", "&lt;Warning&gt;"],
    "redis": ["redis", "OOM command not allowed", "MISCONF", "Loading the dataset"],
}

DB_HINTS = {
    "mysql": ["mysqld", "innodb", "mysql", "error 1045", "error 2002"],
    "postgres": ["postgres", "FATAL:", "PANIC:", "could not connect", "connection refused"],
    "mongodb": ["mongod", "mongodb", "wiredtiger", "E NETWORK"],
}

APP_HINTS = {
    "java": ["exception", "at ", "Caused by:", "NullPointerException"],
    "python": ["Traceback (most recent call last)", "Exception:", "ERROR"],
    "nodejs": ["UnhandledPromiseRejectionWarning", "TypeError:", "ReferenceError:"],
}

TS_PATTERNS = [
    re.compile(r"(?P&lt;ts&gt;\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2})"),
    re.compile(r"(?P&lt;ts&gt;\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})"),  # syslog: Feb  2 12:30:10
]

def guess_category(line: str) -&gt; str:
    low = line.lower()
    for name, hints in DB_HINTS.items():
        if any(h in low for h in hints):
            return "数据库"
    for name, hints in MIDDLEWARE_HINTS.items():
        if any(h in low for h in hints):
            return "中间件"
    for name, hints in APP_HINTS.items():
        if any(h.lower() in low for h in hints):
            return "应用"
    if any(k in low for k in SECURITY_KEYWORDS):
        return "安全"
    return "系统"

def extract_ts(line: str) -&gt; str:
    for pat in TS_PATTERNS:
        m = pat.search(line)
        if m:
            return m.group("ts")
    return ""

def normalize_ts(ts: str) -&gt; str:
    """统一转换为 YYYY-MM-DD HH:MM:SS，无法解析则返回原字符串"""
    ts = (ts or "").strip()
    if not ts:
        return ""
    # 1) already iso
    for fmt in ["%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"]:
        try:
            dt = datetime.strptime(ts[:19], fmt)
            return dt.strftime("%Y-%m-%d %H:%M:%S")
        except Exception:
            pass
    # 2) syslog style: Feb  2 12:30:10
    try:
        dt = datetime.strptime(ts, "%b %d %H:%M:%S")
        dt = dt.replace(year=datetime.now().year)
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except Exception:
        pass
    # 3) nginx/apache: 02/Feb/2026:03:50:38 +0000
    try:
        dt = datetime.strptime(ts.split()[0], "%d/%b/%Y:%H:%M:%S")
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except Exception:
        pass
    return ts

# =============================
# IP 提取（用于日志分析 Attack IP 列）
# =============================
IPV4_RE = re.compile(r"\b(?:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\.){3}(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\b")
IPV6_RE = re.compile(r"\b(?:[0-9a-fA-F]{0,4}:){2,7}[0-9a-fA-F]{0,4}\b")

def extract_ip_from_line(line: str) -&gt; str:
    """从日志行中尽量抽取一个 IP（IPv4/IPv6），并用 ipaddress 做二次校验避免把时间等误判成 IPv6。"""
    if not line:
        return ""

    # 先找 IPv4（更明确）
    for m in IPV4_RE.finditer(line):
        cand = m.group(0)
        try:
            ipaddress.ip_address(cand)
            return cand
        except Exception:
            continue

    # 再找 IPv6：regex 只负责“可能长得像”，最终以 ipaddress 校验为准
    for m in IPV6_RE.finditer(line):
        cand = m.group(0)
        # 过滤明显的时间戳形态（例如 15:36:49）
        if re.fullmatch(r"\d{1,2}:\d{1,2}:\d{1,2}", cand):
            continue
        try:
            ipaddress.ip_address(cand)
            return cand
        except Exception:
            continue

    return ""

def detect_severity(line: str) -&gt; Tuple[str, str]:
    low = line.lower()
    for sev, kws in SEVERITY_KEYWORDS.items():
        for kw in kws:
            if kw in low:
                return sev, kw
    # security keywords as WARN/ERROR
    for kw in SECURITY_KEYWORDS:
        if kw in low:
            return "WARN", kw
    return "INFO", ""

def iter_log_lines(path: str, max_lines: int = 200000):
    # 支持 .gz
    if path.lower().endswith(".gz"):
        with gzip.open(path, "rt", encoding="utf-8", errors="replace") as f:
            for i, line in enumerate(f):
                if i &gt;= max_lines:
                    break
                yield line.rstrip("\n")
    else:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            for i, line in enumerate(f):
                if i &gt;= max_lines:
                    break
                yield line.rstrip("\n")

def analyze_log_file(path: str, max_findings: int = 5000) -&gt; Tuple[List[LogFinding], Dict[str, Any]]:
    findings: List[LogFinding] = []
    counts = Counter()
    sev_counts = Counter()
    cat_counts = Counter()
    atk_counts = Counter()

    # Access Log 统计（SIEM视角）
    access_ip_counts = Counter()
    access_uri_counts = Counter()
    access_ua_counts = Counter()
    access_status_counts = Counter()
    risk_by_ip = Counter()

    for line in iter_log_lines(path):
        access = parse_access_log_line(line)
        if access:
            # 统计（所有 access 行都统计）
            ip_ = access.get('ip','')
            uri_ = access.get('uri','')
            ua_ = access.get('ua','')
            st_ = access.get('status','')
            access_ip_counts[ip_] += 1
            access_uri_counts[uri_] += 1
            if ua_:
                access_ua_counts[ua_] += 1
            if st_:
                access_status_counts[st_] += 1
            risk_by_ip[ip_] += risk_score_access(ip_, uri_, st_, ua_)

            # 1) 先做 SQLi/XSS 参数级分析
            atk, ev = analyze_uri_for_attack(uri_)
            if atk:
                # 强制告警
                sev = 'WARN'
                kw = ev or atk
                ts = access.get('time','')
                cat = '安全'
                findings.append(LogFinding(
                    source=path,
                    category=cat,
                    severity=sev,
                    timestamp=normalize_ts(ts),
                    keyword=kw,
                    message=f"{access.get('ip')} {access.get('method')} {access.get('uri')} status={access.get('status')} referer={access.get('referer')} ua={access.get('ua')}",
                    attack_ip=access.get('ip',''),
                    attack_type=atk
                ))
                counts[kw or sev] += 1
                sev_counts[sev] += 1
                cat_counts[cat] += 1
                atk_counts[atk] += 1
                if len(findings) &gt;= max_findings:
                    break
                continue

            # 2) 再做爬虫识别（不一定是攻击，但可用于蓝队/运营画像）
            ctype, cev = detect_crawler(access.get('ua','') or '')
            if ctype:
                sev2 = 'INFO' if ctype == '搜索引擎爬虫' else 'WARN'
                ts = access.get('time','')
                cat = '爬虫'
                findings.append(LogFinding(
                    source=path,
                    category=cat,
                    severity=sev2,
                    timestamp=normalize_ts(ts),
                    keyword=cev,
                    message=f"{access.get('ip')} {access.get('method')} {access.get('uri')} status={access.get('status')} referer={access.get('referer')} ua={access.get('ua')}",
                    attack_ip=access.get('ip',''),
                    attack_type='爬虫'
                ))
                counts[cev or sev2] += 1
                sev_counts[sev2] += 1
                cat_counts[cat] += 1
                atk_counts['爬虫'] += 1
                if len(findings) &gt;= max_findings:
                    break

        sev, kw = detect_severity(line)
        if sev == "INFO":
            continue
        ts = extract_ts(line)
        cat = guess_category(line)
        attack_type = detect_attack_type(line)
        if attack_type:
            cat = "安全"
            if sev == "INFO":
                sev = "WARN"

        findings.append(LogFinding(
            source=path,
            category=cat,
            severity=sev,
            timestamp=normalize_ts(ts),
            keyword=kw,
            message=line[:500],
            attack_ip=extract_ip_from_line(line),
            attack_type=attack_type
        ))
        counts[kw or sev] += 1
        sev_counts[sev] += 1
        cat_counts[cat] += 1
        if attack_type:
            atk_counts[attack_type] += 1

        if len(findings) &gt;= max_findings:
            break

    stats = {
        "file": path,
        "total_findings": len(findings),
        "severity": dict(sev_counts),
        "category": dict(cat_counts),
        "top_keywords": counts.most_common(15),
        "top_attack_types": atk_counts.most_common(10),
        "access_top_ips": access_ip_counts.most_common(15),
        "access_top_uris": access_uri_counts.most_common(15),
        "access_top_uas": access_ua_counts.most_common(10),
        "access_status": access_status_counts.most_common(10),
        "risk_top_ips": risk_by_ip.most_common(15),
    }
    return findings, stats

def infer_service_from_banner(banner: str) -&gt; str:
    if not banner:
        return ""
    lower = banner.lower()

    if "ssh-" in lower:
        return "ssh"
    if "http/" in lower or "server:" in lower or "set-cookie:" in lower:
        return "http"
    if "smtp" in lower or "esmtp" in lower:
        return "smtp"
    if "imap" in lower:
        return "imap"
    if "pop3" in lower:
        return "pop3"
    if "redis" in lower or lower.startswith("+pong"):
        return "redis"
    if "memcached" in lower:
        return "memcached"
    if "mysql" in lower:
        return "mysql"
    if "postgres" in lower:
        return "postgres"
    if "mongodb" in lower:
        return "mongodb"
    if "tls ok" in lower or "issuer=" in lower:
        return "tls"
    if "ftp" in lower:
        return "ftp"
    return ""
    lower = banner.lower()
    if "ssh-" in lower:
        return "ssh"
    if "http/" in lower or "server:" in lower:
        return "http"
    if "smtp" in lower or "esmtp" in lower:
        return "smtp"
    if "redis" in lower:
        return "redis"
    if "mysql" in lower:
        return "mysql"
    if "postgres" in lower:
        return "postgres"
    if "mongodb" in lower:
        return "mongodb"
    if "ftp" in lower:
        return "ftp"
    return ""

def tcp_connect(host: str, port: int, timeout: float) -&gt; bool:
    family = socket.AF_INET6 if ":" in host else socket.AF_INET
    try:
        with socket.socket(family, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            return s.connect_ex((host, port)) == 0
    except Exception:
        return False

def banner_grab_tcp(host: str, port: int, timeout: float, sni_name: str = "") -&gt; Tuple[str, str]:
    """更强 probes + TLS 探测"""
    family = socket.AF_INET6 if ":" in host else socket.AF_INET
    service = SERVICE_GUESS.get(port, "")
    banner = ""

    # 1) TLS 探测
    tls_ok = False
    tls_info = ""
    if port in TLS_PORTS:
        tls_ok, tls_info = tls_handshake_and_cert(host, port, timeout, server_name=sni_name)
        if tls_ok:
            if not service:
                service = "tls"
            if port in (443, 8443, 9443):
                service = "https"
            elif port == 993:
                service = "imaps"
            elif port == 995:
                service = "pop3s"
            elif port == 465:
                service = "smtps"
            elif port == 587:
                service = "smtp-submission"
            banner = tls_info

    # 2) HTTP 探测（header/title）
    if port in HTTP_PORTS:
        http_banner = http_probe(host, port, timeout, use_tls=tls_ok, host_header=(sni_name or "localhost"))
        if http_banner:
            banner = http_banner
            service = "https" if (tls_ok or port in (443, 8443, 9443)) else "http"

    # 3) 特定协议探测
    if port == 6379:
        b = redis_probe(host, port, timeout)
        if b:
            banner = b
            service = "redis"

    if port == 11211:
        b = memcached_probe(host, port, timeout)
        if b:
            banner = b
            service = "memcached"

    if port == 3306:
        b = mysql_probe(host, port, timeout)
        if b:
            banner = b
            service = "mysql"

    if port == 5432:
        b = pg_probe(host, port, timeout)
        if b:
            banner = sanitize_banner(b)
            service = "postgres"

    # 4) 被动 banner + 通用 probes 兜底
    if not banner:
        try:
            with socket.socket(family, socket.SOCK_STREAM) as s:
                s.settimeout(timeout)
                s.connect((host, port))

                try:
                    data = s.recv(512)
                    if data:
                        banner = safe_decode(data)
                except Exception:
                    pass

                if not banner:
                    probes = TCP_PROBES
                    if port in (80, 8080, 8000, 8443, 443):
                        probes = [TCP_PROBES[0], TCP_PROBES[1]] + TCP_PROBES[2:]

                    for _, payload in probes:
                        if payload:
                            try:
                                s.sendall(payload)
                            except Exception:
                                continue
                        try:
                            data2 = s.recv(512)
                            if data2:
                                banner = safe_decode(data2)
                                break
                        except Exception:
                            continue

        except Exception:
            pass

    banner = sanitize_banner(banner)

    if not service:
        service = infer_service_from_banner(banner)

    if tls_ok and not banner and tls_info:
        banner = tls_info

    return service, banner

def scan_tcp(host: str, ip: str, port: int, timeout: float, do_banner: bool) -&gt; Optional[ScanResult]:
    if not tcp_connect(ip, port, timeout):
        return None

    service = SERVICE_GUESS.get(port, "")
    banner = ""
    if do_banner:
        service2, banner2 = banner_grab_tcp(ip, port, timeout, sni_name=(host if is_domain(host) else ""))
        if service2:
            service = service2
        banner = banner2

    return ScanResult(host=host, ip=ip, proto="tcp", port=port, state="open", service=service, banner=banner)

# =============================
# 可达性探测
# =============================
def ping_host(host: str, timeout_ms: int = 800) -&gt; bool:
    system = platform.system().lower()
    try:
        if system == "windows":
            cmd = ["ping", "-n", "1", "-w", str(timeout_ms), host]
        else:
            cmd = ["ping", "-c", "1", "-W", "1", host]
        res = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=max(1, timeout_ms // 1000 + 1))
        return res.returncode == 0
    except Exception:
        return False

def tcp_reachable_probe(ip: str, timeout: float = 0.6, ports: Tuple[int, ...] = (443, 80, 22)) -&gt; bool:
    for p in ports:
        try:
            family = socket.AF_INET6 if ":" in ip else socket.AF_INET
            with socket.socket(family, socket.SOCK_STREAM) as s:
                s.settimeout(timeout)
                r = s.connect_ex((ip, p))
                if r == 0 or r in (111, 61, 10061):
                    return True
        except Exception:
            continue
    return False

# =============================
# 扫描线程
# =============================
class ScannerThread(QThread):
    log = pyqtSignal(str)
    progress = pyqtSignal(int, int)  # done, total
    result = pyqtSignal(object)      # ScanResult
    finished_scan = pyqtSignal(dict) # stats

    def __init__(
        self,
        targets: List[str],
        tcp_ports: List[int],
        threads: int,
        timeout: float,
        do_banner: bool,
        skip_unreachable: bool,
        reach_method: str,  # "tcp" or "ping"
        parent=None
    ):
        super().__init__(parent)
        self.targets = targets
        self.tcp_ports = tcp_ports
        self.threads = max(1, threads)
        self.timeout = max(0.05, timeout)
        self.do_banner = do_banner
        self.skip_unreachable = skip_unreachable
        self.reach_method = reach_method
        self._stop = threading.Event()
        self._results: List[ScanResult] = []

    def stop(self):
        self._stop.set()

    @property
    def results(self) -&gt; List[ScanResult]:
        return list(self._results)

    def run(self):
        from concurrent.futures import ThreadPoolExecutor, as_completed

        # 解析目标 -&gt; (host, ip)
        resolved = []
        for host in self.targets:
            if self._stop.is_set():
                self.log.emit("已停止。")
                self.finished_scan.emit({})
                return

            h, ip = resolve_host_to_ip(host)
            if not ip:
                self.log.emit(f"[解析失败] {host}")
                continue
            resolved.append((h, ip))

        # 可达性探测
        live = []
        for h, ip in resolved:
            if self._stop.is_set():
                self.log.emit("已停止。")
                self.finished_scan.emit({})
                return

            if not self.skip_unreachable:
                live.append((h, ip))
                continue

            ok = ping_host(ip) if self.reach_method == "ping" else tcp_reachable_probe(ip, timeout=min(1.0, self.timeout))
            if ok:
                live.append((h, ip))
                self.log.emit(f"[主机在线] {h} ({ip})")
            else:
                self.log.emit(f"[跳过] {h} ({ip}) 不可达")

        # 构建任务
        tasks = []
        for h, ip in live:
            for p in self.tcp_ports:
                tasks.append(("tcp", h, ip, p))

        total = len(tasks)
        done = 0
        self.progress.emit(done, total)
        self.log.emit(f"开始扫描：目标 {len(live)} 台，任务 {total} 个")

        def do_task(proto: str, h: str, ip: str, port: int) -&gt; Optional[ScanResult]:
            if self._stop.is_set():
                return None
            if proto == "tcp":
                return scan_tcp(h, ip, port, self.timeout, self.do_banner)

        try:
            with ThreadPoolExecutor(max_workers=self.threads) as ex:
                future_map = {ex.submit(do_task, proto, h, ip, port): (proto, h, ip, port) for proto, h, ip, port in tasks}
                for fut in as_completed(future_map):
                    if self._stop.is_set():
                        self.log.emit("用户请求停止。")
                        break
                    res = None
                    try:
                        res = fut.result()
                    except Exception:
                        res = None

                    done += 1
                    self.progress.emit(done, total)

                    if res:
                        self._results.append(res)
                        self.result.emit(res)

        except Exception as e:
            self.log.emit(f"[异常] {e}")

        stats = self.build_stats(self._results)
        self.finished_scan.emit(stats)

    def build_stats(self, results: List[ScanResult]) -&gt; dict:
        per_host_open = defaultdict(int)
        svc_counter = Counter()
        proto_counter = Counter()

        for r in results:
            if r.state.startswith("open"):
                per_host_open[r.ip] += 1
                if r.service:
                    svc_counter[r.service] += 1
                proto_counter[r.proto] += 1

        top_services = svc_counter.most_common(10)
        return {
            "generated_at": datetime.now().isoformat(),
            "total_results": len(results),
            "per_host_open_ports": dict(per_host_open),
            "top_services": top_services,
            "proto_count": dict(proto_counter),
        }

# =============================
# UI
# =============================
class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("端口扫描器（PyQt6）- TCP + CIDR + Banner + 双栈 + 报告导出")
        self.resize(1280, 760)

        self.thread: Optional[ScannerThread] = None
        self.all_results: List[ScanResult] = []
        self.last_stats: Dict[str, Any] = {}

        root = QWidget()
        self.setCentralWidget(root)
        layout = QVBoxLayout(root)

        self.tabs = QTabWidget()
        layout.addWidget(self.tabs)

        scan_tab = QWidget()
        scan_layout = QVBoxLayout(scan_tab)
        self.tabs.addTab(scan_tab, "端口扫描")

        log_tab = QWidget()
        log_layout = QVBoxLayout(log_tab)
        self.tabs.addTab(log_tab, "日志分析")

        # 目标
        g_target = QGroupBox("目标（IP/域名/CIDR，支持多行或逗号分隔，支持IPv6）")
        gl = QVBoxLayout(g_target)
        self.targets_edit = QTextEdit()
        self.targets_edit.setPlaceholderText("例如：\n192.168.1.10\n192.168.1.0/24\n2001:db8::1\n2001:db8::/120\nexample.com")
        self.targets_edit.setFixedHeight(120)
        gl.addWidget(self.targets_edit)
        scan_layout.addWidget(g_target)

        # 参数
        g_opt = QGroupBox("扫描参数")
        opt = QHBoxLayout(g_opt)

        opt.addWidget(QLabel("TCP端口："))
        self.tcp_ports_edit = QLineEdit("1-65535")  # 默认全端口
        self.tcp_ports_edit.setToolTip("默认全端口：1-65535；也支持 22,80,443 或 1-1024")
        opt.addWidget(self.tcp_ports_edit, 2)

        opt.addWidget(QLabel("线程数："))
        self.threads_spin = QSpinBox()
        self.threads_spin.setRange(1, 3000)
        self.threads_spin.setValue(600)
        opt.addWidget(self.threads_spin)

        opt.addWidget(QLabel("超时(s)："))
        self.timeout_spin = QDoubleSpinBox()
        self.timeout_spin.setRange(0.05, 10.0)
        self.timeout_spin.setSingleStep(0.1)
        self.timeout_spin.setValue(0.6)
        opt.addWidget(self.timeout_spin)

        self.banner_check = QCheckBox("服务识别（Banner/Probe）")
        self.banner_check.setChecked(True)
        opt.addWidget(self.banner_check)

        self.skip_check = QCheckBox("跳过不可达主机")
        self.skip_check.setChecked(True)
        opt.addWidget(self.skip_check)

        opt.addWidget(QLabel("可达性探测："))
        self.reach_combo = QComboBox()
        self.reach_combo.addItems(["TCP探测（推荐）", "Ping(ICMP)"])
        opt.addWidget(self.reach_combo)

        scan_layout.addWidget(g_opt)

        # 按钮
        btn_row = QHBoxLayout()
        self.start_btn = QPushButton("开始扫描")
        self.stop_btn = QPushButton("停止")
        self.stop_btn.setEnabled(False)

        self.export_json_btn = QPushButton("导出JSON（含统计）")
        self.export_csv_btn = QPushButton("导出CSV（含统计）")
        self.export_json_btn.setEnabled(False)
        self.export_csv_btn.setEnabled(False)

        btn_row.addWidget(self.start_btn)
        btn_row.addWidget(self.stop_btn)
        btn_row.addStretch(1)
        btn_row.addWidget(self.export_json_btn)
        btn_row.addWidget(self.export_csv_btn)
        scan_layout.addLayout(btn_row)

        # 进度条
        prog_row = QHBoxLayout()
        self.status_label = QLabel("状态：空闲")
        self.progress_label = QLabel("0/0")
        self.progress_bar = QProgressBar()
        self.progress_bar.setMinimum(0)
        self.progress_bar.setMaximum(100)
        self.progress_bar.setValue(0)
        prog_row.addWidget(self.status_label, 2)
        prog_row.addWidget(self.progress_bar, 4)
        prog_row.addWidget(self.progress_label, 1)
        scan_layout.addLayout(prog_row)

        # 表格
        self.table = QTableWidget(0, 7)
        self.table.setHorizontalHeaderLabels(["目标", "IP", "协议", "端口", "状态", "服务", "Banner/响应"])
        self.table.horizontalHeader().setStretchLastSection(True)
        #self.table.setSortingEnabled(True)
        scan_layout.addWidget(self.table, 5)

        # 日志
        self.log_box = QTextEdit()
        self.log_box.setReadOnly(True)
        self.log_box.setFixedHeight(160)
        scan_layout.addWidget(self.log_box)

        # =============================
        # 日志分析 Tab
        # =============================
        g_log = QGroupBox("导入日志文件/目录（支持 .log/.txt/.out/.err/.gz 等）")
        lg = QVBoxLayout(g_log)

        row1 = QHBoxLayout()
        self.log_path_edit = QLineEdit()
        self.log_path_edit.setPlaceholderText("选择日志文件或目录...")
        self.btn_pick_file = QPushButton("选择文件")
        self.btn_pick_dir = QPushButton("选择目录")
        row1.addWidget(self.log_path_edit, 4)
        row1.addWidget(self.btn_pick_file, 1)
        row1.addWidget(self.btn_pick_dir, 1)
        lg.addLayout(row1)

        row2 = QHBoxLayout()
        row2.addWidget(QLabel("最大读取行数/文件："))
        self.max_lines_spin = QSpinBox()
        self.max_lines_spin.setRange(1000, 2000000)
        self.max_lines_spin.setValue(200000)
        row2.addWidget(self.max_lines_spin)

        row2.addWidget(QLabel("最大告警条数/文件："))
        self.max_findings_spin = QSpinBox()
        self.max_findings_spin.setRange(100, 20000)
        self.max_findings_spin.setValue(5000)
        row2.addWidget(self.max_findings_spin)

        self.btn_analyze_logs = QPushButton("开始分析")
        row2.addWidget(self.btn_analyze_logs)
        row2.addStretch(1)
        lg.addLayout(row2)

        log_layout.addWidget(g_log)

        # 统计输出
        self.log_stats_box = QTextEdit()
        self.log_stats_box.setReadOnly(True)
        self.log_stats_box.setFixedHeight(160)
        log_layout.addWidget(self.log_stats_box)

        # 日志过滤
        g_filter = QGroupBox("日志过滤")
        fl = QHBoxLayout(g_filter)

        fl.addWidget(QLabel("严重性："))
        self.filter_sev = QComboBox()
        self.filter_sev.addItems(["全部", "INFO", "WARN", "ERROR", "CRITICAL"])
        fl.addWidget(self.filter_sev)

        fl.addWidget(QLabel("类别："))
        self.filter_cat = QComboBox()
        self.filter_cat.addItems(["全部", "系统", "中间件", "数据库", "应用", "安全", "爬虫"])
        fl.addWidget(self.filter_cat)

        fl.addWidget(QLabel("攻击类型："))
        self.filter_atk = QComboBox()
        self.filter_atk.addItems(["全部", "SQL注入", "XSS", "路径穿越", "命令注入", "SSRF", "扫描/探测", "暴力破解", "WebShell/恶意文件", "爬虫"])
        fl.addWidget(self.filter_atk)

        fl.addWidget(QLabel("关键字："))
        self.filter_kw = QLineEdit()
        self.filter_kw.setPlaceholderText("包含关键字(支持 IP/URI/UA/Referer)...")
        fl.addWidget(self.filter_kw, 3)

        self.btn_apply_filter = QPushButton("应用过滤")
        self.btn_reset_filter = QPushButton("重置")
        fl.addWidget(self.btn_apply_filter)
        fl.addWidget(self.btn_reset_filter)

        log_layout.addWidget(g_filter)

        # Findings 列表
        self.findings_table = QTableWidget(0, 8)
        self.findings_table.setHorizontalHeaderLabels(["严重性", "类别", "攻击类型", "攻击IP", "时间", "关键字", "来源", "内容"])
        self.findings_table.horizontalHeader().setStretchLastSection(True)
        log_layout.addWidget(self.findings_table, 5)

        # 导出按钮
        row3 = QHBoxLayout()
        self.btn_export_findings_json = QPushButton("导出发现(JSON)")
        self.btn_export_findings_csv = QPushButton("导出发现(CSV)")
        self.btn_export_findings_json.setEnabled(False)
        self.btn_export_findings_csv.setEnabled(False)
        row3.addStretch(1)
        row3.addWidget(self.btn_export_findings_json)
        row3.addWidget(self.btn_export_findings_csv)
        log_layout.addLayout(row3)

        self.log_findings: List[LogFinding] = []
        self.log_stats: List[dict] = []

        # signals
        self.start_btn.clicked.connect(self.on_start)
        self.stop_btn.clicked.connect(self.on_stop)
        self.export_json_btn.clicked.connect(self.on_export_json)
        self.export_csv_btn.clicked.connect(self.on_export_csv)

        # 日志分析 signals
        self.btn_pick_file.clicked.connect(self.pick_log_file)
        self.btn_pick_dir.clicked.connect(self.pick_log_dir)
        self.btn_analyze_logs.clicked.connect(self.analyze_logs)
        self.btn_export_findings_json.clicked.connect(self.export_findings_json)
        self.btn_export_findings_csv.clicked.connect(self.export_findings_csv)
        self.btn_apply_filter.clicked.connect(self.apply_log_filter)
        self.btn_reset_filter.clicked.connect(self.reset_log_filter)

    def append_log(self, msg: str):
        ts = datetime.now().strftime("%H:%M:%S")
        self.log_box.append(f"[{ts}] {msg}")

    def on_start(self):
        if self.thread and self.thread.isRunning():
            QMessageBox.warning(self, "提示", "扫描正在进行中。")
            return

        targets_raw = self.targets_edit.toPlainText().strip()
        if not targets_raw:
            QMessageBox.warning(self, "提示", "请输入至少一个目标。")
            return

        try:
            targets = expand_targets(targets_raw)
        except Exception as e:
            QMessageBox.critical(self, "错误", f"目标解析失败：{e}")
            return

        try:
            tcp_ports = parse_ports(self.tcp_ports_edit.text())
        except Exception as e:
            QMessageBox.critical(self, "错误", f"端口格式错误：{e}")
            return

        if not tcp_ports:
            QMessageBox.warning(self, "提示", "请填写 TCP")
            return

        # reset
        self.table.setRowCount(0)
        self.all_results = []
        self.last_stats = {}
        self.export_json_btn.setEnabled(False)
        self.export_csv_btn.setEnabled(False)

        threads = int(self.threads_spin.value())
        timeout = float(self.timeout_spin.value())
        do_banner = bool(self.banner_check.isChecked())
        skip_unreachable = bool(self.skip_check.isChecked())
        reach_method = "tcp" if self.reach_combo.currentIndex() == 0 else "ping"

        self.status_label.setText("状态：扫描中...")
        self.progress_label.setText("0/0")
        self.progress_bar.setValue(0)
        self.start_btn.setEnabled(False)
        self.stop_btn.setEnabled(True)

        self.append_log(f"开始：目标={len(targets)} TCP端口={len(tcp_ports)}")

        self.thread = ScannerThread(
            targets=targets,
            tcp_ports=tcp_ports,
            threads=threads,
            timeout=timeout,
            do_banner=do_banner,
            skip_unreachable=skip_unreachable,
            reach_method=reach_method
        )
        self.thread.log.connect(self.append_log)
        self.thread.progress.connect(self.on_progress)
        self.thread.result.connect(self.on_result)
        self.thread.finished_scan.connect(self.on_finished)
        self.thread.start()

    def on_stop(self):
        if self.thread and self.thread.isRunning():
            self.thread.stop()
            self.append_log("用户请求停止...")

    def on_progress(self, done: int, total: int):
        self.progress_label.setText(f"{done}/{total}")
        if total &lt;= 0:
            self.progress_bar.setValue(0)
        else:
            self.progress_bar.setValue(int(done * 100 / total))

    def on_result(self, res: ScanResult):
        self.all_results.append(res)
        row = self.table.rowCount()
        self.table.insertRow(row)

        self.table.setItem(row, 0, QTableWidgetItem(res.host))
        self.table.setItem(row, 1, QTableWidgetItem(res.ip))
        self.table.setItem(row, 2, QTableWidgetItem(res.proto))
        self.table.setItem(row, 3, QTableWidgetItem(str(res.port)))
        self.table.setItem(row, 4, QTableWidgetItem(res.state))
        self.table.setItem(row, 5, QTableWidgetItem(res.service))
        self.table.setItem(row, 6, QTableWidgetItem(res.banner))

    def on_finished(self, stats: dict):
        self.last_stats = stats or {}
        self.status_label.setText("状态：完成")
        self.start_btn.setEnabled(True)
        self.stop_btn.setEnabled(False)

        if self.all_results:
            self.export_json_btn.setEnabled(True)
            self.export_csv_btn.setEnabled(True)
            self.append_log(f"完成：发现结果 {len(self.all_results)} 条")
        else:
            self.append_log("完成：未发现开放端口（或全部被过滤）")

        # if self.last_stats:
        #     self.append_log("=== 统计 ===")
        #     self.append_log(f"总结果数：{self.last_stats.get('total_results', 0)}")
        #     self.append_log(f"协议统计：{self.last_stats.get('proto_count', {})}")
        #     self.append_log(f"Top服务：{self.last_stats.get('top_services', [])}")

    def on_export_json(self):
        if not self.all_results:
            QMessageBox.information(self, "提示", "没有可导出的结果。")
            return

        path, _ = QFileDialog.getSaveFileName(self, "保存JSON", "scan_results.json", "JSON 文件 (*.json)")
        if not path:
            return

        payload = {
            "generated_at": datetime.now().isoformat(),
            "stats": self.last_stats,
            "results": [asdict(r) for r in self.all_results],
        }
        try:
            with open(path, "w", encoding="utf-8") as f:
                json.dump(payload, f, ensure_ascii=False, indent=2)
            self.append_log(f"已导出JSON：{path}")
        except Exception as e:
            QMessageBox.critical(self, "错误", f"导出失败：{e}")

    def on_export_csv(self):
        if not self.all_results:
            QMessageBox.information(self, "提示", "没有可导出的结果。")
            return

        path, _ = QFileDialog.getSaveFileName(self, "保存CSV", "scan_results.csv", "CSV 文件 (*.csv)")
        if not path:
            return

        try:
            with open(path, "w", newline="", encoding="utf-8") as f:
                w = csv.writer(f)
                # # 写统计头
                # w.writerow(["# generated_at", datetime.now().isoformat()])
                # w.writerow(["# total_results", self.last_stats.get("total_results", 0)])
                # w.writerow(["# proto_count", json.dumps(self.last_stats.get("proto_count", {}), ensure_ascii=False)])
                # w.writerow(["# top_services", json.dumps(self.last_stats.get("top_services", []), ensure_ascii=False)])
                # w.writerow([])

                # 写明细
                w.writerow(["host", "ip", "proto", "port", "state", "service", "banner"])
                for r in self.all_results:
                    w.writerow([r.host, r.ip, r.proto, r.port, r.state, r.service, r.banner])

            self.append_log(f"已导出CSV：{path}")
        except Exception as e:
            QMessageBox.critical(self, "错误", f"导出失败：{e}")

    # =============================
    # 日志分析功能
    # =============================
    def pick_log_file(self):
        path, _ = QFileDialog.getOpenFileName(self, "选择日志文件", "", "日志文件 (*.log *.txt *.out *.err *.gz);;所有文件 (*.*)")
        if path:
            self.log_path_edit.setText(path)

    def pick_log_dir(self):
        path = QFileDialog.getExistingDirectory(self, "选择日志目录")
        if path:
            self.log_path_edit.setText(path)

    def iter_log_paths(self, root_path: str) -&gt; List[str]:
        p = pathlib.Path(root_path)
        if p.is_file():
            return [str(p)]
        if p.is_dir():
            exts = {".log", ".txt", ".out", ".err", ".gz"}
            files = []
            for fp in p.rglob("*"):
                if fp.is_file() and (fp.suffix.lower() in exts or fp.name.lower().endswith(".log.gz")):
                    files.append(str(fp))
            return files[:500]  # 防止目录过大
        return []

    def analyze_logs(self):
        root_path = self.log_path_edit.text().strip()
        if not root_path:
            QMessageBox.warning(self, "提示", "请先选择日志文件或目录。")
            return

        paths = self.iter_log_paths(root_path)
        if not paths:
            QMessageBox.warning(self, "提示", "未找到可分析的日志文件。")
            return

        self.log_findings = []
        self.log_stats = []
        self.findings_table.setRowCount(0)
        self.log_stats_box.clear()

        max_lines = int(self.max_lines_spin.value())
        max_findings = int(self.max_findings_spin.value())

        self.log_stats_box.append(f"开始分析：文件数={len(paths)}")
        QApplication.processEvents()

        for path in paths:
            try:
                # 覆盖 max_lines / max_findings
                global iter_log_lines
                # 临时包装 iter_log_lines 以传入 max_lines
                def _iter(path_in):
                    return iter_log_lines(path_in, max_lines=max_lines)

                findings, stats = analyze_log_file(path, max_findings=max_findings)
                self.log_stats.append(stats)
                self.log_findings.extend(findings)
            except Exception as e:
                self.log_stats_box.append(f"[失败] {path}: {e}")

        # 输出统计汇总
        total_files = len(self.log_stats)
        total_findings = len(self.log_findings)
        sev_sum = Counter()
        cat_sum = Counter()
        kw_sum = Counter()

        for st in self.log_stats:
            for k, v in st.get("severity", {}).items():
                sev_sum[k] += v
            for k, v in st.get("category", {}).items():
                cat_sum[k] += v
            for k, v in st.get("top_keywords", []):
                if k:
                    kw_sum[k] += v

        self.log_stats_box.append("")
        self.log_stats_box.append("=== 汇总统计 ===")
        self.log_stats_box.append(f"文件数：{total_files}")
        self.log_stats_box.append(f"告警条数：{total_findings}")
        self.log_stats_box.append(f"严重性统计：{dict(sev_sum)}")
        self.log_stats_box.append(f"类别统计：{dict(cat_sum)}")
        atk_sum = Counter()
        for st in self.log_stats:
            for k, v in st.get("top_attack_types", []):
                atk_sum[k] += v

        # self.log_stats_box.append(f"Top关键字：{kw_sum.most_common(10)}")
        # access_ip_sum = Counter()
        # access_uri_sum = Counter()
        # access_status_sum = Counter()
        # risk_ip_sum = Counter()
        # for st in self.log_stats:
        #     for k, v in st.get('access_top_ips', []):
        #         access_ip_sum[k] += v
        #     for k, v in st.get('access_top_uris', []):
        #         access_uri_sum[k] += v
        #     for k, v in st.get('access_status', []):
        #         access_status_sum[k] += v
        #     for k, v in st.get('risk_top_ips', []):
        #         risk_ip_sum[k] += v

        # self.log_stats_box.append(f"Top攻击类型：{atk_sum.most_common(10)}")
        # self.log_stats_box.append(f"Top访问IP：{access_ip_sum.most_common(10)}")
        # self.log_stats_box.append(f"Top访问URI：{access_uri_sum.most_common(10)}")
        # self.log_stats_box.append(f"状态码分布：{access_status_sum.most_common(10)}")
        # self.log_stats_box.append(f"Top风险IP：{risk_ip_sum.most_common(10)}")

        # 填充表格
        self.refresh_findings_table(self.log_findings)
        self.btn_export_findings_json.setEnabled(bool(self.log_findings))
        self.btn_export_findings_csv.setEnabled(bool(self.log_findings))

    def export_findings_json(self):
        if not self.log_findings:
            return
        path, _ = QFileDialog.getSaveFileName(self, "保存JSON", "log_findings.json", "JSON 文件 (*.json)")
        if not path:
            return
        payload = {
            "generated_at": datetime.now().isoformat(),
            "summary": self.log_stats,
            "findings": [asdict(x) for x in self.log_findings],
        }
        with open(path, "w", encoding="utf-8") as f:
            json.dump(payload, f, ensure_ascii=False, indent=2)
        QMessageBox.information(self, "完成", "已导出 JSON。")

    def export_findings_csv(self):
        if not self.log_findings:
            return
        path, _ = QFileDialog.getSaveFileName(self, "保存CSV", "log_findings.csv", "CSV 文件 (*.csv)")
        if not path:
            return
        with open(path, "w", newline="", encoding="utf-8") as f:
            w = csv.writer(f)
            w.writerow(["severity", "category", "attack_type", "attack_ip", "timestamp", "keyword", "source", "message"])
            for x in self.log_findings:
                w.writerow([x.severity, x.category, x.attack_type, getattr(x, "attack_ip", ""), x.timestamp, x.keyword, x.source, x.message])
        QMessageBox.information(self, "完成", "已导出 CSV。")

    def refresh_findings_table(self, findings: List[LogFinding]):
        self.findings_table.setRowCount(0)
        for fnd in findings[:20000]:
            row = self.findings_table.rowCount()
            self.findings_table.insertRow(row)

            # 8 columns:
            # 0 严重性, 1 类别, 2 攻击类型, 3 攻击IP, 4 时间, 5 关键字, 6 来源, 7 内容
            self.findings_table.setItem(row, 0, QTableWidgetItem(fnd.severity))
            self.findings_table.setItem(row, 1, QTableWidgetItem(fnd.category))
            self.findings_table.setItem(row, 2, QTableWidgetItem(fnd.attack_type))
            self.findings_table.setItem(row, 3, QTableWidgetItem(getattr(fnd, "attack_ip", "") or ""))
            self.findings_table.setItem(row, 4, QTableWidgetItem(fnd.timestamp))
            self.findings_table.setItem(row, 5, QTableWidgetItem(fnd.keyword))
            self.findings_table.setItem(row, 6, QTableWidgetItem(fnd.source))
            self.findings_table.setItem(row, 7, QTableWidgetItem(fnd.message))

    def apply_log_filter(self):
            if not self.log_findings:
                return
            sev = self.filter_sev.currentText()
            cat = self.filter_cat.currentText()
            atk = self.filter_atk.currentText()
            kw = self.filter_kw.text().strip().lower()

            out = []
            for f in self.log_findings:
                if sev != "全部" and f.severity != sev:
                    continue
                if cat != "全部" and f.category != cat:
                    continue
                if atk != "全部" and (f.attack_type or "") != atk:
                    continue
                if kw:
                    hay = " ".join([f.keyword or "", f.message or "", f.source or "", f.timestamp or "", getattr(f, "attack_ip", "") or ""]).lower()
                    if kw not in hay:
                        continue
                out.append(f)

            self.refresh_findings_table(out)
            self.log_stats_box.append(f"[过滤] 命中 {len(out)}/{len(self.log_findings)} 条")

    def reset_log_filter(self):
        self.filter_sev.setCurrentIndex(0)
        self.filter_cat.setCurrentIndex(0)
        self.filter_atk.setCurrentIndex(0)
        self.filter_kw.clear()
        self.refresh_findings_table(self.log_findings)

def main():
    app = QApplication(sys.argv)
    win = MainWindow()
    win.show()
    sys.exit(app.exec())

if __name__ == "__main__":
    main()
</code></pre>]]></description>
    <pubDate>Mon, 02 Feb 2026 17:28:26 +0800</pubDate>
    <dc:creator>‎刘小猪</dc:creator>
    <guid>https://blog.liuxiaozhu.cn/?post=7</guid>
</item>
<item>
    <title>Python脚本-Unicode与中文互转工具</title>
    <link>https://blog.liuxiaozhu.cn/?post=6</link>
    <description><![CDATA[<h1>Python脚本-Unicode与中文互转工具</h1>
<pre><code class="language-python">import tkinter as tk
from tkinter import ttk, scrolledtext, filedialog, messagebox
import re

def unicode_to_chinese(unicode_str):
    """将Unicode编码转换为中文"""
    try:
        # 预处理：尝试提取所有可能的Unicode编码
        import re

        # 结果字符串
        result = ""

        # 使用正则表达式匹配多种Unicode编码格式
        # 1. \uXXXX 格式
        # 2. &amp;#xXXXX; 格式
        # 3. U+XXXX 格式
        patterns = [
            r'\\u([0-9a-fA-F]{4})',           # \uXXXX
            r'&amp;#x([0-9a-fA-F]{1,6});',        # &amp;#xXXXX;
            r'[uU]\+([0-9a-fA-F]{4,6})'       # U+XXXX
        ]

        # 合并所有匹配项
        combined_pattern = '|'.join(patterns)
        matches = re.finditer(combined_pattern, unicode_str)
        last_end = 0
        found_match = False

        for match in matches:
            found_match = True
            # 添加匹配之前的非Unicode部分
            result += unicode_str[last_end:match.start()]

            # 确定匹配的是哪种格式
            if match.group(1):  # \uXXXX
                hex_code = match.group(1)
            elif match.group(2):  # &amp;#xXXXX;
                hex_code = match.group(2)
            elif match.group(3):  # U+XXXX
                hex_code = match.group(3)
            else:
                continue

            try:
                char_code = int(hex_code, 16)
                if 0 &lt;= char_code &lt;= 0x10FFFF:  # 有效的Unicode范围
                    result += chr(char_code)
                else:
                    result += f"[无效Unicode:{hex_code}]"
            except (ValueError, OverflowError):
                result += f"[无法转换:{hex_code}]"

            last_end = match.end()

        # 添加最后一个匹配之后的部分
        result += unicode_str[last_end:]

        # 如果没有找到任何匹配，尝试使用原始方法
        if not found_match:
            # 使用原始的分割和处理方法作为备选
            # ... 原有代码逻辑 ...
            parts = []
            current_part = ""

            for char in unicode_str:
                if char.isalnum() or char in '\\uU':
                    current_part += char
                else:
                    if current_part:
                        parts.append(current_part)
                        current_part = ""
                    # 对于非字母数字和Unicode标识符的字符，直接添加到结果中
                    if char not in ' \t\n\r':
                        result += char

            if current_part:
                parts.append(current_part)

            result = ""  # 重置结果
            for part in parts:
                # 移除可能存在的'u'或'U'前缀和多余符号
                part = part.strip()
                if part.startswith(('\\u', '\\U')):
                    part = part[2:]  # 移除'\u'
                elif part.startswith(('u', 'U')):
                    part = part[1:]  # 移除'u'

                # 确保是有效的十六进制字符串
                try:
                    # 将十六进制转换为整数，然后转换为字符
                    if part and all(c in '0123456789abcdefABCDEF' for c in part):
                        # 添加长度检查，防止过大的值
                        if len(part) &lt;= 8:  # 通常Unicode码点不会超过8位十六进制
                            char_code = int(part, 16)
                            # 添加范围检查，确保码点在有效范围内
                            if 0 &lt;= char_code &lt;= 0x10FFFF:  # Unicode的有效范围
                                result += chr(char_code)
                            else:
                                result += f"[无效Unicode:{part}]"
                        else:
                            result += f"[Unicode过长:{part}]"
                    elif part:  # 如果部分不是有效的十六进制，但不为空
                        result += part  # 直接添加到结果中
                except (ValueError, OverflowError):
                    # 如果转换失败，直接保留原始文本
                    result += f"[无法转换:{part}]"

        return result
    except Exception as e:
        return f"转换错误: {str(e)}"

def chinese_to_unicode(chinese_str, format_type="compact"):
    """将中文转换为Unicode编码，非中文字符保持原样"""
    try:
        result = ""
        for char in chinese_str:
            # 判断是否为中文字符
            # 中文字符的Unicode范围大致为：\u4e00-\u9fff
            if '\u4e00' &lt;= char &lt;= '\u9fff' or '\u3400' &lt;= char &lt;= '\u4dbf' or '\uf900' &lt;= char &lt;= '\ufaff':
                if format_type == "compact":
                    unicode_code = f"\\u{ord(char):04x}"
                    result += unicode_code
                elif format_type == "spaced":
                    unicode_code = f"\\u{ord(char):04x}"
                    result += unicode_code + " "
                elif format_type == "u_prefix":
                    unicode_code = f"U+{ord(char):04X}"
                    result += unicode_code + " "
                elif format_type == "html":
                    unicode_code = f"&amp;#x{ord(char):04x};"
                    result += unicode_code
                else:  # 默认使用紧凑格式
                    unicode_code = f"\\u{ord(char):04x}"
                    result += unicode_code
            else:
                # 非中文字符保持原样
                result += char

        return result.strip()
    except Exception as e:
        return f"转换错误: {str(e)}"

# 批量处理函数
def batch_process(file_path, output_path, conversion_func, format_type=None):
    """批量处理文件"""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # 根据转换类型处理内容
        if conversion_func == unicode_to_chinese:
            result = unicode_to_chinese(content)
        else:
            result = chinese_to_unicode(content, format_type)

        # 写入输出文件
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(result)

        return True, f"处理完成，已保存到 {output_path}"
    except Exception as e:
        return False, f"处理失败: {str(e)}"

# 添加历史记录列表
history_records = []
max_history = 10  # 最大历史记录数量

def convert():
    """根据选择执行转换操作"""
    input_text = input_area.get("1.0", tk.END).strip()
    if not input_text:
        output_area.delete("1.0", tk.END)
        output_area.insert(tk.END, "请输入需要转换的文本")
        return

    conversion_type = conversion_var.get()
    if conversion_type == "Unicode转中文":
        result = unicode_to_chinese(input_text)
    else:  # 中文转Unicode
        format_type = format_var.get()  # 获取格式类型
        result = chinese_to_unicode(input_text, format_type)

    output_area.delete("1.0", tk.END)
    output_area.insert(tk.END, result)

    # 添加到历史记录
    add_to_history(conversion_type, input_text, result)
    update_history_display()

def add_to_history(conversion_type, input_text, result):
    """添加记录到历史列表"""
    # 添加时间戳
    import datetime
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # 创建历史记录项 - 保存完整内容
    history_item = {
        "timestamp": timestamp,
        "type": conversion_type,
        "input": input_text,
        "output": result,
        "input_display": input_text[:47] + "..." if len(input_text) &gt; 50 else input_text,
        "output_display": result[:47] + "..." if len(result) &gt; 50 else result
    }

    # 添加到历史记录列表
    history_records.insert(0, history_item)  # 新记录插入到列表开头

    # 限制历史记录数量
    if len(history_records) &gt; max_history:
        history_records.pop()  # 移除最旧的记录

def update_history_display():
    """更新历史记录显示"""
    history_listbox.delete(0, tk.END)  # 清空列表

    for item in history_records:
        # 使用截断后的显示文本
        display_text = f"[{item['timestamp']}] {item['type']}: {item['input_display']} → {item['output_display']}"
        history_listbox.insert(tk.END, display_text)

def on_history_select(event):
    """当用户选择历史记录时触发"""
    selection = history_listbox.curselection()
    if not selection:
        return

    # 获取选中的历史记录
    index = selection[0]
    if 0 &lt;= index &lt; len(history_records):
        selected_history = history_records[index]

        # 将历史记录的完整输入填充到输入区域
        input_area.delete("1.0", tk.END)
        input_area.insert(tk.END, selected_history["input"])

        # 将历史记录的完整输出填充到输出区域
        output_area.delete("1.0", tk.END)
        output_area.insert(tk.END, selected_history["output"])

        # 设置转换类型
        conversion_var.set(selected_history["type"])

def copy_to_clipboard():
    """复制输出结果到剪贴板"""
    result = output_area.get("1.0", tk.END).strip()
    root.clipboard_clear()
    root.clipboard_append(result)
    status_label.config(text="已复制到剪贴板")
    # 2秒后清除状态信息
    root.after(2000, lambda: status_label.config(text=""))

def clear_text():
    """清空输入和输出区域"""
    input_area.delete("1.0", tk.END)
    output_area.delete("1.0", tk.END)
    status_label.config(text="")

def open_batch_window():
    """打开批量处理窗口"""
    batch_window = tk.Toplevel(root)
    batch_window.title("批量处理")
    batch_window.geometry("500x300")
    batch_window.resizable(True, True)

    # 设置为模态窗口
    batch_window.transient(root)
    batch_window.grab_set()

    # 创建主框架
    batch_frame = ttk.Frame(batch_window, padding="10")
    batch_frame.pack(fill=tk.BOTH, expand=True)

    # 转换类型选择
    batch_conversion_var = tk.StringVar(value="Unicode转中文")
    conversion_frame = ttk.Frame(batch_frame)
    conversion_frame.pack(fill=tk.X, pady=5)

    ttk.Label(conversion_frame, text="转换类型:").pack(side=tk.LEFT)
    ttk.Radiobutton(conversion_frame, text="Unicode转中文", variable=batch_conversion_var, value="Unicode转中文").pack(side=tk.LEFT, padx=5)
    ttk.Radiobutton(conversion_frame, text="中文转Unicode", variable=batch_conversion_var, value="中文转Unicode").pack(side=tk.LEFT, padx=5)

    # 格式选择（仅在中文转Unicode时有效）
    batch_format_var = tk.StringVar(value="compact")
    format_frame = ttk.Frame(batch_frame)
    format_frame.pack(fill=tk.X, pady=5)

    ttk.Label(format_frame, text="Unicode格式:").pack(side=tk.LEFT)
    ttk.Radiobutton(format_frame, text="紧凑格式(\\uXXXX)", variable=batch_format_var, value="compact").pack(side=tk.LEFT, padx=5)
    ttk.Radiobutton(format_frame, text="空格分隔(\\uXXXX )", variable=batch_format_var, value="spaced").pack(side=tk.LEFT, padx=5)
    ttk.Radiobutton(format_frame, text="U+格式(U+XXXX)", variable=batch_format_var, value="u_prefix").pack(side=tk.LEFT, padx=5)
    ttk.Radiobutton(format_frame, text="HTML格式(&amp;#xXXXX;)", variable=batch_format_var, value="html").pack(side=tk.LEFT, padx=5)

    # 文件选择
    file_frame = ttk.Frame(batch_frame)
    file_frame.pack(fill=tk.X, pady=5)

    input_file_var = tk.StringVar()
    ttk.Label(file_frame, text="输入文件:").pack(side=tk.LEFT)
    ttk.Entry(file_frame, textvariable=input_file_var, width=40).pack(side=tk.LEFT, padx=5, fill=tk.X, expand=True)

    def select_input_file():
        file_path = filedialog.askopenfilename(filetypes=[("文本文件", "*.txt"), ("所有文件", "*.*")])
        if file_path:
            input_file_var.set(file_path)
            # 自动设置输出文件路径
            if not output_file_var.get():
                import os
                base, ext = os.path.splitext(file_path)
                output_file_var.set(f"{base}_converted{ext}")

    ttk.Button(file_frame, text="浏览...", command=select_input_file).pack(side=tk.LEFT)

    # 输出文件
    output_frame = ttk.Frame(batch_frame)
    output_frame.pack(fill=tk.X, pady=5)

    output_file_var = tk.StringVar()
    ttk.Label(output_frame, text="输出文件:").pack(side=tk.LEFT)
    ttk.Entry(output_frame, textvariable=output_file_var, width=40).pack(side=tk.LEFT, padx=5, fill=tk.X, expand=True)

    def select_output_file():
        file_path = filedialog.asksaveasfilename(filetypes=[("文本文件", "*.txt"), ("所有文件", "*.*")])
        if file_path:
            output_file_var.set(file_path)

    ttk.Button(output_frame, text="浏览...", command=select_output_file).pack(side=tk.LEFT)

    # 处理按钮
    button_frame = ttk.Frame(batch_frame)
    button_frame.pack(fill=tk.X, pady=10)

    def start_batch_process():
        input_file = input_file_var.get()
        output_file = output_file_var.get()

        if not input_file or not output_file:
            messagebox.showerror("错误", "请选择输入和输出文件")
            return

        conversion_type = batch_conversion_var.get()
        if conversion_type == "Unicode转中文":
            success, message = batch_process(input_file, output_file, unicode_to_chinese)
        else:
            format_type = batch_format_var.get()
            success, message = batch_process(input_file, output_file, chinese_to_unicode, format_type)

        if success:
            messagebox.showinfo("成功", message)
            batch_window.destroy()
        else:
            messagebox.showerror("错误", message)

    ttk.Button(button_frame, text="开始处理", command=start_batch_process).pack(side=tk.LEFT, padx=5)
    ttk.Button(button_frame, text="取消", command=batch_window.destroy).pack(side=tk.LEFT, padx=5)

    # 状态标签
    status_label = ttk.Label(batch_frame, text="", foreground="green")
    status_label.pack(anchor=tk.W, pady=5)

# 创建主窗口
root = tk.Tk()
root.title("Unicode与中文互转工具")
root.geometry("800x600")  # 增加窗口大小
root.resizable(True, True)

# 设置样式
style = ttk.Style()
style.configure("TButton", padding=6, relief="flat", background="#ccc")
style.configure("TFrame", background="#f0f0f0")
style.configure("TLabelframe", background="#f0f0f0")
style.configure("TLabelframe.Label", background="#f0f0f0")

# 创建主框架
main_frame = ttk.Frame(root, padding="10")
main_frame.pack(fill=tk.BOTH, expand=True)

# 创建顶部工具栏
toolbar_frame = ttk.Frame(main_frame)
toolbar_frame.pack(fill=tk.X, pady=5)

# 转换类型选择
conversion_var = tk.StringVar(value="Unicode转中文")
ttk.Label(toolbar_frame, text="转换类型:").pack(side=tk.LEFT)
ttk.Radiobutton(toolbar_frame, text="Unicode转中文", variable=conversion_var, value="Unicode转中文").pack(side=tk.LEFT, padx=5)
ttk.Radiobutton(toolbar_frame, text="中文转Unicode", variable=conversion_var, value="中文转Unicode").pack(side=tk.LEFT, padx=5)

# 添加批量处理按钮
ttk.Button(toolbar_frame, text="批量处理", command=open_batch_window).pack(side=tk.RIGHT, padx=5)

# 格式选择
format_var = tk.StringVar(value="compact")
format_frame = ttk.Frame(main_frame)
format_frame.pack(fill=tk.X, pady=5)

ttk.Label(format_frame, text="Unicode格式:").pack(side=tk.LEFT)
ttk.Radiobutton(format_frame, text="紧凑格式(\\uXXXX)", variable=format_var, value="compact").pack(side=tk.LEFT, padx=5)
ttk.Radiobutton(format_frame, text="空格分隔(\\uXXXX )", variable=format_var, value="spaced").pack(side=tk.LEFT, padx=5)
ttk.Radiobutton(format_frame, text="U+格式(U+XXXX)", variable=format_var, value="u_prefix").pack(side=tk.LEFT, padx=5)
ttk.Radiobutton(format_frame, text="HTML格式(&amp;#xXXXX;)", variable=format_var, value="html").pack(side=tk.LEFT, padx=5)

# 创建左右分栏
paned_window = ttk.PanedWindow(main_frame, orient=tk.HORIZONTAL)
paned_window.pack(fill=tk.BOTH, expand=True, pady=5)

# 左侧面板 - 输入和输出
left_panel = ttk.Frame(paned_window)
paned_window.add(left_panel, weight=2)

# 输入区域
input_frame = ttk.LabelFrame(left_panel, text="输入")
input_frame.pack(fill=tk.BOTH, expand=True, pady=5)

input_area = scrolledtext.ScrolledText(input_frame, wrap=tk.WORD, height=8)
input_area.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

# 按钮区域
button_frame = ttk.Frame(left_panel)
button_frame.pack(fill=tk.X, pady=5)

convert_button = ttk.Button(button_frame, text="转换", command=convert)
convert_button.pack(side=tk.LEFT, padx=5)

copy_button = ttk.Button(button_frame, text="复制结果", command=copy_to_clipboard)
copy_button.pack(side=tk.LEFT, padx=5)

clear_button = ttk.Button(button_frame, text="清空", command=clear_text)
clear_button.pack(side=tk.LEFT, padx=5)

# 输出区域
output_frame = ttk.LabelFrame(left_panel, text="输出")
output_frame.pack(fill=tk.BOTH, expand=True, pady=5)

output_area = scrolledtext.ScrolledText(output_frame, wrap=tk.WORD, height=8)
output_area.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

# 右侧面板 - 历史记录
right_panel = ttk.Frame(paned_window)
paned_window.add(right_panel, weight=1)

# 历史记录区域
history_frame = ttk.LabelFrame(right_panel, text="历史记录")
history_frame.pack(fill=tk.BOTH, expand=True, pady=5)

# 创建历史记录列表框和滚动条
history_scrollbar = ttk.Scrollbar(history_frame)
history_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)

history_listbox = tk.Listbox(history_frame, yscrollcommand=history_scrollbar.set)
history_listbox.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
history_listbox.bind('&lt;&lt;ListboxSelect&gt;&gt;', on_history_select)

history_scrollbar.config(command=history_listbox.yview)

# 状态标签
status_label = ttk.Label(main_frame, text="", foreground="green")
status_label.pack(anchor=tk.W, pady=5)

# 启动应用
root.mainloop()</code></pre>]]></description>
    <pubDate>Tue, 28 Oct 2025 15:33:44 +0800</pubDate>
    <dc:creator>‎刘小猪</dc:creator>
    <guid>https://blog.liuxiaozhu.cn/?post=6</guid>
</item>
<item>
    <title>Python脚本-Mysql数据库同步到Oracle数据库</title>
    <link>https://blog.liuxiaozhu.cn/?post=5</link>
    <description><![CDATA[<h2>主程序代码</h2>
<pre><code class="language-python"># ===== 防止 PyInstaller 漏依赖 =====
import getpass
import secrets
import asyncio

import uuid
import datetime
import pymysql
import oracledb
import yaml
import os
import logging
import traceback
import csv

# ======================
# 日志配置
# ======================
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("sync.log", encoding="utf-8"),
        logging.StreamHandler()
    ]
)

# ======================
# 读取配置文件
# ======================
def load_config(config_file="config.yaml"):
    if not os.path.exists(config_file):
        raise FileNotFoundError(f"配置文件 {config_file} 不存在！")
    with open(config_file, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)

# ======================
# MySQL 连接
# ======================
def get_mysql_connection(cfg):
    return pymysql.connect(
        host=cfg["host"],
        port=cfg["port"],
        user=cfg["user"],
        password=cfg["password"],
        database=cfg["database"],
        charset=cfg["charset"],
        cursorclass=pymysql.cursors.DictCursor
    )

# ======================
# Oracle 连接（自动切换 thin/thick + 自动去掉 encoding）
# ======================
def get_oracle_connection(cfg):
    instant_client_dir = cfg.get("instant_client_dir")
    try:
        if instant_client_dir and os.path.exists(instant_client_dir):
            oracledb.init_oracle_client(lib_dir=instant_client_dir, driver_name="oracledb-thick")
            logging.info(f"Oracle 使用 Thick 模式 (Instant Client: {instant_client_dir})")
        else:
            logging.info("未找到 Instant Client，使用 Thin 模式 (仅支持 12c+)")
    except Exception as e:
        logging.warning(f"初始化 Thick 模式失败，改用 Thin 模式: {e}")

    user = cfg.get("user")
    password = cfg.get("password")
    dsn = cfg.get("dsn")
    encoding = cfg.get("encoding", "UTF-8")

    try:
        return oracledb.connect(user=user, password=password, dsn=dsn, encoding=encoding)
    except TypeError:
        logging.warning("⚠️ 当前接口不支持 encoding 参数，自动去掉重试")
        return oracledb.connect(user=user, password=password, dsn=dsn)

# ======================
# 源 SQL
# ======================
MYSQL_SQL = """
SELECT
    temp.ID,
    temp.RowGuid,
    temp.ItemCode,
    temp.TaskCode,
    temp.TaskHandleItem,
    temp.TaskName,
    temp.TaskType,
    temp.TaskVersion,
    temp.ProjectType,
    temp.AreaCode,
    temp.DeptCode,
    temp.DeptName,
    temp.Cd_operation,
    temp.Cd_time,
    SYSDATE() createTime,
    temp.TaskState,
    temp.isSecondDept,
    temp.isTaskHandleItem,
    case when temp.is_use1 = '0' then '0' when temp.is_use2 = '0' then '0' else '1' end is_use,
    '1' trueValue,
    '0' falseValue
FROM
    (
SELECT
    ID,
    RowGuid,
    TaskCode ItemCode,
    TaskCode,
    TaskHandleItem,
    TaskName,
    TaskType,
    TaskVersion,
    ProjectType,
    AreaCode,
    case when secondDeptCode is null then DeptCode else secondDeptCode end DeptCode,
    DeptName,
    Cd_operation,
    Cd_time,
    TaskState,
    case when secondDeptCode is null then '1' else '0' end isSecondDept,
    '0' isTaskHandleItem,
    case when TaskState in ('2','3') then '0' else '1' end is_use1,
    case when Cd_operation = 'D' then '0' else '1' end is_use2
FROM
    up_task_general_basic 
WHERE
    TaskHandleItem IS NULL 
UNION
SELECT
    ID,
    RowGuid,
    TaskCode ItemCode,
    TaskCode,
    TaskHandleItem,
    TaskName,
    TaskType,
    TaskVersion,
    ProjectType,
    AreaCode,
    case when secondDeptCode is null then DeptCode else secondDeptCode end DeptCode,
    DeptName,
    Cd_operation,
    Cd_time,
    TaskState,
    case when secondDeptCode is null then '1' else '0' end isSecondDept,
    '0' isTaskHandleItem,
    case when TaskState in ('2','3') then '0' else '1' end is_use1,
    case when Cd_operation = 'D' then '0' else '1' end is_use2
FROM
    up_task_public_basic 
WHERE
    TaskHandleItem IS NULL 
UNION
SELECT
    ID,
    RowGuid,
    TaskCode ItemCode,
    TaskCode,
    TaskHandleItem,
    TaskName,
    TaskType,
    TaskVersion,
    ProjectType,
    AreaCode,
    case when secondDeptCode is null then DeptCode else secondDeptCode end DeptCode,
    DeptName,
    Cd_operation,
    Cd_time,
    TaskState,
    case when secondDeptCode is null then '1' else '0' end isSecondDept,
    '0' isTaskHandleItem,
    case when TaskState in ('2','3') then '0' else '1' end is_use1,
    case when Cd_operation = 'D' then '0' else '1' end is_use2
FROM
    up_task_huiqi_basic 
WHERE
    TaskHandleItem IS NULL 
UNION
SELECT
    ID,
    RowGuid,
    TaskHandleItem ItemCode,
    TaskCode,
    TaskHandleItem,
    TaskName,
    TaskType,
    TaskVersion,
    ProjectType,
    AreaCode,
    case when secondDeptCode is null then DeptCode else secondDeptCode end DeptCode,
    DeptName,
    Cd_operation,
    Cd_time,
    TaskState,
    case when secondDeptCode is null then '1' else '0' end isSecondDept,
    '1' isTaskHandleItem,
    case when TaskState in ('2','3') then '0' else '1' end is_use1,
    case when Cd_operation = 'D' then '0' else '1' end is_use2
FROM
    up_task_general_basic 
WHERE
    TaskHandleItem IS NOT NULL 
UNION
SELECT
    ID,
    RowGuid,
    TaskHandleItem ItemCode,
    TaskCode,
    TaskHandleItem,
    TaskName,
    TaskType,
    TaskVersion,
    ProjectType,
    AreaCode,
    case when secondDeptCode is null then DeptCode else secondDeptCode end DeptCode,
    DeptName,
    Cd_operation,
    Cd_time,
    TaskState,
    case when secondDeptCode is null then '1' else '0' end isSecondDept,
    '1' isTaskHandleItem,
    case when TaskState in ('2','3') then '0' else '1' end is_use1,
    case when Cd_operation = 'D' then '0' else '1' end is_use2
FROM
    up_task_public_basic 
WHERE
    TaskHandleItem IS NOT NULL 
UNION
SELECT
    ID,
    RowGuid,
    TaskHandleItem ItemCode,
    TaskCode,
    TaskHandleItem,
    TaskName,
    TaskType,
    TaskVersion,
    ProjectType,
    AreaCode,
    case when secondDeptCode is null then DeptCode else secondDeptCode end DeptCode,
    DeptName,
    Cd_operation,
    Cd_time,
    TaskState,
    case when secondDeptCode is null then '1' else '0' end isSecondDept,
    '1' isTaskHandleItem,
    case when TaskState in ('2','3') then '0' else '1' end is_use1,
    case when Cd_operation = 'D' then '0' else '1' end is_use2
FROM
    up_task_huiqi_basic 
WHERE
    TaskHandleItem IS NOT NULL 
    ) temp
WHERE temp.Cd_time &gt;= %s
ORDER BY
    temp.Cd_time ASC
"""

# ======================
# 主逻辑
# ======================
def sync_data(config, batch_size=1000):
    MYSQL_CONFIG = config["mysql"]
    ORACLE_CONFIG = config["oracle"]
    sync_date = config["sync"]["start_date"]

    # 连接 MySQL
    try:
        mysql_conn = get_mysql_connection(MYSQL_CONFIG)
        mysql_cursor = mysql_conn.cursor()
        mysql_cursor.execute(MYSQL_SQL, (sync_date,))
        rows = mysql_cursor.fetchall()
        total_rows = len(rows)
        logging.info(f"MySQL 查询成功，共 {total_rows} 条数据")
    except Exception as e:
        logging.error(f"MySQL 连接或查询失败: {e}")
        logging.error(traceback.format_exc())
        return

    # 连接 Oracle
    try:
        oracle_conn = get_oracle_connection(ORACLE_CONFIG)
        oracle_cursor = oracle_conn.cursor()
    except Exception as e:
        logging.error(f"Oracle 连接失败: {e}")
        logging.error(traceback.format_exc())
        return

    inserted = updated = errors = processed = 0
    error_records = []

    while processed &lt; total_rows:
        batch = rows[processed:processed + batch_size]
        if not batch:
            break

        for row in batch:
            processed += 1
            new_uuid = str(uuid.uuid4())
            create_time = datetime.datetime.now()
            item_code = row.get("ItemCode")
            task_name = row.get("TaskName")

            try:
                # 判断是否存在
                oracle_cursor.execute(
                    "SELECT COUNT(*) FROM ACCEPT_ITEM_TRANSIT WHERE ITEM_CODE = :item_code",
                    [item_code]
                )
                exists = oracle_cursor.fetchone()[0]

                if exists:
                    oracle_cursor.execute("""
                        UPDATE ACCEPT_ITEM_TRANSIT
                        SET ITEM_NAME = :task_name,
                            TYPE = :task_type,
                            VERSION = :task_version,
                            REGION_CODE = :area_code,
                            ORGAN_CODE = :dept_code,
                            ORGAN_NAME = :dept_name,
                            UPDATE_TIME = :update_time,
                            ASSORT = :project_type,
                            IS_USE = :is_use,
                            TASK_CODE = :task_code,
                            IS_HANDLE_ITEM = :is_handle_item
                        WHERE ITEM_CODE = :item_code
                    """, {
                        "task_name": row["TaskName"],
                        "task_type": row["TaskType"],
                        "task_version": row["TaskVersion"],
                        "area_code": row["AreaCode"],
                        "dept_code": row["DeptCode"],
                        "dept_name": row["DeptName"],
                        "update_time": row["Cd_time"],
                        "project_type": row["ProjectType"],
                        "is_use": row["is_use"],
                        "task_code": row["TaskCode"],
                        "is_handle_item": row["isTaskHandleItem"],
                        "item_code": item_code
                    })
                    updated += 1
                else:
                    oracle_cursor.execute("""
                        INSERT INTO ACCEPT_ITEM_TRANSIT (
                            ID, ITEM_ID, ITEM_CODE, ITEM_NAME, TYPE,
                            VERSION, REGION_CODE, ORGAN_CODE, ORGAN_NAME,
                            UPDATE_TIME, CREATE_TIME, ASSORT, IS_USE, TASK_CODE, IS_HANDLE_ITEM
                        ) VALUES (
                            :id, :item_id, :item_code, :task_name, :task_type,
                            :task_version, :area_code, :dept_code, :dept_name,
                            :update_time, :create_time, :project_type, :is_use, :task_code, :is_handle_item
                        )
                    """, {
                        "id": new_uuid,
                        "item_id": row["RowGuid"],
                        "item_code": item_code,
                        "task_name": row["TaskName"],
                        "task_type": row["TaskType"],
                        "task_version": row["TaskVersion"],
                        "area_code": row["AreaCode"],
                        "dept_code": row["DeptCode"],
                        "dept_name": row["DeptName"],
                        "update_time": row["Cd_time"],
                        "create_time": create_time,
                        "project_type": row["ProjectType"],
                        "is_use": row["is_use"],
                        "task_code": row["TaskCode"],
                        "is_handle_item": row["isTaskHandleItem"]
                    })
                    inserted += 1

            except Exception as e:
                errors += 1
                logging.error(f"同步失败: ITEM_CODE={item_code}, 错误={e}")
                logging.error(traceback.format_exc())
                error_records.append({"ItemCode": item_code, "TaskName": task_name, "Error": str(e)})

        oracle_conn.commit()
        percent = processed / total_rows * 100
        logging.info(
            f"批次完成: 已处理 {processed}/{total_rows} ({percent:.2f}%) "
            f"新增 {inserted}, 更新 {updated}, 出错 {errors}"
        )

    mysql_cursor.close()
    mysql_conn.close()
    oracle_cursor.close()
    oracle_conn.close()

    if error_records:
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"error_records_{ts}.csv"
        with open(filename, "w", newline="", encoding="utf-8-sig") as f:
            writer = csv.DictWriter(f, fieldnames=["ItemCode", "TaskName", "Error"])
            writer.writeheader()
            writer.writerows(error_records)
        logging.info(f"错误记录已导出到 {filename}，共 {len(error_records)} 条")

    logging.info("=" * 60)
    logging.info(f"同步完成: 新增 {inserted}, 更新 {updated}, 出错 {errors}, 总计 {processed} 条")
    logging.info("=" * 60)

if __name__ == "__main__":
    config = load_config("config.yaml")
    sync_data(config, batch_size=1000)</code></pre>
<h2>配置文件</h2>
<pre><code class="language-python">mysql:
  host: "192.168.1.10"
  port: 3306
  user: "ceshi"
  password: "ceshi@123"
  database: "ceshi"
  charset: "utf8mb4"

oracle:
  user: "ceshi"
  password: "ceshi@123"
  dsn: "192.168.1.11:1521/nndb"   # ORCL 替换成实际服务名

sync:
  start_date: "2025-10-23"   # 同步的起始时间</code></pre>]]></description>
    <pubDate>Thu, 23 Oct 2025 16:14:51 +0800</pubDate>
    <dc:creator>‎刘小猪</dc:creator>
    <guid>https://blog.liuxiaozhu.cn/?post=5</guid>
</item>
</channel>
</rss>