import json
import multiprocessing
import os
import shutil
import traceback
import warnings

import pandas as pd

from service import trans_service
from service.plt_service import get_wind_info, get_all_wind_by_company_code
from tool.ClassIdentifier import ClassIdentifier
from utils.common_utils import read_file, get_all_files

warnings.filterwarnings('ignore')

# NOTE(review): `logger` is expected to come from this star import -- confirm.
from service.trans_service import *

# Legacy helpers, kept for reference. They appear to have produced the JSON maps
# loaded from conf/ in __main__ -- NOTE(review): confirm. When a map keyed by integer
# status codes is written with json.dump, those keys become strings, which is why the
# fault-code lookup in warn_fault_read_and_save_db also tries str(code).
# def generate_warn_falut_code_maps(file_path):
#     df = pd.read_excel(file_path)
#     df['主控ID'] = df['主控ID'].str.strip()
#     df['状态编码'] = df['状态编码'].astype(int)
#     df['SC中文描述'] = df['SC中文描述'].str.strip()
#     df['告警等级'] = df['告警等级'].str.strip()
#     df['告警等级'] = df['告警等级'].fillna("告警信息")
#
#     result_map = dict()
#     for index, row in df.iterrows():
#         controller_id = row['主控ID'].strip()
#         status_code = int(row['状态编码'])
#         cn_des = row['SC中文描述']
#         level = row['告警等级']
#         if controller_id in result_map.keys():
#             result_map[controller_id][status_code] = (cn_des, level)
#         else:
#             result_map[controller_id] = {status_code: (cn_des, level)}
#
#     return result_map
#
#
# def generate_mc_version_maps(file_path):
#     df = pd.read_excel(file_path)
#
#     mc_version_maps = dict()
#     for _, data in df.iterrows():
#         changzhan = data['场站标准化编号'].strip()
#         wind_no = str(data['风机号'])
#         mc_versioin = data['主控版本'].strip()
#         if changzhan in mc_version_maps.keys():
#             mc_version_maps[changzhan][wind_no] = mc_versioin
#         else:
#             mc_version_maps[changzhan] = {wind_no: mc_versioin}
#
#     return mc_version_maps

# Status codes treated as the "success" (non-fault) state.
_SUCCESS_CODES = frozenset({0, 309001})


def _move_to_error_dir(file_path, src_dir_part, error_dir_part):
    """Move a file that failed to convert into the matching error directory.

    Only the directory part of the path is rewritten, so a file name that happens to
    contain ``src_dir_part`` is left intact. Errors while moving are logged rather than
    raised, so one bad file cannot make ``Pool.starmap`` abort the whole batch.
    """
    error_path = os.path.join(
        os.path.dirname(file_path).replace(src_dir_part, error_dir_part),
        os.path.basename(file_path),
    )
    try:
        os.makedirs(os.path.dirname(error_path), exist_ok=True)
        shutil.move(file_path, error_path)
    except OSError:
        logger.error(f"failed to move {file_path} to {error_path}")
        logger.error(traceback.format_exc())


def _add_time_columns(data_df):
    """Add year/month/day/year_month columns and stringify ``time_stamp`` in place."""
    data_df['year'] = data_df['time_stamp'].dt.year
    data_df['month'] = data_df['time_stamp'].dt.month
    data_df['day'] = data_df['time_stamp'].dt.day
    data_df['time_stamp'] = data_df['time_stamp'].dt.strftime('%Y-%m-%d %H:%M:%S')
    # e.g. year 2024, month 3 -> "202403"
    data_df['year_month'] = data_df['year'].astype(str) + data_df['month'].astype(str).str.zfill(2)
    return data_df


def _classify_and_add_time_columns(wind_turbine_number, origin_df, rated_power, cut_out_speed):
    """Run ClassIdentifier on ``origin_df`` and add time columns to a non-empty result."""
    class_identifier = ClassIdentifier(wind_turbine_number=wind_turbine_number, origin_df=origin_df,
                                       rated_power=rated_power, cut_out_speed=cut_out_speed)
    result_df = class_identifier.run()
    if not result_df.empty:
        _add_time_columns(result_df)
    return result_df


