||
- import json
- import multiprocessing
- import os
- import shutil
- import warnings
- from service import trans_service
- from service.plt_service import get_wind_info, get_all_wind_by_company_code
- from tool.ClassIdentifier import ClassIdentifier
- from utils.common_utils import read_file, get_all_files
- warnings.filterwarnings('ignore')
- from service.trans_service import *
- # def generate_warn_falut_code_maps(file_path):
- # df = pd.read_excel(file_path)
- # df['主控ID'] = df['主控ID'].str.strip()
- # df['状态编码'] = df['状态编码'].astype(int)
- # df['SC中文描述'] = df['SC中文描述'].str.strip()
- # df['告警等级'] = df['告警等级'].str.strip()
- # df['告警等级'] = df['告警等级'].fillna("告警信息")
- #
- # result_map = dict()
- # for index, row in df.iterrows():
- # controller_id = row['主控ID'].strip()
- # status_code = int(row['状态编码'])
- # cn_des = row['SC中文描述']
- # level = row['告警等级']
- # if controller_id in result_map.keys():
- # result_map[controller_id][status_code] = (cn_des, level)
- # else:
- # result_map[controller_id] = {status_code: (cn_des, level)}
- #
- # return result_map
- #
- #
- # def generate_mc_version_maps(file_path):
- # df = pd.read_excel(file_path)
- #
- # mc_version_maps = dict()
- # for _, data in df.iterrows():
- # changzhan = data['场站标准化编号'].strip()
- # wind_no = str(data['风机号'])
- # mc_versioin = data['主控版本'].strip()
- # if changzhan in mc_version_maps.keys():
- # mc_version_maps[changzhan][wind_no] = mc_versioin
- # else:
- # mc_version_maps[changzhan] = {wind_no: mc_versioin}
- #
- # return mc_version_maps
def _add_partition_columns(frame):
    """Add ``year``/``month``/``day``/``year_month`` columns and format ``time_stamp``.

    ``frame['time_stamp']`` must be datetime-like. The frame is modified in place
    and returned; an empty frame is returned unchanged. ``year_month`` is the
    ``'%Y%m'`` string (e.g. ``'202403'``) and ``time_stamp`` is replaced by its
    ``'%Y-%m-%d %H:%M:%S'`` string form.
    """
    if frame.empty:
        return frame
    stamps = frame['time_stamp']
    frame['year'] = stamps.dt.year
    frame['month'] = stamps.dt.month
    frame['day'] = stamps.dt.day
    # Derive year_month from the datetimes before time_stamp is stringified.
    frame['year_month'] = stamps.dt.strftime('%Y%m')
    frame['time_stamp'] = stamps.dt.strftime('%Y-%m-%d %H:%M:%S')
    return frame


