import os.path
from os import path

import numpy as np
import pandas as pd

from etl.common.BaseDataTrans import BaseDataTrans
from service.trans_conf_service import update_trans_status_error, update_trans_status_success
from service.trans_service import get_fault_warn_conf, drop_table, create_warn_fault_table, \
    save_file_to_db
from utils.conf.read_conf import read_conf
from utils.file.trans_methods import read_excel_files, read_file_to_df, create_file_path, valid_eval
from utils.log.trans_log import trans_print


class FaultWarnTrans(BaseDataTrans):
    """ETL pipeline for fault/warning records of a wind farm.

    Reads raw excel exports from the temporary directory, renames and filters
    columns according to a per-farm configuration, normalizes timestamps and
    turbine numbers, then merges the result into the formal CSV file and the
    database table.
    """

    def __init__(self, data: dict = None, save_db=True, yaml_config=None, step=0, end=999):
        super(FaultWarnTrans, self).__init__(data, save_db, yaml_config, step, end)
        # Run statistics, filled in by combine_and_save_formal_file and
        # reported by update_exec_progress.
        self.engine_count = 0  # number of distinct turbines seen
        self.min_date = None   # earliest begin_time in the data
        self.max_date = None   # latest begin_time in the data
        self.data_count = 0    # total row count before turbine filtering

    def get_filed_conf(self):
        """Fetch the field-mapping configuration for this farm and transfer type."""
        return get_fault_warn_conf(self.wind_farm_code, self.transfer_type)

    # Step 3: read and save a temporary file — not needed for this transfer type.
    def read_and_save_tmp_file(self):
        trans_print("无需保存临时文件")

    # Read the raw files and save the cleaned result as the temporary formal file.
    def statistics_and_save_tmp_formal_file(self):
        """Load raw excel files, apply the configured column mapping, time
        parsing, value filters and turbine-number translation, then write the
        result to the temporary formal path as CSV (gzipped when save_zip).
        """
        conf_map = self.get_filed_conf()
        # get_fault_warn_conf may return None or an error tuple when no
        # configuration exists for this farm / transfer type.
        if conf_map is None or isinstance(conf_map, tuple) or len(conf_map.keys()) == 0:
            message = f"未找到{self.id}的{self.transfer_type}配置"
            trans_print(message)
            update_trans_status_error(self.id, message, self.save_db)
            return

        # Normalize string config values: strip embedded newlines.
        # (Reassigning an existing key while iterating a dict is safe.)
        for key, value in conf_map.items():
            if value and isinstance(value, str):
                conf_map[key] = value.replace("\r\n", "").replace("\n", "")

        # 'field_*' keys map canonical field names to raw column names.
        read_fields_keys = [k for k in conf_map.keys() if k.startswith('field_')]
        # 'select_*' keys define row filters applied after loading.
        select_fields = [(k.replace("select_", ""), v) for k, v in conf_map.items()
                         if k.startswith('select_') and v]
        time_format = read_conf(conf_map, 'time_format')

        # Build raw-column -> canonical-field rename map; one field may list
        # several alternative raw column names separated by '|'.
        trans_map = dict()
        trans_cols = []
        for key in read_fields_keys:
            field_value = read_conf(conf_map, key)
            if field_value:
                raw_cols = str(field_value).split('|')
                trans_cols.extend(raw_cols)
                field_key = key.replace("field_", "")
                for raw_col in raw_cols:
                    trans_map[raw_col] = field_key

        # Load every raw file and concatenate.
        all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())
        df = pd.DataFrame()
        for file in all_files:
            now_df = read_file_to_df(file, trans_cols=trans_cols)
            df = pd.concat([df, now_df], ignore_index=True)
        df.rename(columns=trans_map, inplace=True)

        # Drop any column that is not a configured field. Collect first:
        # deleting while iterating df.columns would skip entries.
        drop_cols = [col for col in df.columns if 'field_' + col not in read_fields_keys]
        df.drop(columns=drop_cols, inplace=True)

        if time_format:
            # time_format is a trusted configuration expression evaluated per
            # value (the lambda variable must be named 'error_time');
            # valid_eval rejects unsafe expressions before eval() runs.
            if valid_eval(time_format):
                eval_str = f"df['begin_time'].apply(lambda error_time: {time_format} )"
                df['begin_time'] = eval(eval_str)
                if 'end_time' in df.columns:
                    eval_str = f"df['end_time'].apply(lambda error_time: {time_format} )"
                    df['end_time'] = eval(eval_str)

        # Unparseable timestamps become NaT rather than raising.
        df['begin_time'] = pd.to_datetime(df['begin_time'], errors='coerce')
        if 'end_time' in df.columns:
            df['end_time'] = pd.to_datetime(df['end_time'], errors='coerce')

        # Optional per-value transformation of the raw turbine identifier
        # (the lambda variable must be named 'wind_name').
        exec_wind_turbine_number = read_conf(conf_map, 'exec_wind_turbine_number')
        if exec_wind_turbine_number:
            if valid_eval(exec_wind_turbine_number):
                exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {exec_wind_turbine_number} )"
                df['wind_turbine_number'] = eval(exec_str)

        # Apply configured row filters; a leading '!' inverts the match.
        for field, select_str in select_fields:
            use_in = True
            if str(select_str).strip().startswith("!"):
                use_in = False
                select_str = select_str[1:]
            select_str = select_str.replace("'", "").replace("[", "").replace("]", "")
            values = select_str.split(',')
            # Compare as ints when the column is any integer dtype (not only
            # the platform default int64).
            if pd.api.types.is_integer_dtype(df[field]):
                values = [int(i) for i in values]
            if use_in:
                df = df[df[field].isin(values)]
            else:
                df = df[~df[field].isin(values)]

        # Keep the raw name, then translate to the standardized turbine
        # number; fall back to the raw value when no mapping exists.
        df['wind_turbine_name'] = df['wind_turbine_number']
        df['wind_turbine_number'] = df['wind_turbine_number'].map(
            self.wind_col_trans).fillna(df['wind_turbine_number'])

        if 'end_time' in df.columns:
            df['time_diff'] = (df['end_time'] - df['begin_time']).dt.total_seconds()
            # A negative duration indicates bad data; blank it out.
            df.loc[df['time_diff'] < 0, 'time_diff'] = np.nan

        if self.save_zip:
            # pandas infers gzip compression from the '.gz' suffix.
            save_path = path.join(self.pathsAndTable.get_tmp_formal_path(),
                                  str(self.pathsAndTable.read_type) + '.csv.gz')
        else:
            save_path = path.join(self.pathsAndTable.get_tmp_formal_path(),
                                  str(self.pathsAndTable.read_type) + '.csv')
        create_file_path(save_path, is_file_path=True)
        df.to_csv(save_path, index=False, encoding='utf-8')

    # archive_file is intentionally not overridden: no archiving is needed
    # for this transfer type.

    # Merge the temporary formal file into the formal (save) file.
    def combine_and_save_formal_file(self):
        """Read the temporary formal CSV, record run statistics, keep only
        known turbines, merge with any existing formal file (deduplicated,
        last wins) and write the sorted result back.
        """
        # NOTE(review): this always reads the '.csv' name even though the
        # previous step writes '.csv.gz' when save_zip is on — presumably
        # read_file_to_df resolves the compressed name; confirm with save_zip.
        df = read_file_to_df(
            os.path.join(self.pathsAndTable.get_tmp_formal_path(),
                         str(self.pathsAndTable.read_type) + '.csv'))

        # Capture statistics (before the turbine filter) for progress reporting.
        self.engine_count = len(df['wind_turbine_number'].unique())
        self.min_date = df['begin_time'].min()
        self.max_date = df['begin_time'].max()
        self.data_count = df.shape[0]

        # Keep only turbines present in the translation table.
        df = df[df['wind_turbine_number'].isin(self.wind_col_trans.values())]

        save_path = os.path.join(self.pathsAndTable.get_save_path(),
                                 str(self.pathsAndTable.read_type) + '.csv')
        exists_df = pd.DataFrame()
        if os.path.exists(save_path):
            exists_df = read_file_to_df(save_path)
        else:
            create_file_path(save_path, is_file_path=True)

        # Merge with previously saved rows; on duplicates the new row wins.
        df = pd.concat([exists_df, df], ignore_index=True)
        df.drop_duplicates(inplace=True, keep='last')
        self.update_files = [save_path]
        # Sort by turbine, then by start time.
        df.sort_values(by=['wind_turbine_number', 'begin_time'], inplace=True)
        if self.save_zip:
            df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8',
                      date_format='%Y-%m-%d %H:%M:%S')
        else:
            df.to_csv(save_path, index=False, encoding='utf-8',
                      date_format='%Y-%m-%d %H:%M:%S')

    def save_to_db(self):
        """Recreate the warn/fault table and bulk-load the merged CSV into it."""
        table_name = self.pathsAndTable.get_table_name()
        drop_table(table_name)
        create_warn_fault_table(table_name)
        save_file_to_db(table_name, self.update_files[0], self.batch_count)

    def update_exec_progress(self):
        """Report success together with the statistics gathered during combine."""
        update_trans_status_success(self.id, self.engine_count, None,
                                    self.min_date, self.max_date,
                                    self.data_count, self.save_db)