def scada_read_and_save_db(file_path, wind_factory_map: dict):
    """Convert one SCADA file into the ``_second`` and ``_minute`` tables of its wind farm.

    The wind-farm code is the part of the file name before the first ``_``. On success
    the source file is deleted; on any error it is moved from the ``scada`` directory
    to ``scada_error``.

    Args:
        file_path: SCADA file to convert (loaded with ``read_file``).
        wind_factory_map: wind-farm code -> wind-farm name.
    """
    try:
        wind_no = os.path.basename(file_path).split('_')[0]
        # Resolve the farm name before any DB write so an unknown farm fails cleanly.
        wind_name = wind_factory_map[wind_no]

        df = read_file(file_path)
        if 'wind_farm_code' in df.columns:
            del df['wind_farm_code']
        df['wind_turbine_name'] = df['wind_turbine_name'].astype('str')
        wind_col_trans, rated_power_and_cutout_speed_map = get_wind_info(wind_no)
        df['wind_turbine_number'] = df['wind_turbine_name'].map(wind_col_trans).fillna(df['wind_turbine_name'])
        if df.empty:
            raise ValueError(f"{file_path} contains no rows")
        # Only the first turbine (in file order) is used: one turbine per file is assumed.
        wind_turbine_number = df['wind_turbine_number'].unique()[0]

        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
        df.dropna(subset=['time_stamp'], inplace=True)
        df.sort_values(by='time_stamp', inplace=True)

        logger.info(f"有功功率前10个 :{df.head(10)['active_power'].values}")
        power_df = df[df['active_power'] > 0]
        logger.info(f"{wind_turbine_number} 功率大于0的数量:{power_df.shape}")
        # Logged only. The full median is cheap and, unlike a 1% random sample, is
        # deterministic and not NaN for files with fewer than 100 positive rows.
        power = power_df['active_power'].median()
        logger.info(f"{wind_turbine_number} 有功功率,中位数:{power}")

        rated_info = rated_power_and_cutout_speed_map.get(str(wind_turbine_number))
        if rated_info is None:
            rated_info = (None, None)
        rated_power, cut_out_speed = rated_info[0], rated_info[1]

        sec_df = _classify_and_add_time_columns(wind_turbine_number, df.copy(), rated_power, cut_out_speed)
        # NOTE(review): this keeps every row whose minute is a multiple of 10; for
        # second-level data that is a whole minute of samples per 10-minute slot. If a
        # single sample per slot is intended, also filter on `.dt.second == 0` -- confirm.
        min_df_origin = df[df['time_stamp'].dt.minute % 10 == 0].copy()
        min_df = _classify_and_add_time_columns(wind_turbine_number, min_df_origin, rated_power, cut_out_speed)

        trans_service.save_df_to_db(f'{wind_no}_second', sec_df)
        trans_service.save_df_to_db(f'{wind_no}_minute', min_df)
        update_day_count(sec_df, wind_no, wind_name, 'second')
        update_day_count(min_df, wind_no, wind_name, 'minute')
        os.remove(file_path)
    except Exception:
        logger.error(f"{file_path}转化失败")
        logger.error(traceback.format_exc())
        _move_to_error_dir(file_path, 'scada', 'scada_error')
    finally:
        logger.info(f'scada/{os.path.basename(file_path)}执行结束')


def generate_warn_fault_data(df):
    """Collapse a time-ordered status-code stream into fault episodes.

    Each run of identical non-success codes becomes one row. ``begin_time`` is the
    run's first timestamp. ``end_time`` is the timestamp of the first success code
    (0 or 309001) at or after the end of the run, and ``time_diff`` is the gap in whole
    seconds. Episodes with no later success get ``None`` for both. The turbine name of
    the first row is used for every episode. The input frame is not modified.

    Args:
        df: frame with ``wind_turbine_name``, ``time_stamp`` and ``fault_code`` columns,
            expected to be sorted by ``time_stamp`` already (this function does not sort).

    Returns:
        DataFrame with columns wind_turbine_name, begin_time, fault_code, end_time, time_diff.
    """
    times = pd.to_datetime(df['time_stamp']).tolist()
    codes = df['fault_code'].tolist()
    n = len(codes)

    # next_success[k]: index of the first success code at position >= k, or None.
    # Precomputing it avoids re-scanning forward for every episode.
    next_success = [None] * (n + 1)
    for k in range(n - 1, -1, -1):
        next_success[k] = k if codes[k] in _SUCCESS_CODES else next_success[k + 1]

    turbine_name = df['wind_turbine_name'].iloc[0] if n else None
    result = []
    i = 0
    while i < n:
        fault_status = codes[i]
        if fault_status in _SUCCESS_CODES:
            i += 1
            continue
        fault_start = times[i]
        # Skip the rest of this run of identical fault codes.
        while i < n and codes[i] == fault_status:
            i += 1
        j = next_success[i]
        if j is None:
            # Fault lasts until the end of the data.
            result.append([turbine_name, fault_start, fault_status, None, None])
        else:
            success_time = times[j]
            fault_duration = int((success_time - fault_start).total_seconds())
            result.append([turbine_name, fault_start, fault_status, success_time, fault_duration])

    result_df = pd.DataFrame(result, columns=['wind_turbine_name', 'begin_time', 'fault_code', 'end_time',
                                              'time_diff'])
    result_df['fault_code'] = result_df['fault_code'].astype(int)
    return result_df


