Python 数据同步脚本:MySQL 数据库同步到 Oracle 数据库

刘小猪 发布 · 阅读:8 · 分类:教程


主程序代码

# ===== 防止 PyInstaller 漏依赖 =====
import getpass
import secrets
import asyncio

import uuid
import datetime
import pymysql
import oracledb
import yaml
import os
import logging
import traceback
import csv

# ======================
# 日志配置
# ======================
# Configure the root logger once at import time: INFO and above, sent both to
# "sync.log" (UTF-8, relative to the current working directory) and to a console
# StreamHandler (stderr by default). Per stdlib semantics this call is a no-op
# if the root logger already has handlers.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
    handlers=(
        logging.FileHandler("sync.log", encoding="utf-8"),
        logging.StreamHandler(),
    ),
)

# ======================
# 读取配置文件
# ======================
def load_config(config_file="config.yaml"):
    """Load the YAML configuration file and return its top-level mapping.

    Args:
        config_file: Path to the YAML file; a relative path is resolved
            against the current working directory.

    Returns:
        The parsed configuration dict (expected sections: ``mysql``,
        ``oracle``, ``sync``).

    Raises:
        FileNotFoundError: if *config_file* does not exist.
        ValueError: if the file is empty or its top level is not a mapping.
            Without this check, ``yaml.safe_load`` returns ``None`` for an
            empty file and the caller later fails with an opaque TypeError.
    """
    if not os.path.exists(config_file):
        raise FileNotFoundError(f"配置文件 {config_file} 不存在!")
    with open(config_file, "r", encoding="utf-8") as f:
        # safe_load builds only plain Python objects; do not switch to yaml.load.
        config = yaml.safe_load(f)
    if not isinstance(config, dict):
        raise ValueError(f"配置文件 {config_file} 为空或格式错误(顶层必须是键值映射)")
    return config

# ======================
# MySQL 连接
# ======================
def get_mysql_connection(cfg):
    """Open a PyMySQL connection whose cursors yield rows as dicts.

    *cfg* must contain ``host``, ``port``, ``user``, ``password``,
    ``database`` and ``charset``; a missing key raises ``KeyError``.
    """
    settings = {
        name: cfg[name]
        for name in ("host", "port", "user", "password", "database", "charset")
    }
    # DictCursor returns each row as a dict keyed by column label; sync_data
    # reads rows that way (row["ItemCode"], row.get(...)).
    return pymysql.connect(cursorclass=pymysql.cursors.DictCursor, **settings)

# ======================
# Oracle 连接(自动切换 thin/thick + 自动去掉 encoding)
# ======================
def get_oracle_connection(cfg):
    """Open a python-oracledb connection, using Thick mode when it can be enabled.

    If ``instant_client_dir`` names an existing directory, Thick mode is
    initialised from that Oracle Instant Client. Otherwise, or if
    initialisation fails, the driver stays in Thin mode, which the log
    message below notes supports Oracle 12c+ only.

    Args:
        cfg: Mapping read with ``.get`` (missing keys become ``None``):
            ``user``, ``password``, ``dsn``, and the optional
            ``instant_client_dir`` and ``encoding``.

    Returns:
        An open ``oracledb`` connection.
    """
    instant_client_dir = cfg.get("instant_client_dir")
    try:
        if instant_client_dir and os.path.isdir(instant_client_dir):
            oracledb.init_oracle_client(lib_dir=instant_client_dir, driver_name="oracledb-thick")
            logging.info(f"Oracle 使用 Thick 模式 (Instant Client: {instant_client_dir})")
        else:
            logging.info("未找到 Instant Client,使用 Thin 模式 (仅支持 12c+)")
    except Exception as e:
        # Best effort: Thick mode is optional, so fall back to Thin instead of aborting.
        logging.warning(f"初始化 Thick 模式失败,改用 Thin 模式: {e}")

    connect_kwargs = {
        "user": cfg.get("user"),
        "password": cfg.get("password"),
        "dsn": cfg.get("dsn"),
    }

    encoding = cfg.get("encoding")
    if encoding is None:
        # python-oracledb always uses UTF-8, and newer releases reject an
        # ``encoding`` keyword. Passing it only when configured avoids a failed
        # first attempt plus a spurious warning on every default run.
        return oracledb.connect(**connect_kwargs)

    try:
        return oracledb.connect(encoding=encoding, **connect_kwargs)
    except TypeError:
        logging.warning("⚠️ 当前接口不支持 encoding 参数,自动去掉重试")
        return oracledb.connect(**connect_kwargs)

# ======================
# 源 SQL
# ======================
# Source query executed against MySQL by sync_data. It has a single %s
# placeholder, bound by PyMySQL to config["sync"]["start_date"].
#
# The derived table `temp` UNIONs six SELECTs. Each of the three tables
# up_task_general_basic, up_task_public_basic and up_task_huiqi_basic is read
# twice:
#   * TaskHandleItem IS NULL     -> ItemCode = TaskCode,       isTaskHandleItem = '0'
#   * TaskHandleItem IS NOT NULL -> ItemCode = TaskHandleItem, isTaskHandleItem = '1'
# Every branch does three things:
#   * takes DeptCode from secondDeptCode when that is not NULL;
#   * sets is_use1 = '0' when TaskState is '2' or '3';
#   * sets is_use2 = '0' when Cd_operation = 'D'.
# The outer SELECT folds the two flags into is_use: '0' if either flag is '0',
# otherwise '1'.
#
# Rows are kept only when Cd_time >= the start date, and are sorted by Cd_time
# ascending. When several rows share an ItemCode, sync_data therefore writes the
# one with the latest Cd_time last (ties come in unspecified order).
#
# NOTE(review): plain UNION also de-duplicates identical rows, which costs an
# extra dedup step. UNION ALL would be cheaper if duplicates are impossible;
# confirm before changing.
# NOTE(review): isSecondDept is '1' when secondDeptCode IS NULL, which reads as
# inverted relative to the column name. sync_data does not read it; confirm intent.
MYSQL_SQL = """
SELECT
    temp.ID,
    temp.RowGuid,
    temp.ItemCode,
    temp.TaskCode,
    temp.TaskHandleItem,
    temp.TaskName,
    temp.TaskType,
    temp.TaskVersion,
    temp.ProjectType,
    temp.AreaCode,
    temp.DeptCode,
    temp.DeptName,
    temp.Cd_operation,
    temp.Cd_time,
    SYSDATE() createTime,
    temp.TaskState,
    temp.isSecondDept,
    temp.isTaskHandleItem,
    case when temp.is_use1 = '0' then '0' when temp.is_use2 = '0' then '0' else '1' end is_use,
    '1' trueValue,
    '0' falseValue
FROM
    (
SELECT
    ID,
    RowGuid,
    TaskCode ItemCode,
    TaskCode,
    TaskHandleItem,
    TaskName,
    TaskType,
    TaskVersion,
    ProjectType,
    AreaCode,
    case when secondDeptCode is null then DeptCode else secondDeptCode end DeptCode,
    DeptName,
    Cd_operation,
    Cd_time,
    TaskState,
    case when secondDeptCode is null then '1' else '0' end isSecondDept,
    '0' isTaskHandleItem,
    case when TaskState in ('2','3') then '0' else '1' end is_use1,
    case when Cd_operation = 'D' then '0' else '1' end is_use2
FROM
    up_task_general_basic 
WHERE
    TaskHandleItem IS NULL 
UNION
SELECT
    ID,
    RowGuid,
    TaskCode ItemCode,
    TaskCode,
    TaskHandleItem,
    TaskName,
    TaskType,
    TaskVersion,
    ProjectType,
    AreaCode,
    case when secondDeptCode is null then DeptCode else secondDeptCode end DeptCode,
    DeptName,
    Cd_operation,
    Cd_time,
    TaskState,
    case when secondDeptCode is null then '1' else '0' end isSecondDept,
    '0' isTaskHandleItem,
    case when TaskState in ('2','3') then '0' else '1' end is_use1,
    case when Cd_operation = 'D' then '0' else '1' end is_use2
FROM
    up_task_public_basic 
WHERE
    TaskHandleItem IS NULL 
UNION
SELECT
    ID,
    RowGuid,
    TaskCode ItemCode,
    TaskCode,
    TaskHandleItem,
    TaskName,
    TaskType,
    TaskVersion,
    ProjectType,
    AreaCode,
    case when secondDeptCode is null then DeptCode else secondDeptCode end DeptCode,
    DeptName,
    Cd_operation,
    Cd_time,
    TaskState,
    case when secondDeptCode is null then '1' else '0' end isSecondDept,
    '0' isTaskHandleItem,
    case when TaskState in ('2','3') then '0' else '1' end is_use1,
    case when Cd_operation = 'D' then '0' else '1' end is_use2
FROM
    up_task_huiqi_basic 
WHERE
    TaskHandleItem IS NULL 
UNION
SELECT
    ID,
    RowGuid,
    TaskHandleItem ItemCode,
    TaskCode,
    TaskHandleItem,
    TaskName,
    TaskType,
    TaskVersion,
    ProjectType,
    AreaCode,
    case when secondDeptCode is null then DeptCode else secondDeptCode end DeptCode,
    DeptName,
    Cd_operation,
    Cd_time,
    TaskState,
    case when secondDeptCode is null then '1' else '0' end isSecondDept,
    '1' isTaskHandleItem,
    case when TaskState in ('2','3') then '0' else '1' end is_use1,
    case when Cd_operation = 'D' then '0' else '1' end is_use2
FROM
    up_task_general_basic 
WHERE
    TaskHandleItem IS NOT NULL 
UNION
SELECT
    ID,
    RowGuid,
    TaskHandleItem ItemCode,
    TaskCode,
    TaskHandleItem,
    TaskName,
    TaskType,
    TaskVersion,
    ProjectType,
    AreaCode,
    case when secondDeptCode is null then DeptCode else secondDeptCode end DeptCode,
    DeptName,
    Cd_operation,
    Cd_time,
    TaskState,
    case when secondDeptCode is null then '1' else '0' end isSecondDept,
    '1' isTaskHandleItem,
    case when TaskState in ('2','3') then '0' else '1' end is_use1,
    case when Cd_operation = 'D' then '0' else '1' end is_use2
FROM
    up_task_public_basic 
WHERE
    TaskHandleItem IS NOT NULL 
UNION
SELECT
    ID,
    RowGuid,
    TaskHandleItem ItemCode,
    TaskCode,
    TaskHandleItem,
    TaskName,
    TaskType,
    TaskVersion,
    ProjectType,
    AreaCode,
    case when secondDeptCode is null then DeptCode else secondDeptCode end DeptCode,
    DeptName,
    Cd_operation,
    Cd_time,
    TaskState,
    case when secondDeptCode is null then '1' else '0' end isSecondDept,
    '1' isTaskHandleItem,
    case when TaskState in ('2','3') then '0' else '1' end is_use1,
    case when Cd_operation = 'D' then '0' else '1' end is_use2
FROM
    up_task_huiqi_basic 
WHERE
    TaskHandleItem IS NOT NULL 
    ) temp
WHERE temp.Cd_time >= %s
ORDER BY
    temp.Cd_time ASC
"""

# ======================
# 主逻辑
# ======================
# Oracle statements used by the per-row upsert in sync_data.
_ORACLE_EXISTS_SQL = "SELECT COUNT(*) FROM ACCEPT_ITEM_TRANSIT WHERE ITEM_CODE = :item_code"

_ORACLE_UPDATE_SQL = """
    UPDATE ACCEPT_ITEM_TRANSIT
    SET ITEM_NAME = :task_name,
        TYPE = :task_type,
        VERSION = :task_version,
        REGION_CODE = :area_code,
        ORGAN_CODE = :dept_code,
        ORGAN_NAME = :dept_name,
        UPDATE_TIME = :update_time,
        ASSORT = :project_type,
        IS_USE = :is_use,
        TASK_CODE = :task_code,
        IS_HANDLE_ITEM = :is_handle_item
    WHERE ITEM_CODE = :item_code
"""

_ORACLE_INSERT_SQL = """
    INSERT INTO ACCEPT_ITEM_TRANSIT (
        ID, ITEM_ID, ITEM_CODE, ITEM_NAME, TYPE,
        VERSION, REGION_CODE, ORGAN_CODE, ORGAN_NAME,
        UPDATE_TIME, CREATE_TIME, ASSORT, IS_USE, TASK_CODE, IS_HANDLE_ITEM
    ) VALUES (
        :id, :item_id, :item_code, :task_name, :task_type,
        :task_version, :area_code, :dept_code, :dept_name,
        :update_time, :create_time, :project_type, :is_use, :task_code, :is_handle_item
    )
"""


def _fetch_source_rows(mysql_config, sync_date):
    """Run MYSQL_SQL with *sync_date* and return all rows; the MySQL connection is always closed."""
    mysql_conn = get_mysql_connection(mysql_config)
    try:
        with mysql_conn.cursor() as mysql_cursor:
            mysql_cursor.execute(MYSQL_SQL, (sync_date,))
            return mysql_cursor.fetchall()
    finally:
        # All rows are materialised above, so MySQL is not held open during the Oracle phase.
        mysql_conn.close()


def _upsert_row(oracle_cursor, row):
    """Write one source row to ACCEPT_ITEM_TRANSIT, keyed by ITEM_CODE.

    Updates the existing row(s) with this ItemCode, or inserts a new row with
    a fresh UUID as ID when none exists.

    Returns:
        True if a row was inserted, False if existing row(s) were updated.

    Raises:
        ValueError: if the row has no ItemCode.
        Any driver error from the Oracle statements.
    """
    item_code = row.get("ItemCode")
    if item_code is None:
        # "ITEM_CODE = NULL" never matches, so such a row would be inserted
        # again on every run; report it as a per-row error instead.
        raise ValueError("ItemCode 为空,无法按 ITEM_CODE 匹配")

    oracle_cursor.execute(_ORACLE_EXISTS_SQL, [item_code])
    exists = oracle_cursor.fetchone()[0]

    binds = {
        "task_name": row["TaskName"],
        "task_type": row["TaskType"],
        "task_version": row["TaskVersion"],
        "area_code": row["AreaCode"],
        "dept_code": row["DeptCode"],
        "dept_name": row["DeptName"],
        "update_time": row["Cd_time"],
        "project_type": row["ProjectType"],
        "is_use": row["is_use"],
        "task_code": row["TaskCode"],
        "is_handle_item": row["isTaskHandleItem"],
        "item_code": item_code,
    }
    if exists:
        oracle_cursor.execute(_ORACLE_UPDATE_SQL, binds)
        return False

    # Insert-only columns; the UPDATE statement has no placeholders for these.
    binds.update(
        id=str(uuid.uuid4()),
        item_id=row["RowGuid"],
        create_time=datetime.datetime.now(),
    )
    oracle_cursor.execute(_ORACLE_INSERT_SQL, binds)
    return True


def _export_error_records(error_records):
    """Write failed rows to error_records_<YYYYmmdd_HHMMSS>.csv in the working directory and log the name."""
    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"error_records_{ts}.csv"
    # utf-8-sig prepends a BOM; presumably so Excel detects UTF-8 for the Chinese text.
    with open(filename, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=["ItemCode", "TaskName", "Error"])
        writer.writeheader()
        writer.writerows(error_records)
    logging.info(f"错误记录已导出到 {filename},共 {len(error_records)} 条")


def sync_data(config, batch_size=1000):
    """Copy the MySQL rows selected by MYSQL_SQL into Oracle ACCEPT_ITEM_TRANSIT.

    Each source row is upserted by ITEM_CODE. Oracle work is committed once per
    *batch_size* rows. Per-row failures are logged, counted and exported to a
    CSV file instead of aborting the run.

    Args:
        config: Parsed configuration with ``mysql`` and ``oracle`` connection
            sections and ``sync.start_date``, the lower bound for Cd_time.
        batch_size: Number of rows per commit; must be >= 1.

    Returns:
        None. A MySQL or Oracle connection failure is logged and the function
        returns early.

    Raises:
        ValueError: if *batch_size* < 1.
        KeyError: if *config* lacks the mysql/oracle/sync.start_date entries.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size 必须为正整数: {batch_size!r}")

    mysql_config = config["mysql"]
    oracle_config = config["oracle"]
    sync_date = config["sync"]["start_date"]

    try:
        rows = _fetch_source_rows(mysql_config, sync_date)
    except Exception as e:
        logging.error(f"MySQL 连接或查询失败: {e}")
        logging.error(traceback.format_exc())
        return
    total_rows = len(rows)
    logging.info(f"MySQL 查询成功,共 {total_rows} 条数据")

    oracle_conn = None
    try:
        oracle_conn = get_oracle_connection(oracle_config)
        oracle_cursor = oracle_conn.cursor()
    except Exception as e:
        logging.error(f"Oracle 连接失败: {e}")
        logging.error(traceback.format_exc())
        if oracle_conn is not None:
            oracle_conn.close()
        return

    inserted = updated = errors = processed = 0
    error_records = []
    try:
        for start in range(0, total_rows, batch_size):
            for row in rows[start:start + batch_size]:
                processed += 1
                item_code = row.get("ItemCode")
                try:
                    if _upsert_row(oracle_cursor, row):
                        inserted += 1
                    else:
                        updated += 1
                except Exception as e:
                    # One bad row must not abort the run: log it and keep it for the CSV export.
                    errors += 1
                    logging.error(f"同步失败: ITEM_CODE={item_code}, 错误={e}")
                    logging.error(traceback.format_exc())
                    error_records.append(
                        {"ItemCode": item_code, "TaskName": row.get("TaskName"), "Error": str(e)}
                    )

            # Commit once per batch. NOTE: this relies on Oracle's statement-level
            # rollback, so a failed row does not undo the batch's other rows.
            oracle_conn.commit()
            percent = processed / total_rows * 100
            logging.info(
                f"批次完成: 已处理 {processed}/{total_rows} ({percent:.2f}%) "
                f"新增 {inserted}, 更新 {updated}, 出错 {errors}"
            )
    finally:
        # Always release Oracle resources, even if a commit raises.
        oracle_cursor.close()
        oracle_conn.close()

    if error_records:
        _export_error_records(error_records)

    logging.info("=" * 60)
    logging.info(f"同步完成: 新增 {inserted}, 更新 {updated}, 出错 {errors}, 总计 {processed} 条")
    logging.info("=" * 60)

if __name__ == "__main__":
    # Script entry point: read config.yaml from the working directory and run
    # a single sync pass, committing every 1000 rows.
    sync_data(load_config("config.yaml"), batch_size=1000)

配置文件(config.yaml,放在运行脚本时的当前工作目录下)

mysql:
  host: "192.168.1.10"
  port: 3306
  user: "ceshi"
  password: "ceshi@123"
  database: "ceshi"
  charset: "utf8mb4"

oracle:
  user: "ceshi"
  password: "ceshi@123"
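  dsn: "192.168.1.11:1521/nndb"   # 格式为 主机:端口/服务名,将 nndb 替换成实际服务名
  # instant_client_dir: "/opt/oracle/instantclient"   # 可选:Oracle Instant Client 目录,配置后使用 Thick 模式(Thin 模式仅支持 12c 及以上)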

sync:
  start_date: "2025-10-23"   # 同步起始时间:只同步 Cd_time 大于等于该值的记录