# FaultWarnTrans.py
import os

import numpy as np
import pandas as pd

from etl.common.BaseDataTrans import BaseDataTrans
from service.plt_service import update_trans_status_error
from service.trans_service import get_fault_warn_conf
from utils.conf.read_conf import read_conf
from utils.file.trans_methods import read_excel_files, read_file_to_df, create_file_path
from utils.log.trans_log import trans_print
  10. class FaultWarnTrans(BaseDataTrans):
  11. def __init__(self, data: dict = None, save_db=True, step=0, end=4):
  12. super(FaultWarnTrans, self).__init__(data, save_db, step, end)
  13. def get_filed_conf(self):
  14. return get_fault_warn_conf(self.field_code, self.read_type)
  15. # 第三步 读取 并 保存到临时文件
  16. def read_and_save_tmp_file(self):
  17. trans_print("无需保存临时文件")
  18. # 第四步 统计 并 保存到正式文件
  19. def statistics_and_save_to_file(self):
  20. conf_map = self.get_filed_conf()
  21. if conf_map is None or type(conf_map) == tuple or len(conf_map.keys()) == 0:
  22. message = f"未找到{self.batch_no}的{self.read_type}配置"
  23. trans_print(message)
  24. update_trans_status_error(self.batch_no, self.read_type, message, self.save_db)
  25. else:
  26. for key, v in conf_map.items():
  27. if v and type(v) == str:
  28. v = v.replace("\r\n", "").replace("\n", "")
  29. conf_map[key] = v
  30. read_fields_keys = [i for i in conf_map.keys() if i.startswith('field_')]
  31. # 需要执行 exec的字段
  32. # exec_fields = [(k.replace("exec_", ""), v) for k, v in conf_map.items() if k.startswith('exec_')]
  33. # 读取需要执行 筛选的字段
  34. select_fields = [(k.replace("select_", ""), v) for k, v in conf_map.items() if
  35. k.startswith('select_') and v]
  36. time_format = read_conf(conf_map, 'time_format')
  37. trans_map = dict()
  38. trans_cols = []
  39. for key in read_fields_keys:
  40. field_value = read_conf(conf_map, key)
  41. if field_value:
  42. vas = str(field_value).split('|')
  43. trans_cols.extend(vas)
  44. field_key = key.replace("field_", "")
  45. for v in vas:
  46. trans_map[v] = field_key
  47. all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())
  48. df = pd.DataFrame()
  49. for file in all_files:
  50. now_df = read_file_to_df(file, trans_cols=trans_cols)
  51. df = pd.concat([df, now_df], ignore_index=True)
  52. df.rename(columns=trans_map, inplace=True)
  53. if time_format:
  54. df['begin_time'] = pd.to_datetime(df['begin_time'])
  55. df['end_time'] = pd.to_datetime(df['end_time'])
  56. else:
  57. df['begin_time'] = pd.to_datetime(df['begin_time'], format=time_format)
  58. df['end_time'] = pd.to_datetime(df['end_time'], format=time_format)
  59. exec_wind_turbine_number = read_conf(conf_map, 'exec_wind_turbine_number')
  60. if exec_wind_turbine_number:
  61. exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {exec_wind_turbine_number} )"
  62. df['wind_turbine_number'] = eval(exec_str)
  63. for field, select_str in select_fields:
  64. use_in = True
  65. if str(select_str).strip().startswith("!"):
  66. use_in = False
  67. select_str = select_str[1:]
  68. select_str = select_str.replace("'", "").replace("[", "").replace("]", "")
  69. values = select_str.split(',')
  70. if df[field].dtype == int:
  71. values = [int(i) for i in values]
  72. if use_in:
  73. df = df[df[field].isin(values)]
  74. else:
  75. df = df[~df[field].isin(values)]
  76. df['wind_turbine_name'] = df['wind_turbine_number']
  77. df['wind_turbine_number'] = df['wind_turbine_number'].map(
  78. self.wind_col_trans).fillna(df['wind_turbine_number'])
  79. df['time_diff'] = (df['end_time'] - df['begin_time']).dt.total_seconds()
  80. df.loc[df['time_diff'] < 0, 'time_diff'] = np.nan
  81. if self.save_zip:
  82. save_path = os.path.join(self.pathsAndTable.get_save_path(), str(self.batch_name) + '.csv.gz')
  83. else:
  84. save_path = os.path.join(self.pathsAndTable.get_save_path(), str(self.batch_name) + '.csv')
  85. create_file_path(save_path, is_file_path=True)
  86. if self.save_zip:
  87. df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8', date_format='%Y-%m-%d %H:%M:%S')
  88. else:
  89. df.to_csv(save_path, index=False, encoding='utf-8', date_format='%Y-%m-%d %H:%M:%S')