def _save_warn_fault_part(part_df, wind_farm_code, wind_farm_name, data_type):
    """Save warn or fault episodes to ``{wind_farm_code}_{data_type}`` and update the day count.

    NOTE(review): the whole batch size is recorded against the date of the latest
    ``begin_time`` only, even when the episodes span several days -- confirm intended.
    """
    if part_df.empty:
        return
    trans_service.save_df_to_db(f'{wind_farm_code}_{data_type}', part_df)
    max_date = part_df['begin_time'].max()
    add_date_str = max_date.strftime('%Y-%m-%d')
    last_date_str = max_date.strftime('%Y-%m-%d %H:%M:%S')
    # wind_farm_code, wind_farm_name, add_date, trans_type, count, latest_data_time, sync_status
    trans_service.update_wind_farm_day_count(wind_farm_code, wind_farm_name, add_date_str, data_type,
                                             part_df.shape[0], last_date_str, 1)


def warn_fault_read_and_save_db(file_path, wind_factory_map: dict, mc_vesion_map: dict,
                                warn_falut_code_map: dict):
    """Convert one warn/fault status file into rows of the ``_warn`` and ``_fault`` tables.

    The file name is ``<wind_farm_code>_<wind_turbine_name>...``. Episodes whose level in
    ``warn_falut_code_map`` is '故障' go to ``_fault``; all others go to ``_warn``. On
    success the source file is deleted; on any error it is moved from the
    ``warn_fault`` directory to ``warn_fault_error``.

    Args:
        file_path: status file to convert (loaded with ``read_file``).
        wind_factory_map: wind-farm code -> wind-farm name.
        mc_vesion_map: wind-farm code -> {turbine name -> main-controller version}.
        warn_falut_code_map: controller version -> {fault code -> (description, level)}.
            Code keys may be ints or strings (JSON-loaded maps always have string keys).
    """
    try:
        file_name = os.path.basename(file_path)
        wind_farm_code = file_name.split('_')[0]
        wind_turbine_name = file_name.split('_')[1].replace('.csv', '')
        # Resolve lookups before any DB side effect so a missing key fails cleanly.
        wind_farm_name = wind_factory_map[wind_farm_code]
        mc_vesion = mc_vesion_map[wind_farm_code][wind_turbine_name]

        df = read_file(file_path)
        col_map = {'wind_turbine_id': 'wind_turbine_name', 'timestamp': 'time_stamp',
                   'warn_fault_code': 'fault_code'}
        df.rename(columns=col_map, inplace=True)
        # Drop unparseable codes, then make the rest ints so they compare consistently
        # against _SUCCESS_CODES and the code map.
        df['fault_code'] = pd.to_numeric(df['fault_code'], errors='coerce')
        df = df.dropna(subset=['fault_code']).astype({'fault_code': int})
        df['wind_turbine_name'] = df['wind_turbine_name'].astype('str')
        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
        df = df.dropna(subset=['time_stamp'])
        # Stable sort keeps the file order of events that share a timestamp.
        df = df.sort_values(by='time_stamp', kind='stable').reset_index(drop=True)
        if df.empty:
            raise ValueError(f"{file_path} has no rows with a valid fault_code and time_stamp")

        if df['fault_code'].iloc[0] in _SUCCESS_CODES:
            trans_service.update_warn_fault_exist_data_with_db(wind_farm_code, wind_turbine_name,
                                                               df['time_stamp'].iloc[0])

        wind_col_trans, _ = get_wind_info(wind_farm_code)
        save_df = generate_warn_fault_data(df)
        save_df['wind_turbine_name'] = save_df['wind_turbine_name'].astype('str')
        save_df['wind_turbine_number'] = save_df['wind_turbine_name'].map(wind_col_trans).fillna(
            save_df['wind_turbine_name'])
        save_df['mc_version'] = mc_vesion

        code_map = warn_falut_code_map.get(mc_vesion) or {}

        def _code_info(code, index):
            # JSON object keys are always strings, so fall back to str(code).
            entry = code_map.get(code)
            if entry is None:
                entry = code_map.get(str(code))
            return None if entry is None else entry[index]

        # Series.map (unlike DataFrame.apply(axis=1)) also behaves on an empty frame.
        save_df['fault_detail'] = save_df['fault_code'].map(lambda code: _code_info(code, 0))
        save_df['fault_level'] = save_df['fault_code'].map(lambda code: _code_info(code, 1))

        select_cols = ['wind_turbine_number', 'wind_turbine_name', 'mc_version', 'begin_time', 'end_time',
                       'time_diff', 'fault_code', 'fault_detail', 'fault_level']
        is_fault = save_df['fault_level'] == '故障'
        warn_df = save_df.loc[~is_fault, select_cols].sort_values(by='begin_time')
        fault_df = save_df.loc[is_fault, select_cols].sort_values(by='begin_time')

        _save_warn_fault_part(warn_df, wind_farm_code, wind_farm_name, 'warn')
        _save_warn_fault_part(fault_df, wind_farm_code, wind_farm_name, 'fault')
        os.remove(file_path)
    except Exception:
        logger.error(f"{file_path}转化失败")
        logger.error(traceback.format_exc())
        _move_to_error_dir(file_path, 'warn_fault', 'warn_fault_error')
    finally:
        logger.info(f'warn_fault/{os.path.basename(file_path)}执行结束')


