"""Read the data returned by the 104 protocol and normalize it into the temporary tables."""
import multiprocessing
import os.path
import time
import warnings
from datetime import datetime, timedelta

import pandas as pd

from service import plt_service, trans_service
from utils.conf.read_conf import yaml_conf, read_conf
from utils.log.trans_log import logger

warnings.filterwarnings('ignore')

# Status values that the scan in `_collect_turbine_records` handles through the
# "not active" branch. NOTE(review): the meaning of 309001 is not visible in
# this file -- confirm against the controller code table.
_INACTIVE_CODES = (0, 309001)

# Column layout of one record produced by `_collect_turbine_records`.
_RECORD_COLUMNS = ['wind_turbine_name', 'seq_no', 'fault_code', 'begin_time', 'end_time']


def is_file_modified_recently(file_path, minutes=15):
    """Check whether a file was last modified within the given number of minutes.

    Args:
        file_path: Path of the file to inspect.
        minutes: Size of the look-back window in minutes (default 15).

    Returns:
        bool: True if the file was modified within the window, False otherwise.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"文件不存在: {file_path}")

    # Last modification time (epoch seconds) converted to a naive local datetime.
    mod_datetime = datetime.fromtimestamp(os.path.getmtime(file_path))
    logger.info(f'文件修改时间:{mod_datetime}')

    return datetime.now() - mod_datetime <= timedelta(minutes=minutes)


def generate_mesurepoint_maps(file_path):
    """Build ``{station: {turbine: {seq_no: english_name}}}`` from an Excel sheet.

    The sequence number stored in the map is the sheet's ``顺序号`` plus one.

    Args:
        file_path: Path of the Excel file to read.

    Returns:
        dict: Nested mapping station code -> turbine number -> seq_no -> name.
    """
    df = pd.read_excel(file_path)
    wind_maps = dict()
    for _, data in df.iterrows():
        shunxuhao = int(data['顺序号']) + 1
        changzhan = data['场站标准化编号']
        wind_no = data['风机号']
        en_name = data['标准化英文']
        wind_maps.setdefault(changzhan, {}).setdefault(wind_no, {})[shunxuhao] = en_name
    return wind_maps


def generate_warn_falut_code_maps(file_path):
    """Build ``{controller_id: {status_code: (description, level)}}`` from an Excel sheet.

    Text columns are stripped and a missing level is replaced by ``"告警信息"``.

    Args:
        file_path: Path of the Excel file to read.

    Returns:
        dict: Nested mapping controller id -> integer status code ->
        ``(description, level)`` tuple.
    """
    df = pd.read_excel(file_path)
    df['主控ID'] = df['主控ID'].str.strip()
    df['状态编码'] = df['状态编码'].astype(int)
    df['SC中文描述'] = df['SC中文描述'].str.strip()
    df['告警等级'] = df['告警等级'].str.strip()
    df['告警等级'] = df['告警等级'].fillna("告警信息")

    result_map = dict()
    for _, row in df.iterrows():
        # The controller id column was already stripped above.
        controller_id = row['主控ID']
        status_code = int(row['状态编码'])
        result_map.setdefault(controller_id, {})[status_code] = (row['SC中文描述'], row['告警等级'])
    return result_map


def generate_warn_falut_maps(file_path):
    """Build ``{station: {turbine: [seq_no, ...]}}`` from an Excel sheet.

    Each stored sequence number is the sheet's ``顺序号`` plus one.

    Args:
        file_path: Path of the Excel file to read.

    Returns:
        dict: Nested mapping station code -> turbine number -> list of seq_no.
    """
    df = pd.read_excel(file_path)
    warn_fault_maps = dict()
    for _, data in df.iterrows():
        seq_no = int(data['顺序号']) + 1
        changzhan = data['场站标准化编号']
        wind_no = data['风机号']
        warn_fault_maps.setdefault(changzhan, {}).setdefault(wind_no, []).append(seq_no)
    return warn_fault_maps


def generate_mc_version_maps(file_path):
    """Build ``{station: {turbine: controller_version}}`` from an Excel sheet.

    Args:
        file_path: Path of the Excel file to read.

    Returns:
        dict: Nested mapping stripped station code -> turbine number ->
        stripped controller version.
    """
    df = pd.read_excel(file_path)
    mc_version_maps = dict()
    for _, data in df.iterrows():
        changzhan = data['场站标准化编号'].strip()
        wind_no = data['风机号']
        mc_versioin = data['主控版本'].strip()
        mc_version_maps.setdefault(changzhan, {})[wind_no] = mc_versioin
    return mc_version_maps


def exists_windno_seq_fault(changzhan):
    """Return what ``trans_service.exists_windno_seq_fault`` reports for a station.

    The caller in this file indexes the result with ``'warn'`` and ``'fault'``.
    """
    return trans_service.exists_windno_seq_fault(changzhan)


def _collect_turbine_records(df, changzhan, wind_no, seq_nos):
    """Scan one turbine's status columns and return the raw records found.

    Returns a list of ``[wind_no, seq_no, code, begin_time, end_time]`` lists,
    one per distinct ``(seq_no, code)`` pair, in first-seen order.
    """
    # Column 0 is used as the time axis; after the transpose each row is one
    # sequence number and each column label is one value of column 0.
    now_df = df[[0, *seq_nos]].set_index(0).T
    cols = list(now_df.columns)

    # Keyed by "<seq_no>_<code>".
    seq_no_dict = dict()
    for seq_no, row in now_df.iterrows():
        # Convert the whole row up front so a bad value fails before any
        # database update is issued for this row.
        values = [int(v) for v in row.values]
        is_first = True
        for timestamp, value in zip(cols, values):
            key = f'{seq_no}_{value}'
            if value in _INACTIVE_CODES:
                if key in seq_no_dict:
                    # NOTE(review): keys are only inserted in the branch below,
                    # for values outside _INACTIVE_CODES, so this lookup cannot
                    # currently succeed and end_time is never filled in here.
                    # Kept as written; confirm the intended closing logic.
                    for data in seq_no_dict[key]:
                        if data[4] is None:
                            data[4] = timestamp
                elif is_first:
                    # Only the first inactive sample of each row triggers the
                    # database update.
                    trans_service.update_warn_fault_exist_data(changzhan, seq_no, timestamp)
                    is_first = False
            elif value > 0 and key not in seq_no_dict:
                seq_no_dict[key] = [wind_no, seq_no, value, timestamp, None]
    return list(seq_no_dict.values())


def _build_turbine_frame(records, mc_versions, warn_falut_code_maps, wind_maps):
    """Turn raw records into a DataFrame enriched with version, detail, level and duration."""
    result_df = pd.DataFrame(records, columns=_RECORD_COLUMNS)
    result_df['mc_version'] = result_df['wind_turbine_name'].apply(lambda name: mc_versions.get(name, None))
    result_df.dropna(subset=['mc_version'], inplace=True)

    def get_fault_dict(x, index):
        # Element `index` of the (description, level) tuple, or None when the
        # version/code pair is not in the code table.
        entry = warn_falut_code_maps.get(x['mc_version'], {}).get(x['fault_code'])
        return None if entry is None else entry[index]

    code_columns = result_df[['mc_version', 'fault_code']]
    result_df['fault_detail'] = code_columns.apply(get_fault_dict, args=(0,), axis=1)
    result_df['fault_level'] = code_columns.apply(get_fault_dict, args=(1,), axis=1)
    result_df.dropna(subset=['fault_detail', 'fault_level'], how='all', inplace=True)

    result_df['begin_time'] = pd.to_datetime(result_df['begin_time'], errors='coerce')
    result_df['end_time'] = pd.to_datetime(result_df['end_time'], errors='coerce')
    result_df['time_diff'] = (result_df['end_time'] - result_df['begin_time']).dt.total_seconds()
    # The version lookup above uses the raw turbine key; the platform map
    # lookup below is done on the string form.
    result_df['wind_turbine_name'] = result_df['wind_turbine_name'].astype(str)
    result_df['wind_turbine_number'] = result_df['wind_turbine_name'].map(wind_maps)
    return result_df


def _drop_existing_open_records(records: pd.DataFrame, exists_data) -> pd.DataFrame:
    """Drop rows without a duration whose ``name_seq_code`` key is in ``exists_data``.

    Returns a new frame; the input is not modified.
    """
    key_no = (records['wind_turbine_name'].astype(str) + '_'
              + records['seq_no'].astype(str) + '_'
              + records['fault_code'].astype(str))
    already_stored = records['time_diff'].isna() & key_no.isin(exists_data)
    return records[~already_stored].copy()


def _persist_records(changzhan, trans_type, records, wind_code_name_map):
    """Save non-empty records to ``<station>_<trans_type>`` and update the day count."""
    if records.empty:
        return
    trans_service.save_df_to_db(f'{changzhan}_{trans_type}', records)
    latest = records['begin_time'].max()
    # Arguments: wind_farm_code, wind_farm_name, add_date, trans_type, count,
    # latest_data_time, plus a trailing 1 whose meaning is defined by the service.
    trans_service.update_wind_farm_day_count(changzhan, wind_code_name_map.get(changzhan, ''),
                                             latest.strftime('%Y-%m-%d'), trans_type, records.shape[0],
                                             latest.strftime('%Y-%m-%d %H:%M:%S'), 1)


def generate_warn_fault_data(df, warn_fault_point_maps, warn_falut_code_maps, mc_vesion_maps, changzhan,
                             wind_code_name_map, date_str=None):
    """Extract warn/fault records for one station and persist them.

    For every turbine of ``changzhan`` the configured status columns of ``df``
    are scanned, the resulting records are enriched from the code table, split
    into ``fault_level == '故障'`` (fault) and everything else (warn), filtered
    against what the database already holds, saved, and finally the update
    time is recorded through ``trans_service``.

    Args:
        df: Frame read with ``header=None``; column 0 is used as the time axis
            and the integer sequence numbers are used as column labels.
        warn_fault_point_maps: ``{station: {turbine: [seq_no, ...]}}``.
        warn_falut_code_maps: ``{controller_version: {code: (detail, level)}}``.
        mc_vesion_maps: ``{station: {turbine: controller_version}}``.
        changzhan: Station code to process.
        wind_code_name_map: Mapping from station code to station name.
        date_str: Value forwarded to ``update_warn_fault_update_time``.
            Defaults to today formatted as ``%Y_%m_%d``. Passing it explicitly
            keeps pool workers independent of names that only exist in the
            ``__main__`` script.

    Returns:
        None.
    """
    if changzhan not in mc_vesion_maps:
        print(f'{changzhan} not in mc_vesion_maps')
        return None

    if date_str is None:
        date_str = datetime.now().strftime('%Y_%m_%d')

    max_time = df[0].max()
    wind_maps, _ = plt_service.get_wind_info(changzhan, False)
    wind_seq_no_map = warn_fault_point_maps[changzhan]
    exists_data_dict = exists_windno_seq_fault(changzhan)
    mc_versions = mc_vesion_maps[changzhan]

    wind_df = pd.DataFrame()
    for wind_no, seq_nos in wind_seq_no_map.items():
        records = _collect_turbine_records(df, changzhan, wind_no, list(seq_nos))
        result_df = _build_turbine_frame(records, mc_versions, warn_falut_code_maps, wind_maps)
        wind_df = pd.concat([wind_df, result_df])

    is_fault = wind_df['fault_level'] == '故障'
    warn_df = _drop_existing_open_records(wind_df[~is_fault], exists_data_dict['warn'])
    fault_df = _drop_existing_open_records(wind_df[is_fault], exists_data_dict['fault'])

    warn_df = warn_df.sort_values(by='begin_time')
    fault_df = fault_df.sort_values(by='begin_time')

    _persist_records(changzhan, 'warn', warn_df, wind_code_name_map)
    _persist_records(changzhan, 'fault', fault_df, wind_code_name_map)

    trans_service.update_warn_fault_update_time(date_str, max_time)


def main():
    """Process every file under ``<data_base_dir>/2406`` and delete each one afterwards."""
    # NOTE(review): fixed 40 s start-up delay kept as-is; its purpose is not
    # visible in this file.
    time.sleep(40)
    total_begin = time.time()
    date_str = datetime.now().strftime('%Y_%m_%d')

    wind_code_name_map = plt_service.get_all_wind_by_company_code('COM00002')

    conf_path = os.path.abspath("./conf/config.yaml")
    yaml_config = yaml_conf(conf_path)
    data_base_dir = read_conf(yaml_config, 'data_base_dir')
    read_dir = os.path.join(data_base_dir, '2406')

    # Warn/fault measurement points.
    warn_fault_point_maps = generate_warn_falut_maps('conf/故障报警测点-2406.xlsx')
    # Main-controller fault codes.
    warn_falut_code_maps = generate_warn_falut_code_maps('conf/主控故障代码表-2406.xlsx')
    # Main-controller versions.
    mc_vesion_maps = generate_mc_version_maps('conf/主控版本-2406.xlsx')

    for root, _dirs, files in os.walk(read_dir):
        files.sort()
        for file in files:
            exec_begin = time.time()
            file_dir = os.path.basename(root)
            read_csv = os.path.join(root, file)
            df = pd.read_csv(read_csv, header=None)

            # One task per station; date_str is passed explicitly so workers do
            # not rely on globals of the parent process.
            task_args = [(df, warn_fault_point_maps, warn_falut_code_maps, mc_vesion_maps, changzhan,
                          wind_code_name_map, date_str)
                         for changzhan in warn_fault_point_maps]
            with multiprocessing.Pool(5) as pool:
                pool.starmap(generate_warn_fault_data, task_args)

            # TODO: save the most recent record to a temporary file for the
            # next run, and optionally back the input file up before removal.
            os.remove(read_csv)
            logger.info(
                f'故障报警执行完:{file_dir}/{file}耗时:{time.time() - exec_begin},总耗时:{time.time() - total_begin}')

    logger.info(f'故障报警总耗时:{time.time() - total_begin}')


if __name__ == '__main__':
    main()