import os

import numpy as np
import pandas as pd

from etl.common.BaseDataTrans import BaseDataTrans
from service.plt_service import update_trans_status_error
from service.trans_service import get_fault_warn_conf
from utils.conf.read_conf import read_conf
from utils.file.trans_methods import read_excel_files, read_file_to_df, create_file_path
from utils.log.trans_log import trans_print


class FaultWarnTrans(BaseDataTrans):
    """Fault/warning-record transformation.

    Reads the raw excel files from the temporary directory, renames the
    configured source columns to canonical names, applies configured row
    filters, maps turbine names to turbine numbers, and writes the result
    directly to the final CSV (optionally gzip-compressed). No intermediate
    temp file is produced for this read type.
    """

    def __init__(self, data: dict = None, save_db=True, step=0, end=4):
        super(FaultWarnTrans, self).__init__(data, save_db, step, end)

    def get_filed_conf(self):
        """Return the column-mapping/filter configuration for this batch's
        field code and read type."""
        return get_fault_warn_conf(self.field_code, self.read_type)

    # Step 3: read and save to a temporary file — intentionally a no-op for
    # this transformation (data goes straight to the final file in step 4).
    def read_and_save_tmp_file(self):
        trans_print("无需保存临时文件")

    # Step 4: aggregate and save to the final file.
    def statistics_and_save_to_file(self):
        conf_map = self.get_filed_conf()
        # get_fault_warn_conf may return None or an (error) tuple when no
        # configuration exists for this batch — report and mark the batch failed.
        if conf_map is None or isinstance(conf_map, tuple) or len(conf_map.keys()) == 0:
            message = f"未找到{self.batch_no}的{self.read_type}配置"
            trans_print(message)
            update_trans_status_error(self.batch_no, self.read_type, message, self.save_db)
        else:
            # Normalize string config values: strip embedded newlines.
            for key, v in conf_map.items():
                if v and isinstance(v, str):
                    conf_map[key] = v.replace("\r\n", "").replace("\n", "")

            read_fields_keys = [i for i in conf_map.keys() if i.startswith('field_')]
            # Row-filter specs: ('column', 'v1,v2,...') pairs; a leading '!'
            # in the value string means "exclude" instead of "keep".
            select_fields = [(k.replace("select_", ""), v) for k, v in conf_map.items()
                             if k.startswith('select_') and v]

            time_format = read_conf(conf_map, 'time_format')

            # Build the source-column -> canonical-name map. A single field_*
            # value may list several alternative source columns joined by '|'.
            trans_map = dict()
            trans_cols = []
            for key in read_fields_keys:
                field_value = read_conf(conf_map, key)
                if field_value:
                    vas = str(field_value).split('|')
                    trans_cols.extend(vas)
                    field_key = key.replace("field_", "")
                    for v in vas:
                        trans_map[v] = field_key

            all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())
            df = pd.DataFrame()
            for file in all_files:
                now_df = read_file_to_df(file, trans_cols=trans_cols)
                df = pd.concat([df, now_df], ignore_index=True)

            df.rename(columns=trans_map, inplace=True)

            # BUGFIX: the original branches were inverted — a configured
            # time_format was ignored (default parsing used), while the
            # no-format case passed format=None. Use the format when given.
            if time_format:
                df['begin_time'] = pd.to_datetime(df['begin_time'], format=time_format)
                df['end_time'] = pd.to_datetime(df['end_time'], format=time_format)
            else:
                df['begin_time'] = pd.to_datetime(df['begin_time'])
                df['end_time'] = pd.to_datetime(df['end_time'])

            exec_wind_turbine_number = read_conf(conf_map, 'exec_wind_turbine_number')
            if exec_wind_turbine_number:
                # NOTE(review): eval() of a config-supplied expression — the
                # configuration source must be fully trusted; consider a
                # whitelisted transform table instead of eval.
                exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {exec_wind_turbine_number} )"
                df['wind_turbine_number'] = eval(exec_str)

            for field, select_str in select_fields:
                use_in = True
                if str(select_str).strip().startswith("!"):
                    use_in = False
                    select_str = select_str[1:]
                # Accept bracketed/quoted list syntax: "[1,'2',3]" -> "1,2,3".
                select_str = select_str.replace("'", "").replace("[", "").replace("]", "")
                values = select_str.split(',')
                # Match the column dtype so isin() compares like with like.
                if df[field].dtype == int:
                    values = [int(i) for i in values]
                if use_in:
                    df = df[df[field].isin(values)]
                else:
                    df = df[~df[field].isin(values)]

            # Keep the raw turbine name, then map to the canonical turbine
            # number; rows without a mapping keep their original value.
            df['wind_turbine_name'] = df['wind_turbine_number']
            df['wind_turbine_number'] = df['wind_turbine_number'].map(
                self.wind_col_trans).fillna(df['wind_turbine_number'])

            df['time_diff'] = (df['end_time'] - df['begin_time']).dt.total_seconds()
            # A negative duration means end < begin — treat as bad data.
            df.loc[df['time_diff'] < 0, 'time_diff'] = np.nan

            suffix = '.csv.gz' if self.save_zip else '.csv'
            save_path = os.path.join(self.pathsAndTable.get_save_path(),
                                     str(self.batch_name) + suffix)
            create_file_path(save_path, is_file_path=True)
            if self.save_zip:
                df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8',
                          date_format='%Y-%m-%d %H:%M:%S')
            else:
                df.to_csv(save_path, index=False, encoding='utf-8',
                          date_format='%Y-%m-%d %H:%M:%S')