«

Python3数据抽取同步多线程demo

发布于 阅读:73 教程


import pymysql
import threading
import retry
import traceback
import logging
# Per-thread storage: each thread caches its own source/target database
# connection on this object (see get_source_conn / get_target_conn).
thread_local = threading.local()
# Totals reported at the end of main(); incremented by process_row.
updated_rows = 0
inserted_rows = 0
# Normal execution log: INFO and above go to execution.log.  The encoding is
# set explicitly because logged rows may contain non-ASCII text that the
# platform default encoding cannot always represent.
logging.basicConfig(
    handlers=[logging.FileHandler('execution.log', encoding='utf-8')],
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
# Error log: records sent to error_logger at ERROR level go to error.log.
error_logger = logging.getLogger('error_logger')
error_logger.setLevel(logging.ERROR)
# The callers log every error to the root logger (execution.log) themselves
# before calling error_logger; without this, each error_logger record would
# also propagate to the root logger and appear in execution.log twice.
error_logger.propagate = False
error_handler = logging.FileHandler('error.log', encoding='utf-8')
error_handler.setLevel(logging.ERROR)
error_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
error_handler.setFormatter(error_formatter)
error_logger.addHandler(error_handler)
@retry.retry(pymysql.Error, tries=3, delay=1)
def get_source_conn():
    """Return the calling thread's connection to the source database.

    The connection is opened lazily on the first call from a thread and cached
    on ``thread_local``; later calls from the same thread return the cached
    object.  If opening it raises ``pymysql.Error``, the ``retry`` decorator
    re-invokes the function (tries=3, delay=1).
    """
    try:
        # Fast path: this thread already has a cached connection.
        return thread_local.source_conn
    except AttributeError:
        # First use on this thread: open a connection and cache it.
        conn = pymysql.connect(
            host='数据库地址',
            user='数据库用户',
            password='数据库密码',
            db='数据库',
        )
        thread_local.source_conn = conn
        return conn
@retry.retry(pymysql.Error, tries=3, delay=1)
def get_target_conn():
    """Return the calling thread's connection to the target database.

    Opened on the first call from a thread and cached on ``thread_local`` so
    the same thread reuses it afterwards.  If opening it raises
    ``pymysql.Error``, the ``retry`` decorator re-invokes the function
    (tries=3, delay=1).
    """
    if hasattr(thread_local, "target_conn"):
        # Reuse the connection this thread opened earlier.
        return thread_local.target_conn
    # No cached connection for this thread yet: create and remember one.
    thread_local.target_conn = pymysql.connect(
        host='数据库地址',
        user='数据库用户',
        password='数据库密码',
        db='数据库',
    )
    return thread_local.target_conn
# Serializes updates to the module-level counters: `counter += 1` on a global
# is a read-modify-write and can lose increments when threads interleave.
_counter_lock = threading.Lock()


def process_row(row):
    """Upsert one source row into the target table ``test``.

    ``row`` is a row fetched from ``sxjml``; its first four fields are used as
    (name, bianma, ip, ml).  If ``test`` already contains a row with the same
    (name, bianma, ip) and its ``ml`` differs, that row is updated; if no such
    row exists, a new one is inserted.  After a successful commit the matching
    global counter (``updated_rows`` / ``inserted_rows``) is incremented under
    a lock and the change is logged.

    Errors are printed and logged but not re-raised, so one bad row does not
    stop the run.  On error the thread's target connection is closed and
    removed from the thread-local cache, so a broken or mid-transaction
    connection is never reused for later rows.
    """
    global updated_rows, inserted_rows
    # Bound before the try so the error path can test it safely even when
    # get_target_conn() itself fails.
    target_conn = None
    action = None
    try:
        target_conn = get_target_conn()
        # The context manager closes the cursor even if a query fails.
        with target_conn.cursor() as target_cursor:
            # Check whether the target already holds this record.  Only ``ml``
            # is selected so the comparison below does not depend on the
            # column order of the target table.
            check_query = "SELECT ml FROM test WHERE name = %s and bianma = %s and ip = %s"
            target_cursor.execute(check_query, (row[0], row[1], row[2]))
            target_row = target_cursor.fetchone()
            if target_row is None:
                # Not present yet: insert it.
                insert_query = "INSERT INTO test (name, bianma, ip, ml) VALUES (%s, %s, %s, %s)"
                target_cursor.execute(insert_query, (row[0], row[1], row[2], row[3]))
                action = "insert"
            elif row[3] != target_row[0]:
                # Present but changed: update the stored value.
                update_query = "UPDATE test SET ml = %s WHERE name = %s and bianma = %s and ip = %s"
                target_cursor.execute(update_query, (row[3], row[0], row[1], row[2]))
                action = "update"
        target_conn.commit()
        # Count and log only once the change has actually been committed.
        if action == "update":
            with _counter_lock:
                updated_rows += 1
            logging.info("Updated record: {}".format(row))
        elif action == "insert":
            with _counter_lock:
                inserted_rows += 1
            logging.info("Inserted record: {}".format(row))
    except Exception as e:
        traceback.print_exc()
        logging.error("Error processing record: {}. Error: {}".format(row, str(e)))
        error_logger.error("Error processing record: {}. Error: {}".format(row, str(e)))
        # Drop the connection: closing without a commit discards any partial
        # transaction, and the next get_target_conn() call on this thread
        # opens a fresh connection instead of reusing a possibly broken one.
        if target_conn is not None and target_conn.open:
            target_conn.close()
        if hasattr(thread_local, "target_conn"):
            del thread_local.target_conn
def main(max_workers=200):
    """Sync every row of the source table ``sxjml`` into the target table ``test``.

    All source rows are fetched with one query, then each row is handed to
    :func:`process_row` on a thread pool.  When all rows are done, the
    update/insert totals are printed and logged.  Errors in this driver are
    printed and logged rather than raised.

    :param max_workers: maximum number of rows processed concurrently
        (defaults to 200, the limit previously hard-coded here).
    """
    # Local import: only this driver needs the pool.
    from concurrent.futures import ThreadPoolExecutor

    # Bound before the try so the finally block never touches an unbound name
    # when get_source_conn() itself fails.
    source_conn = None
    try:
        source_conn = get_source_conn()
        # Run the query; the cursor is closed as soon as the rows are fetched.
        with source_conn.cursor() as source_cursor:
            source_cursor.execute("SELECT * FROM sxjml")
            rows = source_cursor.fetchall()
        # A pool keeps up to max_workers rows in flight continuously, instead
        # of starting a batch of threads and idling until the slowest finishes.
        # Leaving the with-block waits for every submitted row to complete.
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(process_row, row) for row in rows]
        # A pool stores uncaught worker exceptions on the future instead of
        # printing them, so report anything that escaped process_row.
        for row, future in zip(rows, futures):
            exc = future.exception()
            if exc is not None:
                logging.error("Unhandled error processing record: {}. Error: {}".format(row, str(exc)))
                error_logger.error("Unhandled error processing record: {}. Error: {}".format(row, str(exc)))
        # Report how many records were updated and how many were inserted.
        print("已更新 {} 条记录".format(updated_rows))
        print("已新增 {} 条记录".format(inserted_rows))
        logging.info("Total updated records: {}".format(updated_rows))
        logging.info("Total inserted records: {}".format(inserted_rows))
    except Exception as e:
        traceback.print_exc()
        logging.error("Error in main function. Error: {}".format(str(e)))
        error_logger.error("Error in main function. Error: {}".format(str(e)))
    finally:
        if source_conn is not None and source_conn.open:
            source_conn.close()
        # Forget this thread's cached connections so they are not reused.
        if hasattr(thread_local, "source_conn"):
            del thread_local.source_conn
        if hasattr(thread_local, "target_conn"):
            del thread_local.target_conn
if __name__ == '__main__':
    # Run the sync only when this file is executed as a script, not on import.
    main()

MySQL Python