import multiprocessing
import os.path
import traceback

from etl.common.PathsAndTable import PathsAndTable
from service.trans_conf_service import update_trans_transfer_progress
from service.trans_service import save_scada_file_to_db, save_file_to_db
from utils.log.trans_log import info, error
from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent


class SaveToDb(object):
    """Persist transformed data files into the wind-farm database.

    Files are filtered against the wind-turbine column-translation map,
    then loaded in parallel with a multiprocessing pool, reporting
    transfer progress (70% -> 99%) as batches complete.
    """

    def __init__(self, pathsAndTable: PathsAndTable, update_files, batch_count=100000):
        """
        :param pathsAndTable: context object carrying paths, table names,
            read type, and the wind_col_trans mapping.
        :param update_files: list of file paths to be saved to the database.
        :param batch_count: row batch size passed to the per-file save functions.
        """
        self.pathsAndTable = pathsAndTable
        self.batch_count = batch_count
        self.update_files = update_files

    def _save_in_batches(self, pool, save_func, params):
        """Run save_func over params in up-to-10 batches via the pool,
        updating transfer progress (70 -> 99) after each batch.

        :param pool: an open multiprocessing.Pool.
        :param save_func: per-file save function invoked via pool.starmap.
        :param params: list of argument tuples, one per file.
        """
        total = len(params)
        batch_size = max(1, total // 10)  # at most ~10 batches
        for i in range(0, total, batch_size):
            batch = params[i:i + batch_size]
            pool.starmap(save_func, batch)
            # Map completed fraction of files onto the 70-99 progress range.
            progress = 70 + 29 * (i + len(batch)) / total
            update_trans_transfer_progress(self.pathsAndTable.id, round(progress, 2),
                                           self.pathsAndTable.save_db)

    def mutiprocessing_to_save_db(self):
        """Save all eligible files to the database using a process pool.

        Only files whose basename (without extension) appears in
        wind_col_trans are saved. Raises ValueError (wrapping the original
        error) on any failure during the parallel save.
        """
        # Keep only files that map to a known wind-turbine translation entry.
        all_saved_files = [
            f for f in self.update_files
            if os.path.basename(f).split(".")[0] in self.pathsAndTable.wind_col_trans
        ]
        if not all_saved_files:
            info("没有文件需要保存到数据库")
            return

        self.pathsAndTable.create_wind_farm_db()

        # Use up to 2/3 of available CPUs, capped by file count and a hard limit of 10.
        max_processes = get_available_cpu_count_with_percent(percent=2 / 3)
        max_processes = min(max_processes, len(all_saved_files), 10)

        try:
            with multiprocessing.Pool(max_processes) as pool:
                if self.pathsAndTable.read_type in ['minute', 'second']:
                    # SCADA-style files carry per-turbine column translations
                    # and the parent directory name as extra metadata.
                    params = [(self.pathsAndTable.get_table_name(),
                               file,
                               self.pathsAndTable.wind_col_trans[os.path.basename(file).split(".")[0]],
                               os.path.basename(os.path.dirname(file)),
                               self.batch_count,
                               self.pathsAndTable.use_tidb)
                              for file in all_saved_files]
                    self._save_in_batches(pool, save_scada_file_to_db, params)
                else:
                    params = [(self.pathsAndTable.get_table_name(), file, self.batch_count)
                              for file in all_saved_files]
                    self._save_in_batches(pool, save_file_to_db, params)
        except Exception as e:
            error(traceback.format_exc())
            message = "保存到数据库错误,系统返回错误:" + str(e)
            raise ValueError(message)

    def run(self):
        """Entry point: save to DB when enabled, then mark progress at 99%."""
        if self.pathsAndTable.save_db:
            self.mutiprocessing_to_save_db()
        # NOTE(review): reconstructed as unconditional (source formatting was
        # ambiguous) — final progress is reported even when save_db is False.
        update_trans_transfer_progress(self.pathsAndTable.id, 99, self.pathsAndTable.save_db)