parse_warn_fault_data.py 11 KB


  1. """
  2. 读取104规约返回的数据,并标准化到临时表中
  3. """
  4. import multiprocessing
  5. import os.path
  6. import time
  7. import warnings
  8. from datetime import datetime, timedelta
  9. import pandas as pd
  10. from service import plt_service, trans_service
  11. from utils.conf.read_conf import yaml_conf, read_conf
  12. from utils.log.trans_log import logger
  13. warnings.filterwarnings('ignore')
  14. def is_file_modified_recently(file_path, minutes=15):
  15. """
  16. 检查文件最后修改时间是否在指定的分钟数内
  17. 参数:
  18. file_path: 文件路径
  19. minutes: 检查的分钟数(默认为15分钟)
  20. 返回:
  21. bool: True表示在指定时间内修改过,False表示超过指定时间未修改
  22. """
  23. if not os.path.exists(file_path):
  24. raise FileNotFoundError(f"文件不存在: {file_path}")
  25. # 获取文件最后修改时间(时间戳)
  26. mod_time = os.path.getmtime(file_path)
  27. # 转换为datetime对象
  28. mod_datetime = datetime.fromtimestamp(mod_time)
  29. logger.info(f'文件修改时间:{mod_datetime}')
  30. # 计算当前时间与修改时间的差
  31. time_diff = datetime.now() - mod_datetime
  32. # 检查是否在指定分钟内
  33. return time_diff <= timedelta(minutes=minutes)
  34. def generate_mesurepoint_maps(file_path):
  35. df = pd.read_excel(file_path)
  36. wind_maps = dict()
  37. for _, data in df.iterrows():
  38. shunxuhao = int(data['顺序号']) + 1
  39. changzhan = data['场站标准化编号']
  40. wind_no = data['风机号']
  41. en_name = data['标准化英文']
  42. if changzhan in wind_maps.keys():
  43. if wind_no in wind_maps[changzhan].keys():
  44. wind_maps[changzhan][wind_no][shunxuhao] = en_name
  45. else:
  46. wind_maps[changzhan][wind_no] = {shunxuhao: en_name}
  47. else:
  48. wind_maps[changzhan] = {wind_no: {shunxuhao: en_name}}
  49. return wind_maps
  50. def generate_warn_falut_code_maps(file_path):
  51. df = pd.read_excel(file_path)
  52. df['主控ID'] = df['主控ID'].str.strip()
  53. df['状态编码'] = df['状态编码'].astype(int)
  54. df['SC中文描述'] = df['SC中文描述'].str.strip()
  55. df['告警等级'] = df['告警等级'].str.strip()
  56. df['告警等级'] = df['告警等级'].fillna("告警信息")
  57. result_map = dict()
  58. for index, row in df.iterrows():
  59. controller_id = row['主控ID'].strip()
  60. status_code = int(row['状态编码'])
  61. cn_des = row['SC中文描述']
  62. level = row['告警等级']
  63. if controller_id in result_map.keys():
  64. result_map[controller_id][status_code] = (cn_des, level)
  65. else:
  66. result_map[controller_id] = {status_code: (cn_des, level)}
  67. return result_map
  68. def generate_warn_falut_maps(file_path):
  69. df = pd.read_excel(file_path)
  70. warn_fault_maps = dict()
  71. for _, data in df.iterrows():
  72. seq_no = int(data['顺序号']) + 1
  73. changzhan = data['场站标准化编号']
  74. wind_no = data['风机号']
  75. if changzhan in warn_fault_maps.keys():
  76. if wind_no in warn_fault_maps[changzhan].keys():
  77. warn_fault_maps[changzhan][wind_no].append(seq_no)
  78. else:
  79. warn_fault_maps[changzhan][wind_no] = [seq_no]
  80. else:
  81. warn_fault_maps[changzhan] = {wind_no: [seq_no]}
  82. return warn_fault_maps
  83. def generate_mc_version_maps(file_path):
  84. df = pd.read_excel(file_path)
  85. mc_version_maps = dict()
  86. for _, data in df.iterrows():
  87. changzhan = data['场站标准化编号'].strip()
  88. wind_no = data['风机号']
  89. mc_versioin = data['主控版本'].strip()
  90. if changzhan in mc_version_maps.keys():
  91. mc_version_maps[changzhan][wind_no] = mc_versioin
  92. else:
  93. mc_version_maps[changzhan] = {wind_no: mc_versioin}
  94. return mc_version_maps
  95. def exists_windno_seq_fault(changzhan):
  96. exists_data_dict = trans_service.exists_windno_seq_fault(changzhan)
  97. return exists_data_dict
  98. def generate_warn_fault_data(df, warn_fault_point_maps, warn_falut_code_maps, mc_vesion_maps, changzhan,
  99. wind_code_name_map):
  100. wind_df = pd.DataFrame()
  101. if changzhan not in mc_vesion_maps.keys():
  102. print(f'{changzhan} not in mc_vesion_maps')
  103. return None
  104. max_time = df[0].max()
  105. wind_maps, _ = plt_service.get_wind_info(changzhan, False)
  106. wind_nos = warn_fault_point_maps[changzhan].keys()
  107. wind_seq_no_map = warn_fault_point_maps[changzhan]
  108. exists_data_dict = exists_windno_seq_fault(changzhan)
  109. for wind_no in wind_nos:
  110. seq_nos = list(wind_seq_no_map[wind_no])
  111. seq_nos.insert(0, 0)
  112. now_df = df[seq_nos]
  113. now_df.set_index(keys=0, inplace=True)
  114. now_df = now_df.T
  115. cols = list(now_df.columns)
  116. # 设置KEY 为 顺序号_错误码
  117. seq_no_dict = dict()
  118. # 循环每行的数据
  119. for index, row in now_df.iterrows():
  120. values = [int(i) for i in row.values]
  121. seq_no = index
  122. is_first = True
  123. for i in range(len(values)):
  124. key = f'{seq_no}_{values[i]}'
  125. if values[i] == 0 or values[i] == 309001:
  126. # 如果当前数据存在错误,则更新字典,否则更新表数据
  127. if key in seq_no_dict.keys():
  128. for data in seq_no_dict[key]:
  129. if data[4] is None:
  130. data[4] = cols[i]
  131. else:
  132. # TODO 写数据库
  133. if is_first:
  134. trans_service.update_warn_fault_exist_data(changzhan, seq_no, cols[i])
  135. is_first = False
  136. elif values[i] > 0:
  137. if key not in seq_no_dict.keys():
  138. seq_no_dict[key] = [wind_no, seq_no, values[i], cols[i], None]
  139. # result = [sublist for key in seq_no_dict for sublist in seq_no_dict[key]]
  140. result = seq_no_dict.values()
  141. result_df = pd.DataFrame(result,
  142. columns=['wind_turbine_name', 'seq_no', 'fault_code', 'begin_time', 'end_time'])
  143. result_df['mc_version'] = result_df['wind_turbine_name'].apply(lambda x: mc_vesion_maps[changzhan].get(x, None))
  144. result_df.dropna(subset=['mc_version'], inplace=True)
  145. def get_fault_dict(x, index):
  146. if x['mc_version'] not in warn_falut_code_maps.keys():
  147. return None
  148. if x['fault_code'] not in warn_falut_code_maps[x['mc_version']].keys():
  149. return None
  150. return warn_falut_code_maps[x['mc_version']][x['fault_code']][index]
  151. result_df['fault_detail'] = result_df[['mc_version', 'fault_code']].apply(get_fault_dict, args=(0,), axis=1)
  152. result_df['fault_level'] = result_df[['mc_version', 'fault_code']].apply(get_fault_dict, args=(1,), axis=1)
  153. result_df.dropna(subset=['fault_detail', 'fault_level'], how='all', inplace=True)
  154. result_df['begin_time'] = pd.to_datetime(result_df['begin_time'], errors='coerce')
  155. result_df['end_time'] = pd.to_datetime(result_df['end_time'], errors='coerce')
  156. result_df['time_diff'] = (result_df['end_time'] - result_df['begin_time']).dt.total_seconds()
  157. result_df['wind_turbine_name'] = result_df['wind_turbine_name'].astype(str)
  158. result_df['wind_turbine_number'] = result_df['wind_turbine_name'].map(wind_maps)
  159. wind_df = pd.concat([wind_df, result_df])
  160. # wind_df.to_csv(f'tmp/104/warn_fault/{changzhan}.csv', index=False)
  161. warn_df = wind_df[wind_df['fault_level'] != '故障']
  162. fault_df = wind_df[wind_df['fault_level'] == '故障']
  163. # exists_data_dict
  164. def del_exist_data(df: pd.DataFrame(), exists_data: list):
  165. df['key_no'] = df.apply(lambda x: f"{x['wind_turbine_name']}_{x['seq_no']}_{x['fault_code']}", axis=1)
  166. df = df[~((df['time_diff'].isna()) & (df['key_no'].isin(exists_data)))]
  167. del df['key_no']
  168. return df
  169. warn_df = del_exist_data(warn_df, exists_data_dict['warn'])
  170. fault_df = del_exist_data(fault_df, exists_data_dict['fault'])
  171. warn_df.sort_values(by='begin_time', inplace=True)
  172. fault_df.sort_values(by='begin_time', inplace=True)
  173. if not warn_df.empty:
  174. trans_service.save_df_to_db(f'{changzhan}_warn', warn_df)
  175. warn_max_date = warn_df['begin_time'].max()
  176. add_date_str = warn_max_date.strftime('%Y-%m-%d')
  177. warn_last_date_str = warn_max_date.strftime('%Y-%m-%d %H:%M:%S')
  178. # wind_farm_code, wind_farm_name, add_date, trans_type, count, latest_data_time
  179. trans_service.update_wind_farm_day_count(changzhan, wind_code_name_map.get(changzhan, ''), add_date_str,
  180. 'warn', warn_df.shape[0], warn_last_date_str, 1)
  181. if not fault_df.empty:
  182. trans_service.save_df_to_db(f'{changzhan}_fault', fault_df)
  183. fault_max_date = fault_df['begin_time'].max()
  184. add_date_str = fault_max_date.strftime('%Y-%m-%d')
  185. fault_last_date_str = fault_max_date.strftime('%Y-%m-%d %H:%M:%S')
  186. trans_service.update_wind_farm_day_count(changzhan, wind_code_name_map.get(changzhan, ''), add_date_str,
  187. 'fault', fault_df.shape[0], fault_last_date_str, 1)
  188. trans_service.update_warn_fault_update_time(date_str,max_time)
  189. if __name__ == '__main__':
  190. time.sleep(40)
  191. total_begin = time.time()
  192. begin = time.time()
  193. date_str = datetime.now().strftime('%Y_%m_%d')
  194. wind_code_name_map = plt_service.get_all_wind_by_company_code('COM00002')
  195. conf_path = os.path.abspath(f"./conf/config.yaml")
  196. yaml_config = yaml_conf(conf_path)
  197. data_base_dir = read_conf(yaml_config, 'data_base_dir')
  198. read_dir = os.path.join(data_base_dir, '2406')
  199. # 故障报警测点
  200. warn_fault_point_maps = generate_warn_falut_maps('conf/故障报警测点-2406.xlsx')
  201. # 主控代码
  202. warn_falut_code_maps = generate_warn_falut_code_maps('conf/主控故障代码表-2406.xlsx')
  203. # 主控版本
  204. mc_vesion_maps = generate_mc_version_maps('conf/主控版本-2406.xlsx')
  205. for root, dirs, files in os.walk(read_dir):
  206. files.sort()
  207. for file in files:
  208. exec_begin = time.time()
  209. file_dir = os.path.basename(root)
  210. read_csv = root + os.sep + file
  211. df = pd.read_csv(read_csv, header=None)
  212. begin = time.time()
  213. with multiprocessing.Pool(5) as pool:
  214. pool.starmap(generate_warn_fault_data,
  215. [(df, warn_fault_point_maps, warn_falut_code_maps, mc_vesion_maps, changzhan,
  216. wind_code_name_map)
  217. for changzhan in
  218. warn_fault_point_maps.keys()])
  219. # TODO 保存最近一条数据到临时文件中以供下次使用
  220. # last_data.to_csv(file_path, header=False, index=False)
  221. # new_file = read_csv.replace('data/2406', 'data_bak/2406 ')
  222. # shutil.copy(read_csv, new_file)
  223. os.remove(read_csv)
  224. logger.info(
  225. f'故障报警执行完:{file_dir}/{file}耗时:{time.time() - exec_begin},总耗时:{time.time() - total_begin}')
  226. logger.info(f'故障报警总耗时:{time.time() - total_begin}')