def scada_read_and_save_db(file_path, wind_factory_map: dict):
    """Classify one SCADA file and store its second-level and 10-minute data.

    The wind farm code is the file-name prefix before the first ``_``. Turbine
    names are mapped to turbine numbers via ``get_wind_info``. ``ClassIdentifier``
    is then run twice. The first run uses all rows and is saved to
    ``<code>_second``. The second run uses rows whose minute is a multiple of 10
    and is saved to ``<code>_minute``. Per-day counts are updated for both
    tables, and the source file is deleted. On any error the file is moved to
    the same path with ``scada`` replaced by ``scada_error``.

    Args:
        file_path: Path of the SCADA file to process.
        wind_factory_map: Wind farm code -> wind farm name.
    """
    try:
        df = read_file(file_path)
        if 'wind_farm_code' in df.columns:
            del df['wind_farm_code']
        wind_no = os.path.basename(file_path).split('_')[0]
        df['wind_turbine_name'] = df['wind_turbine_name'].astype('str')
        wind_col_trans, rated_power_and_cutout_speed_map = get_wind_info(wind_no)
        df['wind_turbine_number'] = df['wind_turbine_name'].map(wind_col_trans).fillna(df['wind_turbine_name'])
        # Presumably one turbine per file: only the first turbine number is used below.
        wind_turbine_number = df['wind_turbine_number'].unique()[0]
        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
        df.dropna(subset=['time_stamp'], inplace=True)
        df.sort_values(by='time_stamp', inplace=True)
        logger.info(f"有功功率前10个 :{df.head(10)['active_power'].values}")
        power_df = df[df['active_power'] > 0]
        logger.info(f"{wind_turbine_number} 功率大于0的数量:{power_df.shape}")
        # Median of a random ~1% sample of positive-power rows; only logged.
        power = power_df.sample(int(power_df.shape[0] / 100))['active_power'].median()
        logger.info(f"{wind_turbine_number} 有功功率,中位数:{power}")
        rated_and_cutout = rated_power_and_cutout_speed_map.get(str(wind_turbine_number), None)
        if rated_and_cutout is None:
            rated_and_cutout = (None, None)

        def classify(origin_df):
            # Run ClassIdentifier on origin_df and add the date partition columns.
            identifier = ClassIdentifier(wind_turbine_number=wind_turbine_number, origin_df=origin_df,
                                         rated_power=rated_and_cutout[0],
                                         cut_out_speed=rated_and_cutout[1])
            return _add_partition_columns(identifier.run())

        sec_df = classify(df.copy())
        # 10-minute table: every row whose minute is a multiple of 10.
        min_df = classify(df[df['time_stamp'].dt.minute % 10 == 0])

        save_df_to_db(f'{wind_no}_second', sec_df)
        save_df_to_db(f'{wind_no}_minute', min_df)
        update_day_count(sec_df, wind_no, wind_factory_map[wind_no], 'second')
        update_day_count(min_df, wind_no, wind_factory_map[wind_no], 'minute')
        os.remove(file_path)
    except Exception:
        logger.error(f"{file_path}转化失败")
        logger.error(traceback.format_exc(), exc_info=True)
        error_path = file_path.replace('scada', 'scada_error')
        try:
            os.makedirs(os.path.dirname(error_path), exist_ok=True)
            shutil.move(file_path, error_path)
        except OSError:
            # Do not let a failed move escape the worker process. pool.starmap
            # would re-raise it in the parent and stop all remaining processing.
            logger.error(traceback.format_exc(), exc_info=True)
    finally:
        logger.info(f'scada/{os.path.basename(file_path)}执行结束')
def generate_warn_fault_data(df, success_codes=frozenset({0, 309001})):
    """Collapse a status-code time series into one row per fault episode.

    An episode starts at a row whose ``fault_code`` is not in *success_codes*.
    It covers the following run of rows with that same code. Its end is the
    time of the first success-code row at or after the end of that run, which
    may lie after later, different fault runs. If no such row exists,
    ``end_time`` and ``time_diff`` are left empty.

    Args:
        df: Status rows with ``wind_turbine_name``, ``time_stamp`` and
            ``fault_code`` columns, in time order (the caller in this file
            sorts by ``time_stamp``). The frame is not modified.
        success_codes: Codes treated as the normal/recovered state. The default
            is ``{0, 309001}``.

    Returns:
        DataFrame with columns ``wind_turbine_name`` (taken from the first
        row), ``begin_time``, ``fault_code`` (cast to int), ``end_time`` and
        ``time_diff`` (whole seconds from begin to end).
    """
    success_codes = frozenset(success_codes)
    codes = df['fault_code'].tolist()
    times = pd.to_datetime(df['time_stamp']).tolist()
    n = len(codes)

    # next_success[k] is the index of the first success row at position >= k,
    # or None. It is built in one backward pass, so each episode's end lookup is
    # O(1) rather than a fresh forward scan.
    next_success = [None] * (n + 1)
    for k in range(n - 1, -1, -1):
        next_success[k] = k if codes[k] in success_codes else next_success[k + 1]

    turbine_name = df['wind_turbine_name'].iloc[0] if n else None
    result = []
    i = 0
    while i < n:
        status = codes[i]
        if status in success_codes:
            i += 1
            continue
        begin = times[i]
        # Consume the run of identical codes. Always step at least one row, so
        # a code that is not equal to itself (e.g. NaN) cannot stall the loop.
        i += 1
        while i < n and codes[i] == status:
            i += 1
        j = next_success[i]
        if j is None:
            # The fault lasts until the end of the data.
            end, duration = None, None
        else:
            end = times[j]
            duration = int((end - begin).total_seconds())
        result.append([turbine_name, begin, status, end, duration])

    result_df = pd.DataFrame(result, columns=['wind_turbine_name', 'begin_time', 'fault_code', 'end_time', 'time_diff'])
    result_df['fault_code'] = result_df['fault_code'].astype(int)
    return result_df
