# FaultWarnTrans.py
import os.path
from os import *  # NOTE(review): wildcard import kept for compatibility; prefer explicit imports

import numpy as np
import pandas as pd

from etl.common.BaseDataTrans import BaseDataTrans
from service.trans_conf_service import update_trans_status_error, update_trans_status_success
from service.trans_service import get_fault_warn_conf, drop_table, create_warn_fault_table, \
    save_file_to_db
from utils.conf.read_conf import read_conf
from utils.file.trans_methods import read_excel_files, read_file_to_df, create_file_path, valid_eval
from utils.log.trans_log import trans_print
  12. class FaultWarnTrans(BaseDataTrans):
  13. def __init__(self, data: dict = None, save_db=True, yaml_config=None, step=0, end=6):
  14. super(FaultWarnTrans, self).__init__(data, save_db, yaml_config, step, end)
  15. self.engine_count = 0
  16. self.min_date = None
  17. self.max_date = None
  18. self.data_count = 0
  19. def get_filed_conf(self):
  20. return get_fault_warn_conf(self.wind_farm_code, self.transfer_type)
  21. # 第三步 读取 并 保存到临时文件
  22. def read_and_save_tmp_file(self):
  23. trans_print("无需保存临时文件")
  24. # 读取并保存到临时正式文件
  25. def statistics_and_save_tmp_formal_file(self):
  26. conf_map = self.get_filed_conf()
  27. if conf_map is None or type(conf_map) == tuple or len(conf_map.keys()) == 0:
  28. message = f"未找到{self.id}的{self.transfer_type}配置"
  29. trans_print(message)
  30. update_trans_status_error(self.id, message, self.save_db)
  31. else:
  32. for key, v in conf_map.items():
  33. if v and type(v) == str:
  34. v = v.replace("\r\n", "").replace("\n", "")
  35. conf_map[key] = v
  36. read_fields_keys = [i for i in conf_map.keys() if i.startswith('field_')]
  37. # 需要执行 exec的字段
  38. # exec_fields = [(k.replace("exec_", ""), v) for k, v in conf_map.items() if k.startswith('exec_')]
  39. # 读取需要执行 筛选的字段
  40. select_fields = [(k.replace("select_", ""), v) for k, v in conf_map.items() if
  41. k.startswith('select_') and v]
  42. time_format = read_conf(conf_map, 'time_format')
  43. trans_map = dict()
  44. trans_cols = []
  45. for key in read_fields_keys:
  46. field_value = read_conf(conf_map, key)
  47. if field_value:
  48. vas = str(field_value).split('|')
  49. trans_cols.extend(vas)
  50. field_key = key.replace("field_", "")
  51. for v in vas:
  52. trans_map[v] = field_key
  53. all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())
  54. df = pd.DataFrame()
  55. for file in all_files:
  56. now_df = read_file_to_df(file, trans_cols=trans_cols)
  57. df = pd.concat([df, now_df], ignore_index=True)
  58. df.rename(columns=trans_map, inplace=True)
  59. for col in df.columns:
  60. if 'field_' + col not in read_fields_keys:
  61. del df[col]
  62. if time_format:
  63. if valid_eval(time_format):
  64. eval_str = f"df['begin_time'].apply(lambda error_time: {time_format} )"
  65. df['begin_time'] = eval(eval_str)
  66. if 'end_time' in df.columns:
  67. eval_str = f"df['end_time'].apply(lambda error_time: {time_format} )"
  68. df['end_time'] = eval(eval_str)
  69. df['begin_time'] = pd.to_datetime(df['begin_time'], errors='coerce')
  70. if 'end_time' in df.columns:
  71. df['end_time'] = pd.to_datetime(df['end_time'], errors='coerce')
  72. exec_wind_turbine_number = read_conf(conf_map, 'exec_wind_turbine_number')
  73. if exec_wind_turbine_number:
  74. if valid_eval(exec_wind_turbine_number):
  75. exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {exec_wind_turbine_number} )"
  76. df['wind_turbine_number'] = eval(exec_str)
  77. for field, select_str in select_fields:
  78. use_in = True
  79. if str(select_str).strip().startswith("!"):
  80. use_in = False
  81. select_str = select_str[1:]
  82. select_str = select_str.replace("'", "").replace("[", "").replace("]", "")
  83. values = select_str.split(',')
  84. if df[field].dtype == int:
  85. values = [int(i) for i in values]
  86. if use_in:
  87. df = df[df[field].isin(values)]
  88. else:
  89. df = df[~df[field].isin(values)]
  90. df['wind_turbine_name'] = df['wind_turbine_number']
  91. df['wind_turbine_number'] = df['wind_turbine_number'].map(
  92. self.wind_col_trans).fillna(df['wind_turbine_number'])
  93. if 'end_time' in df.columns:
  94. df['time_diff'] = (df['end_time'] - df['begin_time']).dt.total_seconds()
  95. df.loc[df['time_diff'] < 0, 'time_diff'] = np.nan
  96. if self.save_zip:
  97. save_path = path.join(self.pathsAndTable.get_tmp_formal_path(),
  98. str(self.pathsAndTable.read_type) + '.csv.gz')
  99. else:
  100. save_path = path.join(self.pathsAndTable.get_tmp_formal_path(),
  101. str(self.pathsAndTable.read_type) + '.csv')
  102. create_file_path(save_path, is_file_path=True)
  103. df.to_csv(save_path, index=False, encoding='utf-8')
  104. # 归档文件
  105. # def archive_file(self):
  106. # trans_print("无需归档文件")
  107. # 合并到正式文件
  108. def combine_and_save_formal_file(self):
  109. df = read_file_to_df(
  110. os.path.join(self.pathsAndTable.get_tmp_formal_path(), str(self.pathsAndTable.read_type) + '.csv'))
  111. self.engine_count = len(df['wind_turbine_number'].unique())
  112. self.min_date = df['begin_time'].min()
  113. self.max_date = df['begin_time'].max()
  114. self.data_count = df.shape[0]
  115. df = df[df['wind_turbine_number'].isin(self.wind_col_trans.values())]
  116. save_path = os.path.join(self.pathsAndTable.get_save_path(), str(self.pathsAndTable.read_type) + '.csv')
  117. exists_df = pd.DataFrame()
  118. if os.path.exists(save_path):
  119. exists_df = read_file_to_df(save_path)
  120. else:
  121. create_file_path(save_path, is_file_path=True)
  122. df = pd.concat([exists_df, df], ignore_index=True)
  123. df.drop_duplicates(inplace=True, keep='last')
  124. self.update_files = [save_path]
  125. # 根据开始时间进行排序
  126. df.sort_values(by=['wind_turbine_number', 'begin_time'], inplace=True)
  127. if self.save_zip:
  128. df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8', date_format='%Y-%m-%d %H:%M:%S')
  129. else:
  130. df.to_csv(save_path, index=False, encoding='utf-8', date_format='%Y-%m-%d %H:%M:%S')
  131. def save_to_db(self):
  132. table_name = self.pathsAndTable.get_table_name()
  133. drop_table(table_name)
  134. create_warn_fault_table(table_name)
  135. save_file_to_db(table_name, self.update_files[0], self.batch_count)
  136. def update_exec_progress(self):
  137. update_trans_status_success(self.id,
  138. self.engine_count, None, self.min_date, self.max_date, self.data_count,
  139. self.save_db)