Ver Fonte

开发故障报警

wzl há 9 meses atrás
pai
commit
9bf4fda1cd

+ 116 - 0
etl/common/BaseDataTrans.py

@@ -0,0 +1,116 @@
+import datetime
+import sys
+import traceback
+
+from etl.common.PathsAndTable import PathsAndTable
+from etl.common.ClearData import ClearData
+from etl.common.SaveToDb import SaveToDb
+from etl.common.UnzipAndRemove import UnzipAndRemove
+from service.plt_service import get_all_wind, get_batch_exec_data, get_data_by_batch_no_and_type, \
+    update_trans_status_success, update_trans_status_error, update_trans_status_running
+from tmp_file.power_derating_for_chunlin import read_excel_files
+from utils.log.trans_log import trans_print, set_trance_id
+
+
class BaseDataTrans(object):
    """Template-method driver for one data-transfer batch.

    :meth:`run` executes up to five numbered steps — clean (0), unzip/move (1),
    read to temp files (2), statistics to final files (3), save to DB (4) —
    restricted to the inclusive window ``[step, end]``, and reports
    running/success/error status to the platform service.

    Subclasses must implement :meth:`get_filed_conf`,
    :meth:`read_and_save_tmp_file` and :meth:`statistics_and_save_to_file`.
    """

    def __init__(self, data: dict = None, save_db=True, step=0, end=4):
        """
        :param data: batch description dict; must contain ``batch_code``,
            ``batch_name``, ``transfer_type``, ``transfer_addr``,
            ``field_code`` and ``field_name``.
        :param save_db: whether step 4 actually writes to the database.
        :param step: first step index to execute (inclusive).
        :param end: last step index to execute (inclusive).
        :raises ValueError: if ``data`` is None.
        """
        if data is None:
            # Fail fast with a clear message instead of an opaque
            # "'NoneType' object is not subscriptable" below.
            raise ValueError("data (batch configuration dict) is required")
        self.batch_no = data['batch_code']
        self.batch_name = data['batch_name']
        self.read_type = data['transfer_type']
        self.read_path = data['transfer_addr']
        self.field_code = data['field_code']
        self.field_name = data['field_name']
        self.save_zip = False
        self.step = step
        self.end = end
        self.wind_col_trans, self.rated_power_and_cutout_speed_map = get_all_wind(self.field_code)
        # Rows per chunk when inserting into the database (step 4).
        self.batch_count = 100000
        self.save_db = save_db
        # NOTE(review): calls an abstract method from __init__ — subclasses
        # must be fully initialised before super().__init__ runs.
        self.filed_conf = self.get_filed_conf()
        self.pathsAndTable = PathsAndTable(self.batch_no, self.batch_name, self.read_path, self.field_name,
                                           self.read_type, save_db, self.save_zip)

    def get_filed_conf(self):
        """Return the field-mapping configuration for this batch (abstract)."""
        raise NotImplementedError("需要实现 获取点检表 方法")

    # Step 1: clear previous data (files and DB rows) for this batch.
    def clean_file_and_db(self):
        ClearData(self.pathsAndTable).run()

    # Step 2: unzip archives / move raw files into the temp directory.
    def unzip_or_remove_to_tmp_dir(self):
        UnzipAndRemove(self.pathsAndTable).run()

    # Step 3: read raw files and persist to temp files (abstract).
    def read_and_save_tmp_file(self):
        raise NotImplementedError("第三步未做实现")

    # Step 4: compute statistics and persist to the final files (abstract).
    def statistics_and_save_to_file(self):
        raise NotImplementedError("第四步未做实现")

    # Step 5: bulk-load the final files into the database.
    def save_to_db(self):
        SaveToDb(self.pathsAndTable, self.batch_count).run()

    # Finally: mark the batch as successfully transferred.
    def update_exec_progress(self):
        update_trans_status_success(self.batch_no, self.read_type,
                                    len(read_excel_files(self.pathsAndTable.get_save_path())),
                                    None, None, None, None, self.save_db)

    def run(self):
        """Execute steps ``[step, end]``; on any exception mark the batch errored.

        Temp files are removed in all cases (finally block).
        """
        total_begin = datetime.datetime.now()
        try:
            trance_id = '-'.join([self.batch_no, self.field_name, self.read_type])
            set_trance_id(trance_id)
            update_trans_status_running(self.batch_no, self.read_type, self.save_db)

            # BUGFIX: the "总耗时" (total elapsed) value in each step log used
            # to repeat `now - begin`; it now reports `now - total_begin`.
            if self.step <= 0 <= self.end:
                begin = datetime.datetime.now()
                trans_print("开始清理数据,临时文件夹:", self.pathsAndTable.get_tmp_path())
                self.clean_file_and_db()
                trans_print("清理数据结束,耗时:", datetime.datetime.now() - begin,
                            "总耗时:", datetime.datetime.now() - total_begin)

            if self.step <= 1 <= self.end:
                begin = datetime.datetime.now()
                trans_print("开始解压移动文件")
                self.unzip_or_remove_to_tmp_dir()
                trans_print("解压移动文件结束:耗时:", datetime.datetime.now() - begin,
                            "总耗时:", datetime.datetime.now() - total_begin)

            if self.step <= 2 <= self.end:
                begin = datetime.datetime.now()
                trans_print("开始保存数据到临时文件")
                self.read_and_save_tmp_file()
                trans_print("保存数据到临时文件结束,耗时:", datetime.datetime.now() - begin,
                            "总耗时:", datetime.datetime.now() - total_begin)

            if self.step <= 3 <= self.end:
                begin = datetime.datetime.now()
                trans_print("开始保存数据到正式文件")
                self.statistics_and_save_to_file()
                trans_print("保存数据到正式文件结束,耗时:", datetime.datetime.now() - begin,
                            "总耗时:", datetime.datetime.now() - total_begin)

            if self.step <= 4 <= self.end:
                begin = datetime.datetime.now()
                trans_print("开始保存到数据库,是否存库:", self.pathsAndTable.save_db)
                self.save_to_db()
                trans_print("保存到数据结束,耗时:", datetime.datetime.now() - begin,
                            "总耗时:", datetime.datetime.now() - total_begin)

            self.update_exec_progress()
        except Exception as e:
            trans_print(traceback.format_exc())
            update_trans_status_error(self.batch_no, self.read_type, str(e), self.save_db)
        finally:
            self.pathsAndTable.delete_tmp_files()
            trans_print("执行结束,总耗时:", str(datetime.datetime.now() - total_begin))
