# run_data.py

import json
import multiprocessing
import os
import shutil
import traceback
import warnings

import pandas as pd

from service import trans_service
from service.plt_service import get_wind_info, get_all_wind_by_company_code
from tool.ClassIdentifier import ClassIdentifier
from utils.common_utils import read_file, get_all_files

warnings.filterwarnings('ignore')

# logger, save_df_to_db and update_wind_farm_day_count are expected to come from this wildcard import.
from service.trans_service import *
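
# Overview: this script converts raw per-turbine data files into database tables.
# - scada/: second-level SCADA csv files are classified with ClassIdentifier and
#   written to <wind_no>_second and <wind_no>_minute tables.
# - warn_fault/: warn/fault logs are folded into fault intervals and written to
#   <wind_farm_code>_warn and <wind_farm_code>_fault tables.
# Files that fail to convert are moved to a parallel *_error directory.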

# def generate_warn_falut_code_maps(file_path):
#     df = pd.read_excel(file_path)
#     df['主控ID'] = df['主控ID'].str.strip()
#     df['状态编码'] = df['状态编码'].astype(int)
#     df['SC中文描述'] = df['SC中文描述'].str.strip()
#     df['告警等级'] = df['告警等级'].str.strip()
#     df['告警等级'] = df['告警等级'].fillna("告警信息")
#
#     result_map = dict()
#     for index, row in df.iterrows():
#         controller_id = row['主控ID'].strip()
#         status_code = int(row['状态编码'])
#         cn_des = row['SC中文描述']
#         level = row['告警等级']
#         if controller_id in result_map.keys():
#             result_map[controller_id][status_code] = (cn_des, level)
#         else:
#             result_map[controller_id] = {status_code: (cn_des, level)}
#
#     return result_map
#
#
# def generate_mc_version_maps(file_path):
#     df = pd.read_excel(file_path)
#
#     mc_version_maps = dict()
#     for _, data in df.iterrows():
#         changzhan = data['场站标准化编号'].strip()
#         wind_no = str(data['风机号'])
#         mc_versioin = data['主控版本'].strip()
#         if changzhan in mc_version_maps.keys():
#             mc_version_maps[changzhan][wind_no] = mc_versioin
#         else:
#             mc_version_maps[changzhan] = {wind_no: mc_versioin}
#
#     return mc_version_maps
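
# The two helpers above are kept commented out; they appear to be the scripts that
# originally produced the conf/mc_vesion.json and conf/warn_fault_mc_code.json maps
# loaded in __main__ below.


# Convert one SCADA file (file name starting with the wind farm code) into the
# <wind_no>_second and <wind_no>_minute tables, then update the per-day counts.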
def scada_read_and_save_db(file_path, wind_factory_map: dict):
    try:
        df = read_file(file_path)
        if 'wind_farm_code' in df.columns:
            del df['wind_farm_code']
        wind_no = os.path.basename(file_path).split('_')[0]
        df['wind_turbine_name'] = df['wind_turbine_name'].astype('str')
        wind_col_trans, rated_power_and_cutout_speed_map = get_wind_info(wind_no)
        df['wind_turbine_number'] = df['wind_turbine_name'].map(wind_col_trans).fillna(df['wind_turbine_name'])
        wind_turbine_number = df['wind_turbine_number'].unique()[0]
        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
        df.dropna(subset=['time_stamp'], inplace=True)
        df.sort_values(by='time_stamp', inplace=True)
        logger.info(f"first 10 active_power values: {df.head(10)['active_power'].values}")
        power_df = df[df['active_power'] > 0]
        logger.info(f"{wind_turbine_number} rows with active_power > 0: {power_df.shape}")
        power = power_df.sample(int(power_df.shape[0] / 100))['active_power'].median()
        logger.info(f"{wind_turbine_number} active_power median: {power}")
        rated_power_and_cutout_speed_tuple = rated_power_and_cutout_speed_map.get(str(wind_turbine_number), None)
        if rated_power_and_cutout_speed_tuple is None:
            rated_power_and_cutout_speed_tuple = (None, None)

        # second-level data: classify the full DataFrame
        sec_df_origin = df.copy()
        class_identifiler = ClassIdentifier(wind_turbine_number=wind_turbine_number, origin_df=sec_df_origin,
                                            rated_power=rated_power_and_cutout_speed_tuple[0],
                                            cut_out_speed=rated_power_and_cutout_speed_tuple[1])
        sec_df = class_identifiler.run()
        if not sec_df.empty:
            sec_df['year'] = sec_df['time_stamp'].dt.year
            sec_df['month'] = sec_df['time_stamp'].dt.month
            sec_df['day'] = sec_df['time_stamp'].dt.day
            sec_df['time_stamp'] = sec_df['time_stamp'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
            sec_df['year_month'] = sec_df[['year', 'month']].apply(lambda x: str(x['year']) + str(x['month']).zfill(2),
                                                                   axis=1)

        # minute-level data: keep rows whose minute is a multiple of 10, then classify
        min_df_origin = df[df['time_stamp'].dt.minute % 10 == 0]
        class_identifiler = ClassIdentifier(wind_turbine_number=wind_turbine_number, origin_df=min_df_origin,
                                            rated_power=rated_power_and_cutout_speed_tuple[0],
                                            cut_out_speed=rated_power_and_cutout_speed_tuple[1])
        min_df = class_identifiler.run()
        if not min_df.empty:
            min_df['year'] = min_df['time_stamp'].dt.year
            min_df['month'] = min_df['time_stamp'].dt.month
            min_df['day'] = min_df['time_stamp'].dt.day
            min_df['time_stamp'] = min_df['time_stamp'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
            min_df['year_month'] = min_df[['year', 'month']].apply(lambda x: str(x['year']) + str(x['month']).zfill(2),
                                                                   axis=1)

        save_df_to_db(f'{wind_no}_second', sec_df)
        save_df_to_db(f'{wind_no}_minute', min_df)
        update_day_count(sec_df, wind_no, wind_factory_map[wind_no], 'second')
        update_day_count(min_df, wind_no, wind_factory_map[wind_no], 'minute')
        os.remove(file_path)
    except Exception:
        logger.error(f"{file_path} conversion failed")
        logger.error(traceback.format_exc(), exc_info=True)
        error_path = file_path.replace('scada', 'scada_error')
        os.makedirs(os.path.dirname(error_path), exist_ok=True)
        shutil.move(file_path, error_path)
    finally:
        logger.info(f'scada/{os.path.basename(file_path)} finished')
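
# Fold a time-sorted warn/fault log for a single turbine into fault intervals.
# An interval starts at the first row with a non-success code and ends at the next
# row whose code is in {0, 309001}; time_diff is the interval length in seconds
# (end_time and time_diff are None when no success code follows).
#
# Illustrative sketch with made-up codes and timestamps (not production data):
#   time_stamp           fault_code
#   2024-01-01 00:00:00  0
#   2024-01-01 00:10:00  500123
#   2024-01-01 00:20:00  500123
#   2024-01-01 00:30:00  0
# -> one result row: begin_time=00:10:00, fault_code=500123, end_time=00:30:00, time_diff=1200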
def generate_warn_fault_data(df):
    df['time_stamp'] = pd.to_datetime(df['time_stamp'])
    # mark rows that are in a success (normal) state
    success_codes = {0, 309001}
    df['Is_Success'] = df['fault_code'].isin(success_codes)

    # collect fault start and end times
    result = []
    i = 0
    n = len(df)
    while i < n:
        current_status = df.iloc[i]['fault_code']
        current_time = df.iloc[i]['time_stamp']
        # fault state (anything that is not a success code)
        if current_status not in success_codes:
            # record the start of the fault
            fault_start = current_time
            fault_status = current_status
            # skip consecutive rows with the same fault code
            while i < n and df.iloc[i]['fault_code'] == fault_status:
                i += 1
            # look for the time of the next success state
            success_time = None
            fault_duration = None
            for j in range(i, n):
                if df.iloc[j]['fault_code'] in success_codes:
                    success_time = df.iloc[j]['time_stamp']
                    fault_duration = int((success_time - fault_start).total_seconds())
                    break
            if success_time is not None:
                # a success state was found: record the closed fault interval
                result.append([
                    df.iloc[0]['wind_turbine_name'],  # turbine name
                    fault_start,                      # fault start time
                    fault_status,                     # fault code
                    success_time,                     # recovery time
                    fault_duration                    # fault duration in seconds
                ])
            else:
                # no success state found: the fault lasts until the end of the data
                result.append([
                    df.iloc[0]['wind_turbine_name'],
                    fault_start,
                    fault_status,
                    None,
                    None
                ])
        else:
            # success state: move on to the next row
            i += 1

    # build the result DataFrame
    result_df = pd.DataFrame(result, columns=['wind_turbine_name', 'begin_time', 'fault_code', 'end_time', 'time_diff'])
    result_df['fault_code'] = result_df['fault_code'].astype(int)
    return result_df
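
# Convert one warn/fault csv named <wind_farm_code>_<wind_turbine_name>.csv:
# build fault intervals with generate_warn_fault_data, attach the main-controller
# version and the fault description/level looked up from the code map, then split
# the rows into the <wind_farm_code>_warn and <wind_farm_code>_fault tables.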
def warn_fault_read_and_save_db(file_path, wind_factory_map: dict, mc_vesion_map: dict, warn_falut_code_map: dict):
    try:
        wind_farm_code = os.path.basename(file_path).split('_')[0]
        wind_turbine_name = os.path.basename(file_path).split('_')[1].replace('.csv', '')
        df = read_file(file_path)
        col_map = {'wind_turbine_id': 'wind_turbine_name', 'timestamp': 'time_stamp', 'warn_fault_code': 'fault_code'}
        df.rename(columns=col_map, inplace=True)
        df['fault_code'] = df['fault_code'].astype(int, errors='ignore')
        df = df.dropna(subset=['fault_code'])
        df['wind_turbine_name'] = df['wind_turbine_name'].astype('str')
        df['wind_turbine_number'] = df['wind_turbine_name'].map(wind_factory_map).fillna(df['wind_turbine_name'])
        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
        df.dropna(subset=['time_stamp'], inplace=True)
        df.sort_values(by='time_stamp', inplace=True)
        df.reset_index(drop=True, inplace=True)

        # if the first record is already a success code, let trans_service update
        # the existing data in the DB with that timestamp
        first_code = df.loc[0, 'fault_code']
        data_time = df.loc[0, 'time_stamp']
        if first_code in [0, 309001]:
            trans_service.update_warn_fault_exist_data_with_db(wind_farm_code, wind_turbine_name, data_time)

        wind_col_trans, rated_power_and_cutout_speed_map = get_wind_info(wind_farm_code)
        save_df = generate_warn_fault_data(df)
        save_df['wind_farm_code'] = wind_farm_code
        save_df['wind_farm_name'] = wind_factory_map[wind_farm_code]
        save_df['wind_turbine_name'] = save_df['wind_turbine_name'].astype('str')
        save_df['wind_turbine_number'] = save_df['wind_turbine_name'].map(wind_col_trans)
        mc_vesion = mc_vesion_map[wind_farm_code][wind_turbine_name]
        save_df['mc_version'] = mc_vesion

        def get_fault_dict(x, index):
            if x['mc_version'] not in warn_falut_code_map.keys():
                return None
            if x['fault_code'] not in warn_falut_code_map[x['mc_version']].keys():
                return None
            return warn_falut_code_map[x['mc_version']][x['fault_code']][index]

        save_df['fault_detail'] = save_df[['mc_version', 'fault_code']].apply(get_fault_dict, args=(0,), axis=1)
        save_df['fault_level'] = save_df[['mc_version', 'fault_code']].apply(get_fault_dict, args=(1,), axis=1)

        select_cols = ['wind_turbine_number', 'wind_turbine_name', 'mc_version', 'begin_time', 'end_time', 'time_diff',
                       'fault_code', 'fault_detail', 'fault_level']
        # level '故障' (fault) goes to the fault table; everything else to the warn table
        warn_df = save_df[save_df['fault_level'] != '故障'][select_cols]
        fault_df = save_df[save_df['fault_level'] == '故障'][select_cols]
        warn_df.sort_values(by='begin_time', inplace=True)
        fault_df.sort_values(by='begin_time', inplace=True)

        if not warn_df.empty:
            trans_service.save_df_to_db(f'{wind_farm_code}_warn', warn_df)
            warn_max_date = warn_df['begin_time'].max()
            add_date_str = warn_max_date.strftime('%Y-%m-%d')
            warn_last_date_str = warn_max_date.strftime('%Y-%m-%d %H:%M:%S')
            # args: wind_farm_code, wind_farm_name, add_date, trans_type, count, latest_data_time, sync_status
            trans_service.update_wind_farm_day_count(wind_farm_code, wind_factory_map.get(wind_farm_code, ''),
                                                     add_date_str,
                                                     'warn', warn_df.shape[0], warn_last_date_str, 1)

        if not fault_df.empty:
            trans_service.save_df_to_db(f'{wind_farm_code}_fault', fault_df)
            fault_max_date = fault_df['begin_time'].max()
            add_date_str = fault_max_date.strftime('%Y-%m-%d')
            fault_last_date_str = fault_max_date.strftime('%Y-%m-%d %H:%M:%S')
            trans_service.update_wind_farm_day_count(wind_farm_code, wind_factory_map.get(wind_farm_code, ''),
                                                     add_date_str,
                                                     'fault', fault_df.shape[0], fault_last_date_str, 1)

        os.remove(file_path)
    except Exception:
        logger.error(f"{file_path} conversion failed")
        logger.error(traceback.format_exc(), exc_info=True)
        error_path = file_path.replace('warn_fault', 'warn_fault_error')
        os.makedirs(os.path.dirname(error_path), exist_ok=True)
        shutil.move(file_path, error_path)
    finally:
        logger.info(f'warn_fault/{os.path.basename(file_path)} finished')
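
# Count converted rows per day and report each day's total through
# update_wind_farm_day_count (one call per day, sync_status=1).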
def update_day_count(df_data: pd.DataFrame, wind_farm_code: str, wind_name, data_type: str):
    df = df_data[['wind_turbine_number', 'time_stamp']].copy()
    df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
    df['time_stamp'] = df['time_stamp'].apply(lambda x: x.strftime('%Y-%m-%d'))
    res_df = df.groupby('time_stamp')['wind_turbine_number'].count().reset_index()
    res_df.rename(columns={'time_stamp': 'add_date', 'wind_turbine_number': 'count'}, inplace=True)
    res_df['latest_data_time'] = res_df['add_date']
    res_df['sync_status'] = 1
    res_df['wind_farm_code'] = wind_farm_code
    res_df['wind_farm_name'] = wind_name
    res_df['type'] = data_type
    for index, data in res_df.iterrows():
        update_wind_farm_day_count(data['wind_farm_code'], data['wind_farm_name'], data['add_date'], data['type'],
                                   data['count'], data['latest_data_time'], sync_status=1)
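
# Entry point: walk the scada and warn_fault directories and convert every file
# using pools of 4 worker processes. The wind farm map comes from the platform
# service; the mc version and fault code maps are loaded from conf/ JSON files.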
if __name__ == '__main__':
    scada_dir = r'/home/trans/data/scada'
    warn_fault_dir = r'/home/trans/data/warn_fault'
    scada_files = get_all_files(scada_dir)
    warn_fault_files = get_all_files(warn_fault_dir)
    wind_factory_map = get_all_wind_by_company_code('COM00002')

    with open('conf/mc_vesion.json', 'r') as f:
        mc_vesion_map = json.load(f)
    with open('conf/warn_fault_mc_code.json', 'r') as f:
        warn_falut_code_map = json.load(f)

    with multiprocessing.Pool(4) as pool:
        pool.starmap(scada_read_and_save_db, [(i, wind_factory_map) for i in scada_files])
    with multiprocessing.Pool(4) as pool:
        pool.starmap(warn_fault_read_and_save_db,
                     [(i, wind_factory_map, mc_vesion_map, warn_falut_code_map) for i in warn_fault_files])