«

Python3数据同步脚本Demo

发布于 阅读:90 教程


配置文件config.conf

[database]
## Source database: rows are read from here and their sync_status/sync_error are written back
## NOTE(review): plain-text root credentials - consider a dedicated least-privilege account
source_host = 192.168.1.10
source_port = 3306
source_user = root
source_password = 123456
source_db = blog_liuxiaozhu
## Target database: rows are inserted or updated here
target_host = 192.168.1.12
target_port = 3306
target_user = root
target_password = 123456
target_db = xadmin

[table]
## Table to synchronize; the same table name is used in both databases and
## rows are matched between them by their `gid` column
sync_table = emlog_blog

[logging]
## Logging settings; a relative log_path is resolved against the directory the script is started from
log_path = logs
info_log_file = info.log
error_log_file = error.log
## Number of rotated log files to keep (the log files rotate at midnight)
backup_count = 7

源表需要添加的字段(用于记录同步状态和错误信息)

-- Columns to add to the SOURCE table (replace your_table_name with the configured sync_table).
-- sync_status: 0 = not yet synced (default), 1 = synced, 2 = last attempt failed;
--              rows whose status is not 1 are (re)processed on every pass.
-- sync_error:  error text of the last failed attempt; reset to '' after a successful sync.
ALTER TABLE your_table_name
ADD COLUMN `sync_status` tinyint NULL DEFAULT 0,
ADD COLUMN `sync_error` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL;

目标表需要添加的字段(用于记录插入时间和更新时间)

-- Columns to add to the TARGET table (replace your_table_name with the configured sync_table).
-- insert_time is set when the script inserts a new row; update_time when it overwrites a
-- changed row. Both columns are ignored when comparing a target row with its source row.
ALTER TABLE your_table_name
ADD COLUMN `insert_time` timestamp NULL COMMENT '同步时间',
ADD COLUMN `update_time` timestamp NULL COMMENT '更新时间';

python脚本

import pymysql
import logging
from logging.handlers import TimedRotatingFileHandler
from datetime import datetime
import time
import configparser
import os

# Read the configuration file. The path is relative, so it is resolved against
# the current working directory rather than the script's own directory.
_CONFIG_FILE = 'config.conf'
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually read. Fail fast here instead of with an obscure
# KeyError('database') a few lines later.
if not config.read(_CONFIG_FILE, encoding='utf-8'):
    raise FileNotFoundError(f"无法读取配置文件: {os.path.abspath(_CONFIG_FILE)}")


def _db_config(prefix):
    """Return ``pymysql.connect()`` keyword arguments for one database.

    :param prefix: ``'source'`` or ``'target'``; selects the
        ``<prefix>_host/_port/_user/_password/_db`` keys of ``[database]``.
    :raises KeyError: if one of those keys is missing.
    :raises ValueError: if ``<prefix>_port`` is not an integer.
    """
    section = config['database']
    return {
        'host': section[f'{prefix}_host'],
        'port': section.getint(f'{prefix}_port'),
        'user': section[f'{prefix}_user'],
        'password': section[f'{prefix}_password'],
        'db': section[f'{prefix}_db'],
        'charset': 'utf8mb4',
        # Rows are returned as dicts keyed by column name.
        'cursorclass': pymysql.cursors.DictCursor,
    }


# Connection settings for both databases.
source_config = _db_config('source')
target_config = _db_config('target')

# Table to synchronize; the same name is used in both databases.
table_name = config['table']['sync_table']

# Logging settings (a relative log_path is resolved against the working directory).
log_path = config['logging']['log_path']
info_log_file = os.path.join(log_path, config['logging']['info_log_file'])
error_log_file = os.path.join(log_path, config['logging']['error_log_file'])
backup_count = config['logging'].getint('backup_count')

# Create the log directory if it does not exist yet.
os.makedirs(log_path, exist_ok=True)

log_formatter = logging.Formatter('%(asctime)s | %(levelname)s | %(message)s')


def _rotating_handler(path, level):
    """Return a handler that writes *level*-and-above records to *path*.

    The file is rotated at midnight and ``backup_count`` old files are kept.
    """
    handler = TimedRotatingFileHandler(path, when="midnight", interval=1,
                                       backupCount=backup_count, encoding='utf-8')
    handler.setFormatter(log_formatter)
    handler.setLevel(level)
    return handler


# The info log receives INFO and above (errors included); the error log only ERROR+.
info_handler = _rotating_handler(info_log_file, logging.INFO)
error_handler = _rotating_handler(error_log_file, logging.ERROR)

console_handler = logging.StreamHandler()
console_handler.setFormatter(log_formatter)
console_handler.setLevel(logging.INFO)

logging.basicConfig(level=logging.INFO, handlers=[info_handler, error_handler, console_handler])

# Bookkeeping columns that exist on only one side; they are never copied or compared.
_SOURCE_ONLY_COLUMNS = ('sync_status', 'sync_error')
_TARGET_ONLY_COLUMNS = ('insert_time', 'update_time')
# sync_status values written back to the source table.
_STATUS_SYNCED = 1
_STATUS_FAILED = 2
# Width of the source `sync_error` column (varchar(255)). A longer message would
# make the failure-marking UPDATE itself fail when MySQL runs in strict mode.
_SYNC_ERROR_MAX_LEN = 255


