123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- import os
- import numpy as np
- import pandas as pd
- from etl.common.BaseDataTrans import BaseDataTrans
- from service.plt_service import update_trans_status_error
- from service.trans_service import get_fault_warn_conf
- from utils.conf.read_conf import read_conf
- from utils.file.trans_methods import read_excel_files, read_file_to_df, create_file_path, valid_eval
- from utils.log.trans_log import trans_print
class FaultWarnTrans(BaseDataTrans):
    """Fault/warning data transformer.

    Reads configured source columns from temporary Excel files, renames and
    filters them according to the batch configuration, normalizes the time
    columns and turbine numbers, then writes a single CSV (optionally
    gzip-compressed) result file.
    """

    def __init__(self, data: dict = None, save_db=True, step=0, end=4):
        super(FaultWarnTrans, self).__init__(data, save_db, step, end)

    def get_filed_conf(self):
        # Field-mapping configuration for this batch's field code / read type.
        return get_fault_warn_conf(self.field_code, self.read_type)

    # 第三步 读取 并 保存到临时文件 (step 3: nothing to do for this data type)
    def read_and_save_tmp_file(self):
        trans_print("无需保存临时文件")

    # 第四步 统计 并 保存到正式文件 (step 4: aggregate and save final file)
    def statistics_and_save_to_file(self):
        conf_map = self.get_filed_conf()
        # get_fault_warn_conf may return a tuple on error; treat that, None,
        # and an empty mapping as "configuration not found".
        if conf_map is None or isinstance(conf_map, tuple) or len(conf_map) == 0:
            message = f"未找到{self.batch_no}的{self.read_type}配置"
            trans_print(message)
            update_trans_status_error(self.batch_no, self.read_type, message, self.save_db)
            return

        # Strip embedded newlines from every string config value.
        # (Replacing values while iterating is safe: no keys are added/removed.)
        for key, value in conf_map.items():
            if value and isinstance(value, str):
                conf_map[key] = value.replace("\r\n", "").replace("\n", "")

        read_fields_keys = [k for k in conf_map.keys() if k.startswith('field_')]
        # Fields that carry a row-filter expression ("select_" prefix).
        select_fields = [(k.replace("select_", ""), v) for k, v in conf_map.items()
                        if k.startswith('select_') and v]
        time_format = read_conf(conf_map, 'time_format')

        # Build the source-column -> target-field rename map. One config value
        # may list several alternative source columns separated by '|'.
        trans_map = dict()
        trans_cols = []
        for key in read_fields_keys:
            field_value = read_conf(conf_map, key)
            if field_value:
                source_cols = str(field_value).split('|')
                trans_cols.extend(source_cols)
                field_key = key.replace("field_", "")
                for source_col in source_cols:
                    trans_map[source_col] = field_key

        all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())
        df = pd.DataFrame()
        for file in all_files:
            now_df = read_file_to_df(file, trans_cols=trans_cols)
            df = pd.concat([df, now_df], ignore_index=True)
        df.rename(columns=trans_map, inplace=True)

        # Drop columns that are not configured. FIX: collect first instead of
        # deleting columns while iterating df.columns (mutation during iteration).
        drop_cols = [col for col in df.columns if 'field_' + col not in read_fields_keys]
        if drop_cols:
            df.drop(columns=drop_cols, inplace=True)

        if time_format and valid_eval(time_format):
            # NOTE(review): config-driven eval — valid_eval is the only guard,
            # so `time_format` must come from trusted configuration only.
            # The expression references the lambda argument `error_time` and the
            # local `df`, so those names must not change.
            eval_str = f"df['begin_time'].apply(lambda error_time: {time_format} )"
            df['begin_time'] = eval(eval_str)
            if 'end_time' in df.columns:
                eval_str = f"df['end_time'].apply(lambda error_time: {time_format} )"
                df['end_time'] = eval(eval_str)

        # Unparseable timestamps become NaT rather than raising.
        df['begin_time'] = pd.to_datetime(df['begin_time'], errors='coerce')
        if 'end_time' in df.columns:
            df['end_time'] = pd.to_datetime(df['end_time'], errors='coerce')

        exec_wind_turbine_number = read_conf(conf_map, 'exec_wind_turbine_number')
        if exec_wind_turbine_number and valid_eval(exec_wind_turbine_number):
            # Same trusted-config eval pattern; expression references `wind_name`.
            exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {exec_wind_turbine_number} )"
            df['wind_turbine_number'] = eval(exec_str)

        # Apply configured row filters. A leading '!' inverts the filter
        # (exclude the listed values instead of keeping them).
        for field, select_str in select_fields:
            # FIX: strip once up front — the original tested startswith('!') on
            # the stripped string but sliced the unstripped one, so leading
            # whitespace left the '!' in place and broke the value list.
            select_str = str(select_str).strip()
            use_in = True
            if select_str.startswith("!"):
                use_in = False
                select_str = select_str[1:]
            select_str = select_str.replace("'", "").replace("[", "").replace("]", "")
            values = select_str.split(',')
            # FIX: `dtype == int` is platform-dependent (int32 vs int64);
            # is_integer_dtype matches any integer column.
            if pd.api.types.is_integer_dtype(df[field]):
                values = [int(i) for i in values]
            mask = df[field].isin(values)
            df = df[mask] if use_in else df[~mask]

        # Keep the raw name, then map to the canonical turbine number; rows
        # without a mapping keep their original value.
        df['wind_turbine_name'] = df['wind_turbine_number']
        df['wind_turbine_number'] = df['wind_turbine_number'].map(
            self.wind_col_trans).fillna(df['wind_turbine_number'])

        if 'end_time' in df.columns:
            df['time_diff'] = (df['end_time'] - df['begin_time']).dt.total_seconds()
            # Negative durations indicate bad data; null them out.
            df.loc[df['time_diff'] < 0, 'time_diff'] = np.nan

        # 根据开始时间进行排序 (sort by turbine, then start time)
        df.sort_values(by=['wind_turbine_number', 'begin_time'], inplace=True)

        # Single save path/compression decision instead of two parallel branches.
        suffix = '.csv.gz' if self.save_zip else '.csv'
        save_path = os.path.join(self.pathsAndTable.get_save_path(), str(self.batch_name) + suffix)
        create_file_path(save_path, is_file_path=True)
        compression = 'gzip' if self.save_zip else None
        df.to_csv(save_path, compression=compression, index=False, encoding='utf-8',
                  date_format='%Y-%m-%d %H:%M:%S')
|