123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- """
- 读取104规约返回的数据,并标准化到临时表中
- """
- import multiprocessing
- import os.path
- import time
- import warnings
- from datetime import datetime, timedelta
- import pandas as pd
- from service import plt_service, trans_service
- from utils.conf.read_conf import yaml_conf, read_conf
- from utils.log.trans_log import logger
- warnings.filterwarnings('ignore')
def is_file_modified_recently(file_path, minutes=15):
    """Return True if *file_path* was modified within the last *minutes* minutes.

    Args:
        file_path: path of the file to inspect.
        minutes: size of the freshness window in minutes (default 15).

    Returns:
        bool: True when the last modification falls inside the window.

    Raises:
        FileNotFoundError: if the file does not exist.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"文件不存在: {file_path}")
    # Last-modified timestamp converted to a local datetime.
    modified_at = datetime.fromtimestamp(os.path.getmtime(file_path))
    logger.info(f'文件修改时间:{modified_at}')
    # Fresh iff the elapsed time does not exceed the window.
    return datetime.now() - modified_at <= timedelta(minutes=minutes)
def generate_mesurepoint_maps(file_path):
    """Build the measure-point lookup from an Excel sheet.

    Returns:
        dict: {station_code: {wind_no: {seq_no: english_name}}} where
        seq_no is the sheet's 顺序号 shifted to be 1-based.
    """
    df = pd.read_excel(file_path)
    wind_maps = dict()
    for _, row in df.iterrows():
        seq_no = int(row['顺序号']) + 1  # sheet is 0-based; keys are 1-based
        station = row['场站标准化编号']
        wind_no = row['风机号']
        wind_maps.setdefault(station, {}).setdefault(wind_no, {})[seq_no] = row['标准化英文']
    return wind_maps
def generate_warn_falut_code_maps(file_path):
    """Build the master-control fault-code lookup from an Excel sheet.

    Returns:
        dict: {controller_id: {status_code: (cn_description, alarm_level)}};
        missing alarm levels default to "告警信息".
    """
    df = pd.read_excel(file_path)
    df['主控ID'] = df['主控ID'].str.strip()
    df['状态编码'] = df['状态编码'].astype(int)
    df['SC中文描述'] = df['SC中文描述'].str.strip()
    df['告警等级'] = df['告警等级'].str.strip()
    df['告警等级'] = df['告警等级'].fillna("告警信息")
    result_map = dict()
    for _, row in df.iterrows():
        controller_id = row['主控ID'].strip()
        status_code = int(row['状态编码'])
        result_map.setdefault(controller_id, {})[status_code] = (row['SC中文描述'], row['告警等级'])
    return result_map
def generate_warn_falut_maps(file_path):
    """Build the warn/fault point map from an Excel sheet.

    Returns:
        dict: {station_code: {wind_no: [seq_no, ...]}} with 1-based seq_no,
        in sheet order.
    """
    df = pd.read_excel(file_path)
    warn_fault_maps = dict()
    for _, row in df.iterrows():
        seq_no = int(row['顺序号']) + 1  # sheet is 0-based; keys are 1-based
        station = row['场站标准化编号']
        wind_no = row['风机号']
        warn_fault_maps.setdefault(station, {}).setdefault(wind_no, []).append(seq_no)
    return warn_fault_maps
def generate_mc_version_maps(file_path):
    """Build the master-control version map from an Excel sheet.

    Returns:
        dict: {station_code: {wind_no: mc_version}}; a later row for the
        same station/wind_no overwrites the earlier one.
    """
    df = pd.read_excel(file_path)
    mc_version_maps = dict()
    for _, row in df.iterrows():
        station = row['场站标准化编号'].strip()
        mc_version_maps.setdefault(station, {})[row['风机号']] = row['主控版本'].strip()
    return mc_version_maps
def exists_windno_seq_fault(changzhan):
    """Fetch already-persisted warn/fault keys for *changzhan* from the DB layer.

    Returns the service result as-is — presumably a dict with 'warn' and
    'fault' lists of "<wind>_<seq>_<code>" keys (see del_exist_data usage).
    """
    return trans_service.exists_windno_seq_fault(changzhan)
def generate_warn_fault_data(df, warn_fault_point_maps, warn_falut_code_maps, mc_vesion_maps, changzhan,
                             wind_code_name_map):
    """Extract warn/fault events for one wind farm from a raw 104-protocol frame and persist them.

    Args:
        df: raw frame read with header=None; column 0 holds timestamps, the
            remaining columns hold point values indexed by sequence number.
        warn_fault_point_maps: {station: {wind_no: [seq_no, ...]}}.
        warn_falut_code_maps: {mc_version: {status_code: (cn_desc, level)}}.
        mc_vesion_maps: {station: {wind_no: mc_version}}.
        changzhan: standardized station code to process.
        wind_code_name_map: {station_code: station_name} for day-count bookkeeping.

    Returns:
        None. Results are written to the database via trans_service.
    """
    wind_df = pd.DataFrame()
    if changzhan not in mc_vesion_maps.keys():
        print(f'{changzhan} not in mc_vesion_maps')
        return None
    max_time = df[0].max()
    wind_maps, _ = plt_service.get_wind_info(changzhan, False)
    wind_nos = warn_fault_point_maps[changzhan].keys()
    wind_seq_no_map = warn_fault_point_maps[changzhan]
    exists_data_dict = exists_windno_seq_fault(changzhan)
    for wind_no in wind_nos:
        seq_nos = list(wind_seq_no_map[wind_no])
        seq_nos.insert(0, 0)  # keep the timestamp column (index 0) in front
        now_df = df[seq_nos]
        now_df.set_index(keys=0, inplace=True)
        now_df = now_df.T  # rows become sequence numbers, columns become timestamps
        cols = list(now_df.columns)
        # Open events keyed by "<seq_no>_<fault_code>"
        seq_no_dict = dict()
        # Walk every point (row) of this turbine
        for index, row in now_df.iterrows():
            values = [int(i) for i in row.values]
            seq_no = index
            is_first = True
            for i in range(len(values)):
                key = f'{seq_no}_{values[i]}'
                if values[i] == 0 or values[i] == 309001:
                    # Value back to normal: close the in-memory event if open,
                    # otherwise mark the existing DB row as ended.
                    # NOTE(review): key is built from the *current* value (0/309001),
                    # which can never equal a stored "<seq_no>_<fault_code>" key
                    # (only values > 0 are stored), so this close branch looks
                    # unreachable — confirm intended key scheme.
                    if key in seq_no_dict.keys():
                        for data in seq_no_dict[key]:
                            if data[4] is None:
                                data[4] = cols[i]
                    else:
                        # TODO write database
                        if is_first:
                            trans_service.update_warn_fault_exist_data(changzhan, seq_no, cols[i])
                            is_first = False
                elif values[i] > 0:
                    if key not in seq_no_dict.keys():
                        # [wind_no, seq_no, fault_code, begin_time, end_time]
                        seq_no_dict[key] = [wind_no, seq_no, values[i], cols[i], None]
        result = seq_no_dict.values()
        result_df = pd.DataFrame(result,
                                 columns=['wind_turbine_name', 'seq_no', 'fault_code', 'begin_time', 'end_time'])
        # Attach the master-control version; rows for unknown turbines are dropped.
        result_df['mc_version'] = result_df['wind_turbine_name'].apply(lambda x: mc_vesion_maps[changzhan].get(x, None))
        result_df.dropna(subset=['mc_version'], inplace=True)

        def get_fault_dict(x, index):
            # Resolve (cn_desc, level)[index] for this mc_version/fault_code; None if unknown.
            if x['mc_version'] not in warn_falut_code_maps.keys():
                return None
            if x['fault_code'] not in warn_falut_code_maps[x['mc_version']].keys():
                return None
            return warn_falut_code_maps[x['mc_version']][x['fault_code']][index]

        result_df['fault_detail'] = result_df[['mc_version', 'fault_code']].apply(get_fault_dict, args=(0,), axis=1)
        result_df['fault_level'] = result_df[['mc_version', 'fault_code']].apply(get_fault_dict, args=(1,), axis=1)
        result_df.dropna(subset=['fault_detail', 'fault_level'], how='all', inplace=True)
        result_df['begin_time'] = pd.to_datetime(result_df['begin_time'], errors='coerce')
        result_df['end_time'] = pd.to_datetime(result_df['end_time'], errors='coerce')
        result_df['time_diff'] = (result_df['end_time'] - result_df['begin_time']).dt.total_seconds()
        result_df['wind_turbine_name'] = result_df['wind_turbine_name'].astype(str)
        result_df['wind_turbine_number'] = result_df['wind_turbine_name'].map(wind_maps)
        wind_df = pd.concat([wind_df, result_df])
    warn_df = wind_df[wind_df['fault_level'] != '故障']
    fault_df = wind_df[wind_df['fault_level'] == '故障']

    def del_exist_data(df: pd.DataFrame, exists_data: list):
        # FIX: annotation was `pd.DataFrame()` (instantiated an empty frame
        # at def time); the class itself is the correct annotation.
        # Drop still-open events (no end_time) already present in the DB.
        df['key_no'] = df.apply(lambda x: f"{x['wind_turbine_name']}_{x['seq_no']}_{x['fault_code']}", axis=1)
        df = df[~((df['time_diff'].isna()) & (df['key_no'].isin(exists_data)))]
        del df['key_no']
        return df

    warn_df = del_exist_data(warn_df, exists_data_dict['warn'])
    fault_df = del_exist_data(fault_df, exists_data_dict['fault'])
    warn_df.sort_values(by='begin_time', inplace=True)
    fault_df.sort_values(by='begin_time', inplace=True)
    if not warn_df.empty:
        trans_service.save_df_to_db(f'{changzhan}_warn', warn_df)
        warn_max_date = warn_df['begin_time'].max()
        add_date_str = warn_max_date.strftime('%Y-%m-%d')
        warn_last_date_str = warn_max_date.strftime('%Y-%m-%d %H:%M:%S')
        # wind_farm_code, wind_farm_name, add_date, trans_type, count, latest_data_time
        trans_service.update_wind_farm_day_count(changzhan, wind_code_name_map.get(changzhan, ''), add_date_str,
                                                 'warn', warn_df.shape[0], warn_last_date_str, 1)
    if not fault_df.empty:
        trans_service.save_df_to_db(f'{changzhan}_fault', fault_df)
        fault_max_date = fault_df['begin_time'].max()
        add_date_str = fault_max_date.strftime('%Y-%m-%d')
        fault_last_date_str = fault_max_date.strftime('%Y-%m-%d %H:%M:%S')
        trans_service.update_wind_farm_day_count(changzhan, wind_code_name_map.get(changzhan, ''), add_date_str,
                                                 'fault', fault_df.shape[0], fault_last_date_str, 1)
    # NOTE(review): `date_str` is a module-level global assigned only in the
    # __main__ block; pool workers rely on fork-style inheritance for it —
    # this would raise NameError under the spawn start method. Confirm.
    trans_service.update_warn_fault_update_time(date_str, max_time)
if __name__ == '__main__':
    # Startup delay before touching the input directory.
    # NOTE(review): magic 40-second wait inherited from the original —
    # presumably lets upstream collectors finish writing; confirm necessity.
    time.sleep(40)
    total_begin = time.time()
    date_str = datetime.now().strftime('%Y_%m_%d')
    wind_code_name_map = plt_service.get_all_wind_by_company_code('COM00002')
    conf_path = os.path.abspath("./conf/config.yaml")
    yaml_config = yaml_conf(conf_path)
    data_base_dir = read_conf(yaml_config, 'data_base_dir')
    read_dir = os.path.join(data_base_dir, '2406')
    # Warn/fault measure points
    warn_fault_point_maps = generate_warn_falut_maps('conf/故障报警测点-2406.xlsx')
    # Master-control fault codes
    warn_falut_code_maps = generate_warn_falut_code_maps('conf/主控故障代码表-2406.xlsx')
    # Master-control versions
    mc_vesion_maps = generate_mc_version_maps('conf/主控版本-2406.xlsx')
    for root, dirs, files in os.walk(read_dir):
        files.sort()  # process files in name (chronological) order
        for file in files:
            exec_begin = time.time()
            file_dir = os.path.basename(root)
            read_csv = root + os.sep + file
            df = pd.read_csv(read_csv, header=None)
            # One task per wind farm, 5 farms processed concurrently.
            with multiprocessing.Pool(5) as pool:
                pool.starmap(generate_warn_fault_data,
                             [(df, warn_fault_point_maps, warn_falut_code_maps, mc_vesion_maps, changzhan,
                               wind_code_name_map)
                              for changzhan in
                              warn_fault_point_maps.keys()])
            # TODO persist the latest row to a temp file for reuse on the next run
            # Input file is consumed: remove it so it is not reprocessed.
            os.remove(read_csv)
            logger.info(
                f'故障报警执行完:{file_dir}/{file}耗时:{time.time() - exec_begin},总耗时:{time.time() - total_begin}')
    logger.info(f'故障报警总耗时:{time.time() - total_begin}')
|