def warn_fault_read_and_save_db(file_path, wind_factory_map: dict, mc_vesion_map: dict, warn_falut_code_map: dict):
    """Turn one turbine's warning/fault status file into warn and fault episode rows.

    The file name is split on ``_``: the first part is the wind farm code and
    the second (minus ``.csv``) is the turbine name. Status rows are collapsed
    into episodes by ``generate_warn_fault_data``. Each episode is annotated
    with the turbine's controller version and with the description and level
    from *warn_falut_code_map*. Episodes whose level is ``'故障'`` are saved to
    ``<code>_fault``; everything else, including unknown codes, goes to
    ``<code>_warn``. Day counts are updated per table and the source file is
    deleted. On any error the file is moved to the same path with
    ``warn_fault`` replaced by ``warn_fault_error``.

    Args:
        file_path: Path of the status file.
        wind_factory_map: Wind farm code -> wind farm name.
        mc_vesion_map: ``{wind_farm_code: {wind_turbine_name: mc_version}}``.
        warn_falut_code_map: ``{mc_version: {fault_code: (description, level)}}``.
            Fault codes may be ``int`` keys or their string form (what
            ``json.load`` produces); both are accepted.
    """
    try:
        name_parts = os.path.basename(file_path).split('_')
        wind_farm_code = name_parts[0]
        wind_turbine_name = name_parts[1].replace('.csv', '')
        df = read_file(file_path)
        col_map = {'wind_turbine_id': 'wind_turbine_name', 'timestamp': 'time_stamp', 'warn_fault_code': 'fault_code'}
        df = df.rename(columns=col_map)
        # Drop missing codes before the int cast. With a NaN present the cast
        # fails (silently, errors='ignore') and the codes stay float/str.
        df = df.dropna(subset=['fault_code']).copy()
        df['fault_code'] = df['fault_code'].astype(int, errors='ignore')
        df['wind_turbine_name'] = df['wind_turbine_name'].astype('str')
        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
        df = df.dropna(subset=['time_stamp']).sort_values(by='time_stamp').reset_index(drop=True)
        first_code = df.loc[0, 'fault_code']
        data_time = df.loc[0, 'time_stamp']
        if first_code in [0, 309001]:
            trans_service.update_warn_fault_exist_data_with_db(wind_farm_code, wind_turbine_name, data_time)
        wind_col_trans, _ = get_wind_info(wind_farm_code)
        save_df = generate_warn_fault_data(df)
        save_df['wind_farm_code'] = wind_farm_code
        wind_farm_name = wind_factory_map[wind_farm_code]
        save_df['wind_farm_name'] = wind_farm_name
        save_df['wind_turbine_name'] = save_df['wind_turbine_name'].astype('str')
        save_df['wind_turbine_number'] = save_df['wind_turbine_name'].map(wind_col_trans)
        mc_vesion = mc_vesion_map[wind_farm_code][wind_turbine_name]
        save_df['mc_version'] = mc_vesion

        # JSON object keys are always strings, so a map read with json.load has
        # str fault codes while save_df['fault_code'] holds ints; try both forms.
        code_info = warn_falut_code_map.get(mc_vesion) or {}

        def fault_info(code, index):
            entry = code_info.get(code)
            if entry is None:
                entry = code_info.get(str(code))
            return None if entry is None else entry[index]

        # Series.map (unlike DataFrame.apply(axis=1)) also works when there are
        # no fault episodes at all.
        save_df['fault_detail'] = save_df['fault_code'].map(lambda code: fault_info(code, 0))
        save_df['fault_level'] = save_df['fault_code'].map(lambda code: fault_info(code, 1))
        select_cols = ['wind_turbine_number', 'wind_turbine_name', 'mc_version', 'begin_time', 'end_time', 'time_diff',
                       'fault_code', 'fault_detail', 'fault_level']
        is_fault = save_df['fault_level'] == '故障'
        for trans_type, part in (('warn', save_df[~is_fault]), ('fault', save_df[is_fault])):
            if part.empty:
                continue
            part = part[select_cols].sort_values(by='begin_time')
            trans_service.save_df_to_db(f'{wind_farm_code}_{trans_type}', part)
            max_date = part['begin_time'].max()
            # wind_farm_code, wind_farm_name, add_date, trans_type, count, latest_data_time
            trans_service.update_wind_farm_day_count(wind_farm_code, wind_farm_name,
                                                     max_date.strftime('%Y-%m-%d'),
                                                     trans_type, part.shape[0],
                                                     max_date.strftime('%Y-%m-%d %H:%M:%S'), 1)
        os.remove(file_path)
    except Exception:
        logger.error(f"{file_path}转化失败")
        logger.error(traceback.format_exc(), exc_info=True)
        error_path = file_path.replace('warn_fault', 'warn_fault_error')
        try:
            os.makedirs(os.path.dirname(error_path), exist_ok=True)
            shutil.move(file_path, error_path)
        except OSError:
            # Do not let a failed move escape the worker process. pool.starmap
            # would re-raise it in the parent and abort the run.
            logger.error(traceback.format_exc(), exc_info=True)
    finally:
        logger.info(f'warn_fault/{os.path.basename(file_path)}执行结束')