+
+
if __name__ == '__main__':
    # BUGFIX: the old smoke test called
    # BaseDataTrans(save_db=False, batch_no=..., read_type=...), but __init__
    # has no such keyword parameters — it takes a `data` dict — so it raised
    # TypeError before doing anything.  Build the expected dict instead.
    # NOTE(review): BaseDataTrans is abstract (get_filed_conf raises), so this
    # only exercises construction/run plumbing; run() will record an error
    # status rather than crash.
    test_data = {
        'batch_code': "WOF053600062-WOB000010",
        'batch_name': "test",
        'transfer_type': "fault",
        'transfer_addr': "",
        'field_code': "test",
        'field_name': "test",
    }
    test = BaseDataTrans(data=test_data, save_db=False)
    test.run()

+ 0 - 0
etl/common/__init__.py


+ 0 - 0
etl/wind_power/__init__.py


+ 113 - 0
etl/wind_power/fault_warn/FaultWarnTrans.py

@@ -0,0 +1,113 @@
+import os
+
+import numpy as np
+import pandas as pd
+
+from etl.common.BaseDataTrans import BaseDataTrans
+from service.plt_service import update_trans_status_error
+from service.trans_service import get_fault_warn_conf
+from utils.conf.read_conf import read_conf
+from utils.file.trans_methods import read_excel_files, read_file_to_df, create_file_path
+from utils.log.trans_log import trans_print
+
+
class FaultWarnTrans(BaseDataTrans):
    """Transfer of fault/warning records.

    Skips the temp-file stage (step 3); step 4 reads every raw file, renames
    columns via the field configuration, applies row filters, maps turbine
    names to platform numbers and writes one CSV (gzipped when ``save_zip``).
    """

    def __init__(self, data: dict = None, save_db=True, step=0, end=4):
        super(FaultWarnTrans, self).__init__(data, save_db, step, end)

    def get_filed_conf(self):
        """Fetch the fault/warn column configuration for this wind field."""
        return get_fault_warn_conf(self.field_code, self.read_type)

    # Step 3: fault/warn data is small enough to skip the temp-file stage.
    def read_and_save_tmp_file(self):
        trans_print("无需保存临时文件")

    # Step 4: read, rename, filter and persist to the final CSV.
    def statistics_and_save_to_file(self):
        conf_map = self.get_filed_conf()
        # A tuple here is the conf service's "not found" shape —
        # TODO confirm against get_fault_warn_conf.
        if conf_map is None or isinstance(conf_map, tuple) or len(conf_map.keys()) == 0:
            message = f"未找到{self.batch_no}的{self.read_type}配置"
            trans_print(message)
            update_trans_status_error(self.batch_no, self.read_type, message, self.save_db)
        else:
            # Normalise config values: strip embedded newlines.
            for key, v in conf_map.items():
                if v and isinstance(v, str):
                    conf_map[key] = v.replace("\r\n", "").replace("\n", "")

            # 'field_*' entries map raw column names to canonical names.
            read_fields_keys = [i for i in conf_map.keys() if i.startswith('field_')]
            # 'select_*' entries filter rows on the (renamed) column's values.
            select_fields = [(k.replace("select_", ""), v) for k, v in conf_map.items() if
                             k.startswith('select_') and v]
            time_format = read_conf(conf_map, 'time_format')

            # Build raw-column -> canonical-name mapping; one field value may
            # list several raw columns separated by '|'.
            trans_map = dict()
            trans_cols = []
            for key in read_fields_keys:
                field_value = read_conf(conf_map, key)
                if field_value:
                    vas = str(field_value).split('|')
                    trans_cols.extend(vas)
                    field_key = key.replace("field_", "")
                    for v in vas:
                        trans_map[v] = field_key

            all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())

            df = pd.DataFrame()
            for file in all_files:
                now_df = read_file_to_df(file, trans_cols=trans_cols)
                df = pd.concat([df, now_df], ignore_index=True)

            df.rename(columns=trans_map, inplace=True)

            # BUGFIX: these branches were inverted — `format=time_format` was
            # only passed when time_format was empty (i.e. format=None), and a
            # configured format was silently ignored.
            if time_format:
                df['begin_time'] = pd.to_datetime(df['begin_time'], format=time_format)
                df['end_time'] = pd.to_datetime(df['end_time'], format=time_format)
            else:
                df['begin_time'] = pd.to_datetime(df['begin_time'])
                df['end_time'] = pd.to_datetime(df['end_time'])

            exec_wind_turbine_number = read_conf(conf_map, 'exec_wind_turbine_number')
            if exec_wind_turbine_number:
                # SECURITY: eval() of a configuration-supplied expression over
                # each turbine name — acceptable only while the conf table is
                # operator-controlled; never feed it untrusted input.
                exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {exec_wind_turbine_number} )"
                df['wind_turbine_number'] = eval(exec_str)

            for field, select_str in select_fields:
                # A leading '!' means "exclude these values" instead of keep.
                use_in = True
                if str(select_str).strip().startswith("!"):
                    use_in = False
                    select_str = select_str[1:]

                select_str = select_str.replace("'", "").replace("[", "").replace("]", "")
                values = select_str.split(',')

                # is_integer_dtype covers int32/int64/etc., unlike the fragile
                # `dtype == int` comparison.
                if pd.api.types.is_integer_dtype(df[field]):
                    values = [int(i) for i in values]

                if use_in:
                    df = df[df[field].isin(values)]
                else:
                    df = df[~df[field].isin(values)]

            # Keep the raw name, then translate to the platform turbine
            # number; unknown names fall back to the raw value.
            df['wind_turbine_name'] = df['wind_turbine_number']
            df['wind_turbine_number'] = df['wind_turbine_number'].map(
                self.wind_col_trans).fillna(df['wind_turbine_number'])

            # Event duration in seconds; negative spans (end before begin)
            # are treated as invalid -> NaN.
            df['time_diff'] = (df['end_time'] - df['begin_time']).dt.total_seconds()
            df.loc[df['time_diff'] < 0, 'time_diff'] = np.nan

            suffix = '.csv.gz' if self.save_zip else '.csv'
            save_path = os.path.join(self.pathsAndTable.get_save_path(), str(self.batch_name) + suffix)
            create_file_path(save_path, is_file_path=True)
            if self.save_zip:
                df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8', date_format='%Y-%m-%d %H:%M:%S')
            else:
                df.to_csv(save_path, index=False, encoding='utf-8', date_format='%Y-%m-%d %H:%M:%S')

+ 0 - 0
etl/wind_power/fault_warn/__init__.py


+ 0 - 0
etl/wind_power/min_sec/__init__.py