def update_day_count(df_data: pd.DataFrame, wind_farm_code: str, wind_name, data_type: str):
    """Record per-day row counts of ``df_data`` via ``update_wind_farm_day_count``.

    Rows are grouped by the calendar day of ``time_stamp``. Each day reports the number
    of non-null ``wind_turbine_number`` values and its latest timestamp, formatted like
    the warn/fault path ('%Y-%m-%d %H:%M:%S'). Rows whose ``time_stamp`` cannot be
    parsed are ignored. An empty frame is a no-op.

    Args:
        df_data: frame with ``wind_turbine_number`` and ``time_stamp`` columns.
        wind_farm_code: wind-farm code passed through to the counter.
        wind_name: wind-farm name passed through to the counter.
        data_type: count type, e.g. 'second' or 'minute'.
    """
    if df_data.empty:
        return
    df = df_data[['wind_turbine_number', 'time_stamp']].copy()
    df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
    df = df.dropna(subset=['time_stamp'])
    df = df.assign(add_date=df['time_stamp'].dt.strftime('%Y-%m-%d'))
    daily = df.groupby('add_date').agg(day_count=('wind_turbine_number', 'count'),
                                       latest_time=('time_stamp', 'max'))
    for add_date, day_count, latest_time in zip(daily.index, daily['day_count'], daily['latest_time']):
        trans_service.update_wind_farm_day_count(wind_farm_code, wind_name, add_date, data_type, int(day_count),
                                                 latest_time.strftime('%Y-%m-%d %H:%M:%S'), sync_status=1)


if __name__ == '__main__':
    scada_dir = r'/home/trans/data/scada'
    warn_fault_dir = r'/home/trans/data/warn_fault'
    scada_files = get_all_files(scada_dir)
    warn_fault_files = get_all_files(warn_fault_dir)
    wind_factory_map = get_all_wind_by_company_code('COM00002')
    # Read the configs as UTF-8 explicitly (the code map holds Chinese level names such
    # as '故障') instead of relying on the platform's default encoding.
    with open('conf/mc_vesion.json', 'r', encoding='utf-8') as f:
        mc_vesion_map = json.load(f)
    with open('conf/warn_fault_mc_code.json', 'r', encoding='utf-8') as f:
        warn_falut_code_map = json.load(f)

    with multiprocessing.Pool(4) as pool:
        pool.starmap(scada_read_and_save_db, [(i, wind_factory_map) for i in scada_files])

    with multiprocessing.Pool(4) as pool:
        pool.starmap(warn_fault_read_and_save_db,
                     [(i, wind_factory_map, mc_vesion_map, warn_falut_code_map) for i in warn_fault_files])