def _quote_column(name):
    """Return *name* as a backtick-quoted MySQL identifier.

    Column names come from ``SELECT *`` results; quoting keeps columns whose
    names are reserved words valid in the generated INSERT/UPDATE statements.
    """
    return '`' + str(name).replace('`', '``') + '`'


def _mark_source(cursor, connection, gid, status, error=''):
    """Set ``sync_status``/``sync_error`` on source row *gid* and commit."""
    cursor.execute(
        f"UPDATE {table_name} SET sync_status = %s, sync_error = %s WHERE gid = %s",
        (status, error[:_SYNC_ERROR_MAX_LEN], gid),
    )
    connection.commit()


def _insert_target(cursor, connection, data):
    """Insert *data* as a new target row, stamping ``insert_time``, and commit."""
    columns = ', '.join(_quote_column(k) for k in data)
    placeholders = ', '.join(['%s'] * len(data))
    cursor.execute(
        f"INSERT INTO {table_name} ({columns}, insert_time) VALUES ({placeholders}, %s)",
        (*data.values(), datetime.now()),
    )
    connection.commit()


def _update_target(cursor, connection, gid, data):
    """Overwrite target row *gid* with *data*, stamping ``update_time``, and commit."""
    assignments = ', '.join(f'{_quote_column(k)} = %s' for k in data)
    cursor.execute(
        f"UPDATE {table_name} SET {assignments}, update_time = %s WHERE gid = %s",
        (*data.values(), datetime.now(), gid),
    )
    connection.commit()


def _rollback_quietly(connection):
    """Roll back *connection*; log instead of raising if the rollback also fails."""
    try:
        connection.rollback()
    except Exception as rollback_error:
        logging.warning(f"回滚事务失败: {rollback_error}")


def sync_data():
    """Run one incremental sync pass from the source table to the target table.

    Every source row whose ``sync_status`` is not 1 (NULL included) is looked
    up in the target table by ``gid``:

    * missing in the target -> INSERT it and stamp ``insert_time``;
    * present but different -> UPDATE it and stamp ``update_time``;
    * present and identical -> nothing is written to the target.

    In all three cases the source row is then marked ``sync_status = 1``.
    ``table_name`` comes from the config file and is interpolated into the SQL
    as-is; row values are always passed as query parameters.

    A failure on one row rolls back both connections and marks that source
    row ``sync_status = 2`` with the (truncated) error text. The pass then
    continues with the next row. An error that aborts the whole pass (e.g. a
    connection failure) is logged at CRITICAL with its traceback and is not
    re-raised, so a polling caller keeps running.
    """
    source_connection = None
    target_connection = None
    source_cursor = None
    target_cursor = None

    updated_count = 0
    inserted_count = 0

    try:
        source_connection = pymysql.connect(**source_config)
        source_cursor = source_connection.cursor()
        target_connection = pymysql.connect(**target_config)
        target_cursor = target_connection.cursor()

        # `sync_status != 1` alone evaluates to NULL (not true) for rows whose
        # nullable sync_status is NULL, which would silently never be synced.
        source_cursor.execute(
            f"SELECT * FROM {table_name} WHERE sync_status IS NULL OR sync_status != 1"
        )
        rows = source_cursor.fetchall()
        for row in rows:
            gid = row['gid']
            try:
                data = {k: v for k, v in row.items() if k not in _SOURCE_ONLY_COLUMNS}
                target_cursor.execute(f"SELECT * FROM {table_name} WHERE gid = %s", (gid,))
                existing_row = target_cursor.fetchone()

                if existing_row is None:
                    _insert_target(target_cursor, target_connection, data)
                    _mark_source(source_cursor, source_connection, gid, _STATUS_SYNCED)
                    logging.info(f"成功同步记录: {gid}")
                    inserted_count += 1
                    continue

                target_data = {k: v for k, v in existing_row.items() if k not in _TARGET_ONLY_COLUMNS}
                if data == target_data:
                    _mark_source(source_cursor, source_connection, gid, _STATUS_SYNCED)
                    logging.info(f"数据内容一致,无需更新记录: {gid}")
                else:
                    _update_target(target_cursor, target_connection, gid, data)
                    _mark_source(source_cursor, source_connection, gid, _STATUS_SYNCED)
                    logging.info(f"更新记录成功: {gid}")
                    updated_count += 1
            except Exception as e:
                # Discard anything left half-done on either side before recording the failure.
                _rollback_quietly(target_connection)
                _rollback_quietly(source_connection)
                logging.error(f"同步记录失败: {gid}, 错误: {str(e)}")
                try:
                    _mark_source(source_cursor, source_connection, gid, _STATUS_FAILED, str(e))
                except Exception as mark_error:
                    # Don't let a failed status write abort the remaining rows;
                    # the row keeps its old status and is retried next pass.
                    _rollback_quietly(source_connection)
                    logging.error(f"记录同步失败状态时出错: {gid}, 错误: {mark_error}")
        logging.info(f"当前数据同步完成,更新了 {updated_count} 条记录,插入了 {inserted_count} 条记录,下次同步时间十分钟后")
    except Exception as e:
        logging.critical(f"同步过程中发生错误:{e}", exc_info=True)
    finally:
        if source_cursor:
            source_cursor.close()
        if target_cursor:
            target_cursor.close()
        if source_connection:
            source_connection.close()
        if target_connection:
            target_connection.close()

if __name__ == '__main__':
    # Poll forever: run one sync pass, then pause ten minutes before the next one.
    pause_seconds = 10 * 60
    while True:
        sync_data()
        time.sleep(pause_seconds)

Python