import os.path
from os import *

import numpy as np
import pandas as pd

from etl.common.BaseDataTrans import BaseDataTrans
from service.trans_conf_service import update_trans_status_error, update_trans_status_success
from service.trans_service import get_fault_warn_conf, drop_table, create_warn_fault_table, \
    save_file_to_db
from utils.conf.read_conf import read_conf
from utils.file.trans_methods import read_excel_files, read_file_to_df, create_file_path, valid_eval
from utils.log.trans_log import trans_print
class FaultWarnTrans(BaseDataTrans):
    """Transfer pipeline for fault / warning records.

    Reads vendor excel/csv exports from the temp directory, renames columns
    according to the per-farm field configuration, applies configured time and
    turbine-name transformation expressions, filters rows, merges the result
    into the cumulative formal CSV, and finally loads it into the database.
    """

    def __init__(self, data: dict = None, save_db=True, yaml_config=None, step=0, end=6):
        super(FaultWarnTrans, self).__init__(data, save_db, yaml_config, step, end)
        # Run statistics, filled in by the statistics/combine steps and
        # reported via update_trans_status_success at the end of the run.
        self.engine_count = 0  # number of distinct wind turbines seen
        self.min_date = None   # earliest begin_time in the processed data
        self.max_date = None   # latest begin_time in the processed data
        self.data_count = 0    # total row count before turbine filtering

    def get_filed_conf(self):
        """Fetch the field-mapping configuration for this farm and transfer type."""
        return get_fault_warn_conf(self.wind_farm_code, self.transfer_type)

    # Step 3: read and save to a temporary file — not needed for this type.
    def read_and_save_tmp_file(self):
        trans_print("无需保存临时文件")

    # Read the source files and save the cleaned result to the temp formal file.
    def statistics_and_save_tmp_formal_file(self):
        conf_map = self.get_filed_conf()
        # get_fault_warn_conf may return None or an error tuple on failure.
        if conf_map is None or isinstance(conf_map, tuple) or len(conf_map) == 0:
            message = f"未找到{self.id}的{self.transfer_type}配置"
            trans_print(message)
            update_trans_status_error(self.id, message, self.save_db)
        else:
            # Strip embedded newlines from every string config value.
            for key, v in conf_map.items():
                if v and isinstance(v, str):
                    conf_map[key] = v.replace("\r\n", "").replace("\n", "")

            read_fields_keys = [i for i in conf_map.keys() if i.startswith('field_')]
            # Row-filter config: ('column', 'v1,v2,…' or '!v1,v2,…').
            select_fields = [(k.replace("select_", ""), v) for k, v in conf_map.items() if
                             k.startswith('select_') and v]
            time_format = read_conf(conf_map, 'time_format')

            # Build the source-column -> canonical-column rename map.
            # A field_* value may list several source names separated by '|'.
            trans_map = dict()
            trans_cols = []
            for key in read_fields_keys:
                field_value = read_conf(conf_map, key)
                if field_value:
                    vas = str(field_value).split('|')
                    trans_cols.extend(vas)
                    field_key = key.replace("field_", "")
                    for v in vas:
                        trans_map[v] = field_key

            all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())
            df = pd.DataFrame()
            for file in all_files:
                now_df = read_file_to_df(file, trans_cols=trans_cols)
                df = pd.concat([df, now_df], ignore_index=True)
            df.rename(columns=trans_map, inplace=True)

            # Drop columns that are not configured. Iterate over a snapshot:
            # deleting from df while iterating df.columns mutates the very
            # index being traversed.
            for col in list(df.columns):
                if 'field_' + col not in read_fields_keys:
                    del df[col]

            # Optional per-value time-conversion expression from config.
            # valid_eval() whitelists the expression before eval — the config
            # is trusted operator input, but keep it guarded all the same.
            if time_format:
                if valid_eval(time_format):
                    eval_str = f"df['begin_time'].apply(lambda error_time: {time_format} )"
                    df['begin_time'] = eval(eval_str)
                    if 'end_time' in df.columns:
                        eval_str = f"df['end_time'].apply(lambda error_time: {time_format} )"
                        df['end_time'] = eval(eval_str)

            # Unparseable timestamps become NaT instead of raising.
            df['begin_time'] = pd.to_datetime(df['begin_time'], errors='coerce')
            if 'end_time' in df.columns:
                df['end_time'] = pd.to_datetime(df['end_time'], errors='coerce')

            # Optional turbine-name normalization expression, same guard.
            exec_wind_turbine_number = read_conf(conf_map, 'exec_wind_turbine_number')
            if exec_wind_turbine_number:
                if valid_eval(exec_wind_turbine_number):
                    exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {exec_wind_turbine_number} )"
                    df['wind_turbine_number'] = eval(exec_str)

            # Row filters: keep the listed values, or drop them when the
            # filter string starts with '!'.
            for field, select_str in select_fields:
                use_in = True
                if str(select_str).strip().startswith("!"):
                    use_in = False
                    select_str = select_str[1:]
                select_str = select_str.replace("'", "").replace("[", "").replace("]", "")
                values = select_str.split(',')
                if df[field].dtype == int:
                    values = [int(i) for i in values]
                if use_in:
                    df = df[df[field].isin(values)]
                else:
                    df = df[~df[field].isin(values)]

            # Keep the raw name, then map it to the canonical turbine number;
            # unmapped names fall back to the raw value.
            df['wind_turbine_name'] = df['wind_turbine_number']
            df['wind_turbine_number'] = df['wind_turbine_number'].map(
                self.wind_col_trans).fillna(df['wind_turbine_number'])

            if 'end_time' in df.columns:
                df['time_diff'] = (df['end_time'] - df['begin_time']).dt.total_seconds()
                # Negative durations are invalid; blank them out.
                df.loc[df['time_diff'] < 0, 'time_diff'] = np.nan

            # NOTE(review): with save_zip the file is written under a
            # '.csv.gz' name (pandas gzips by extension), but
            # combine_and_save_formal_file() reads the plain '.csv' name —
            # confirm save_zip is never enabled for this transfer type.
            if self.save_zip:
                save_path = os.path.join(self.pathsAndTable.get_tmp_formal_path(),
                                         str(self.pathsAndTable.read_type) + '.csv.gz')
            else:
                save_path = os.path.join(self.pathsAndTable.get_tmp_formal_path(),
                                         str(self.pathsAndTable.read_type) + '.csv')
            create_file_path(save_path, is_file_path=True)
            df.to_csv(save_path, index=False, encoding='utf-8')

    # Merge the temp formal file into the cumulative formal file.
    def combine_and_save_formal_file(self):
        df = read_file_to_df(
            os.path.join(self.pathsAndTable.get_tmp_formal_path(), str(self.pathsAndTable.read_type) + '.csv'))
        # Record run statistics before the turbine filter below.
        self.engine_count = len(df['wind_turbine_number'].unique())
        self.min_date = df['begin_time'].min()
        self.max_date = df['begin_time'].max()
        self.data_count = df.shape[0]
        # Keep only turbines known to the mapping table.
        df = df[df['wind_turbine_number'].isin(self.wind_col_trans.values())]
        save_path = os.path.join(self.pathsAndTable.get_save_path(), str(self.pathsAndTable.read_type) + '.csv')
        exists_df = pd.DataFrame()
        if os.path.exists(save_path):
            exists_df = read_file_to_df(save_path)
        else:
            create_file_path(save_path, is_file_path=True)
        # Append to any previous run, preferring the newest duplicate row.
        df = pd.concat([exists_df, df], ignore_index=True)
        df.drop_duplicates(inplace=True, keep='last')
        self.update_files = [save_path]
        # Sort by turbine then by event start time.
        df.sort_values(by=['wind_turbine_number', 'begin_time'], inplace=True)
        if self.save_zip:
            df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8', date_format='%Y-%m-%d %H:%M:%S')
        else:
            df.to_csv(save_path, index=False, encoding='utf-8', date_format='%Y-%m-%d %H:%M:%S')

    def save_to_db(self):
        """Recreate the fault/warn table and bulk-load the merged file."""
        table_name = self.pathsAndTable.get_table_name()
        drop_table(table_name)
        create_warn_fault_table(table_name)
        save_file_to_db(table_name, self.update_files[0], self.batch_count)

    def update_exec_progress(self):
        """Report final statistics and mark the transfer successful."""
        update_trans_status_success(self.id,
                                    self.engine_count, None, self.min_date, self.max_date, self.data_count,
                                    self.save_db)
|