# Python3 data extraction / sync multithreading demo (Python3数据抽取同步多线程demo)
import logging
import threading
import traceback
from concurrent.futures import ThreadPoolExecutor

import pymysql
import retry
# Thread-local storage: each worker thread caches its own DB connections here.
thread_local = threading.local()
# Shared counters, incremented by worker threads in process_row.
updated_rows = 0
inserted_rows = 0
# Normal-execution log: INFO and above go to execution.log.
logging.basicConfig(filename='execution.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Dedicated ERROR logger writing to a separate file (error.log).
error_logger = logging.getLogger('error_logger')
error_logger.setLevel(logging.ERROR)
error_handler = logging.FileHandler('error.log')
error_handler.setLevel(logging.ERROR)
error_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
error_handler.setFormatter(error_formatter)
error_logger.addHandler(error_handler)
@retry.retry(pymysql.Error, tries=3, delay=1)
def get_source_conn():
    """Return this thread's cached source-DB connection, creating it on first use.

    Retries up to 3 times (1s delay) on pymysql errors via the decorator.
    """
    conn = getattr(thread_local, "source_conn", None)
    if conn is None:
        conn = pymysql.connect(
            host='数据库地址',
            user='数据库用户',
            password='数据库密码',
            db='数据库'
        )
        thread_local.source_conn = conn
    return conn
@retry.retry(pymysql.Error, tries=3, delay=1)
def get_target_conn():
    """Return this thread's cached target-DB connection, creating it on first use.

    Retries up to 3 times (1s delay) on pymysql errors via the decorator.
    """
    conn = getattr(thread_local, "target_conn", None)
    if conn is None:
        conn = pymysql.connect(
            host='数据库地址',
            user='数据库用户',
            password='数据库密码',
            db='数据库'
        )
        thread_local.target_conn = conn
    return conn
# Guards the shared counters: int "+=" is a read-modify-write and is NOT
# atomic across threads, so unsynchronized increments can be lost.
_counter_lock = threading.Lock()
def process_row(row):
    """Sync one source row into the target `test` table.

    Inserts the row when no record with the same (name, bianma, ip) key
    exists; updates the `ml` column when the existing record differs.

    Args:
        row: sequence of (name, bianma, ip, ml) values from the source table.
    """
    global updated_rows, inserted_rows
    # Defined before `try` so the `finally` cleanup can never raise NameError
    # when get_target_conn() itself fails (original bug).
    target_cursor = None
    try:
        # NOTE: the original also opened a per-thread *source* connection and
        # cursor here that were never used — dropped as a dead resource.
        target_conn = get_target_conn()
        target_cursor = target_conn.cursor()
        # Look for an existing record with the same natural key.
        check_query = "SELECT * FROM test WHERE name = %s and bianma = %s and ip = %s"
        target_cursor.execute(check_query, (row[0], row[1], row[2]))
        if target_cursor.rowcount > 0:
            target_row = target_cursor.fetchone()
            # Update only when the payload column actually changed.
            if row[3] != target_row[3]:
                update_query = "UPDATE test SET ml = %s WHERE name = %s and bianma = %s and ip = %s"
                target_cursor.execute(update_query, (row[3], row[0], row[1], row[2]))
                with _counter_lock:
                    updated_rows += 1
                logging.info("Updated record: {}".format(row))
        else:
            # No match: insert the row into the target database.
            insert_query = "INSERT INTO test (name, bianma, ip, ml) VALUES (%s, %s, %s, %s)"
            target_cursor.execute(insert_query, (row[0], row[1], row[2], row[3]))
            with _counter_lock:
                inserted_rows += 1
            logging.info("Inserted record: {}".format(row))
        # Commit the target-side transaction.
        target_conn.commit()
    except Exception as e:
        traceback.print_exc()
        logging.error("Error processing record: {}. Error: {}".format(row, str(e)))
        error_logger.error("Error processing record: {}. Error: {}".format(row, str(e)))
    finally:
        # Close the cursor; the connection stays cached in thread_local for reuse.
        if target_cursor:
            target_cursor.close()
def main():
    """Read every row from the source `sxjml` table and sync it to the target.

    Fans the per-row work out over a bounded thread pool (max 200 workers),
    then reports how many rows were inserted and updated.
    """
    # Initialized up front so the `finally` guards can never raise NameError
    # when get_source_conn() itself fails (original bug).
    source_conn = None
    source_cursor = None
    try:
        source_conn = get_source_conn()
        source_cursor = source_conn.cursor()
        # Fetch all rows to sync.
        source_cursor.execute("SELECT * FROM sxjml")
        rows = source_cursor.fetchall()
        # Bounded pool replaces the original "start 200 threads, join them
        # all, repeat" batching: workers stay busy instead of the whole batch
        # waiting on its slowest row. The 200-worker cap is preserved.
        with ThreadPoolExecutor(max_workers=200) as executor:
            # list() drains the iterator so all rows are processed before exit.
            list(executor.map(process_row, rows))
        # Report totals (counters are updated by the workers).
        print("已更新 {} 条记录".format(updated_rows))
        print("已新增 {} 条记录".format(inserted_rows))
        logging.info("Total updated records: {}".format(updated_rows))
        logging.info("Total inserted records: {}".format(inserted_rows))
    except Exception as e:
        traceback.print_exc()
        logging.error("Error in main function. Error: {}".format(str(e)))
        error_logger.error("Error in main function. Error: {}".format(str(e)))
    finally:
        # Close the main thread's cursor/connection and drop its cached
        # thread-local handles.
        if source_cursor:
            source_cursor.close()
        if source_conn:
            source_conn.close()
        if hasattr(thread_local, "source_conn"):
            del thread_local.source_conn
        if hasattr(thread_local, "target_conn"):
            del thread_local.target_conn
if __name__ == '__main__':
    main()
# 版权所有:刘小猪-博客
# 文章标题:Python3数据抽取同步多线程demo
# 文章链接:https://blog.liuxiaozhu.cn/?post=51
# 本站文章均为原创,未经授权请勿用于任何商业用途