def update_day_count(df_data: pd.DataFrame, wind_farm_code: str, wind_name, data_type: str):
    """Report per-day row counts of *df_data* through ``update_wind_farm_day_count``.

    Rows are grouped by the calendar date of ``time_stamp``. For each date the
    call receives:

    * the number of non-null ``wind_turbine_number`` values;
    * the latest timestamp seen that day, formatted ``'%Y-%m-%d %H:%M:%S'``
      (the same format the warn/fault path passes as ``latest_data_time``).

    Args:
        df_data: Frame with ``wind_turbine_number`` and ``time_stamp`` columns.
            ``time_stamp`` may be strings or datetimes; unparseable values are
            skipped.
        wind_farm_code: Wind farm code passed through to the DB update.
        wind_name: Wind farm name passed through to the DB update.
        data_type: Type label passed through (callers in this file use
            ``'second'`` and ``'minute'``).
    """
    df = df_data[['wind_turbine_number', 'time_stamp']].copy()
    df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
    # Coerced NaT values cannot be formatted; skip them rather than crash.
    df = df.dropna(subset=['time_stamp'])
    if df.empty:
        return
    df = df.assign(add_date=df['time_stamp'].dt.strftime('%Y-%m-%d'))
    res_df = df.groupby('add_date').agg(
        row_count=('wind_turbine_number', 'count'),
        latest=('time_stamp', 'max'),
    ).reset_index()
    for add_date, row_count, latest in zip(res_df['add_date'], res_df['row_count'], res_df['latest']):
        update_wind_farm_day_count(wind_farm_code, wind_name, add_date, data_type,
                                   int(row_count), latest.strftime('%Y-%m-%d %H:%M:%S'), sync_status=1)
if __name__ == '__main__':
    scada_dir = r'/home/trans/data/scada'
    warn_fault_dir = r'/home/trans/data/warn_fault'
    scada_files = get_all_files(scada_dir)
    warn_fault_files = get_all_files(warn_fault_dir)
    # Wind farm code -> wind farm name for company 'COM00002'.
    wind_factory_map = get_all_wind_by_company_code('COM00002')
    # JSON is UTF-8 by specification; read the configs explicitly as UTF-8
    # instead of relying on the platform's locale encoding.
    with open('conf/mc_vesion.json', 'r', encoding='utf-8') as f:
        mc_vesion_map = json.load(f)
    with open('conf/warn_fault_mc_code.json', 'r', encoding='utf-8') as f:
        warn_falut_code_map = json.load(f)
    # SCADA files first, then warn/fault files, each across 4 worker processes.
    with multiprocessing.Pool(4) as pool:
        pool.starmap(scada_read_and_save_db, [(i, wind_factory_map) for i in scada_files])
    with multiprocessing.Pool(4) as pool:
        pool.starmap(warn_fault_read_and_save_db,
                     [(i, wind_factory_map, mc_vesion_map, warn_falut_code_map) for i in warn_fault_files])
|