
Develop fault and warning alarm processing

wzl, 9 months ago
parent revision 9faa6383c8

+ 39 - 112
app_run.py

@@ -3,113 +3,42 @@
 # @Author  : 魏志亮
 import os
 import sys
-import traceback
 
+from etl.wind_power.fault_warn.FaultWarnTrans import FaultWarnTrans
+from etl.wind_power.min_sec.MinSecTrans import MinSecTrans
 
-def run_schedule(step=0, end=4, run_count=1):
-    # 更新超时任务
-    update_timeout_trans_data()
 
-    data = get_exec_data(run_count)
-    if data is None:
-        trans_print("当前有任务在执行")
-    elif len(data.keys()) == 0:
-        trans_print("当前无任务")
-    else:
-        batch_no = data['batch_code']
-        batch_name = data['batch_name']
-        transfer_type = data['transfer_type']
-        transfer_file_addr = data['transfer_addr']
-        field_code = data['field_code']
-        field_name = data['field_name']
-
-        __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_addr, field_name, field_code,
-                     save_db=True)
-
-
-def run_local(step=0, end=3, batch_no=None, batch_name='', transfer_type=None, transfer_file_addr=None, field_name=None,
-              field_code="测试", save_db=False):
-    if batch_no is None or str(batch_no).strip() == '':
-        return "批次编号不能为空"
-
-    if transfer_type not in ['second', 'minute', 'second_1']:
-        return "查询类型错误"
-
-    if transfer_file_addr is None or str(transfer_file_addr).strip() == '':
-        return "文件路径不能为空"
-
-    __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_addr, field_name, field_code,
-                 save_db=save_db)
-
-
-def __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_addr=None, field_name=None,
-                 field_code="测试",
-                 save_db=False):
-    trance_id = '-'.join([batch_no, field_name, transfer_type])
-    set_trance_id(trance_id)
-    conf_map = get_trans_conf(field_code, field_name, transfer_type)
-    if conf_map is None or type(conf_map) == tuple or len(conf_map.keys()) == 0:
-        message = f"未找到{field_name}的{transfer_type}配置"
-        trans_print(message)
-        update_trans_status_error(batch_no, transfer_type, message, save_db)
+def get_exec_data(batch_no=None, read_type=None, run_count=1):
+    if batch_no and read_type:
+        data = get_data_by_batch_no_and_type(batch_no, read_type)
+        if data is None:
+            raise ValueError(f"未找到批次号:{batch_no},类型:{read_type}")
+
     else:
+        data = get_batch_exec_data(run_count)
+        if data is None:
+            trans_print("当前有任务在执行")
+            sys.exit(0)
+        elif len(data.keys()) == 0:
+            trans_print("当前无任务")
+            sys.exit(0)
+
+    return data
+
 
-        resolve_col_prefix = read_conf(conf_map, 'resolve_col_prefix')
-        wind_name_exec = read_conf(conf_map, 'wind_name_exec', None)
-        is_vertical_table = read_conf(conf_map, 'is_vertical_table', False)
-        merge_columns = read_conf(conf_map, 'merge_columns', False)
-
-        vertical_cols = read_conf(conf_map, 'vertical_read_cols', '').split(',')
-        index_cols = read_conf(conf_map, 'vertical_index_cols', '').split(',')
-        vertical_key = read_conf(conf_map, 'vertical_col_key')
-        vertical_value = read_conf(conf_map, 'vertical_col_value')
-        need_valid_cols = not merge_columns
-
-        begin_header = read_conf(conf_map, 'begin_header', 0)
-
-        cols_trans_all = dict()
-        trans_cols = ['wind_turbine_number', 'time_stamp', 'active_power', 'rotor_speed', 'generator_speed',
-                      'wind_velocity', 'pitch_angle_blade_1', 'pitch_angle_blade_2', 'pitch_angle_blade_3',
-                      'cabin_position', 'true_wind_direction', 'yaw_error1', 'set_value_of_active_power',
-                      'gearbox_oil_temperature', 'generatordrive_end_bearing_temperature',
-                      'generatornon_drive_end_bearing_temperature', 'wind_turbine_status',
-                      'wind_turbine_status2',
-                      'cabin_temperature', 'twisted_cable_angle', 'front_back_vibration_of_the_cabin',
-                      'side_to_side_vibration_of_the_cabin', 'actual_torque', 'given_torque',
-                      'clockwise_yaw_count',
-                      'counterclockwise_yaw_count', 'unusable', 'power_curve_available',
-                      'required_gearbox_speed',
-                      'inverter_speed_master_control', 'outside_cabin_temperature', 'main_bearing_temperature',
-                      'gearbox_high_speed_shaft_bearing_temperature',
-                      'gearboxmedium_speed_shaftbearing_temperature',
-                      'gearbox_low_speed_shaft_bearing_temperature', 'generator_winding1_temperature',
-                      'generator_winding2_temperature', 'generator_winding3_temperature',
-                      'turbulence_intensity', 'param1',
-                      'param2', 'param3', 'param4', 'param5', 'param6', 'param7', 'param8', 'param9', 'param10']
-
-        for col in trans_cols:
-            cols_trans_all[col] = read_conf(conf_map, col, '')
-
-        params = TransParam(read_type=transfer_type, read_path=transfer_file_addr,
-                            cols_tran=cols_trans_all,
-                            wind_name_exec=wind_name_exec, is_vertical_table=is_vertical_table,
-                            vertical_cols=vertical_cols, vertical_key=vertical_key,
-                            vertical_value=vertical_value, index_cols=index_cols, merge_columns=merge_columns,
-                            resolve_col_prefix=resolve_col_prefix, need_valid_cols=need_valid_cols, header=begin_header)
-
-        try:
-            trans_subject = WindFarms(batch_no=batch_no, batch_name=batch_name, field_code=field_code,
-                                      field_name=field_name,
-                                      save_db=save_db,
-                                      header=begin_header, trans_param=params)
-            trans_subject.run(step=step, end=end)
-        except Exception as e:
-            trans_print(traceback.format_exc())
-            message = "系统返回错误:" + str(e)
-            update_trans_status_error(batch_no, transfer_type, message, save_db)
-        finally:
-            set_trance_id("")
-            trans_subject.pathsAndTable.delete_tmp_files()
+def run(batch_no=None, read_type=None, save_db=True, run_count=1):
+    data = get_exec_data(batch_no, read_type, run_count)
+
+    exec_process = None
+    if data['transfer_type'] in ['second', 'minute']:
+        exec_process = MinSecTrans(data=data, save_db=save_db)
+
+    if data['transfer_type'] in ['fault', 'warn']:
+        exec_process = FaultWarnTrans(data=data, save_db=save_db)
+
+    if exec_process is None:
+        raise Exception("No exec process")
+    exec_process.run()
 
 
 if __name__ == '__main__':
@@ -128,15 +57,13 @@ if __name__ == '__main__':
     if len(sys.argv) >= 3:
         run_count = int(sys.argv[2])
 
-    from utils.log.trans_log import trans_print, set_trance_id
-    from etl.base.TransParam import TransParam
-    from etl.base.WindFarms import WindFarms
-    from service.plt_service import get_exec_data, update_trans_status_error, update_timeout_trans_data
-    from service.trans_service import get_trans_conf
-    from utils.conf.read_conf import read_conf
+    conf_path = '/data/config/etl_config.yaml'
+    if len(sys.argv) >= 4:
+        conf_path = sys.argv[3]
+
+    os.environ['ETL_CONF'] = conf_path
 
-    run_schedule(run_count=run_count)
+    from utils.log.trans_log import trans_print
+    from service.plt_service import get_batch_exec_data, get_data_by_batch_no_and_type
 
-    # run_local(4, 4, batch_no='WOF035200003-WOB000005', batch_name='MM14号机组0719', transfer_type='second',
-    #            transfer_file_addr=r'/data/download/collection_data/1进行中/密马风电场-山西-大唐/收资数据/scada/14号/sec', field_name='密马风电场',
-    #            field_code="WOF035200003", save_db=True)
+    run(run_count=run_count)
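
Note on the rewritten entry point: dispatch is now by transfer_type, with 'second'/'minute' handled by MinSecTrans and 'fault'/'warn' by FaultWarnTrans. An invocation sketch; the positional argument order is taken from the __main__ blocks in this commit (the 'dev' default for env appears in test_run_local.py):

# argv[1] = env        -> os.environ['env']        ('dev' default per test_run_local.py)
# argv[2] = run_count  -> concurrent-batch gate    (default 1)
# argv[3] = conf_path  -> os.environ['ETL_CONF']   (default /data/config/etl_config.yaml)
#
#   python app_run.py prod 1 /data/config/etl_config.yaml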

+ 4 - 1
conf/etl_config.yaml

@@ -42,4 +42,7 @@ trans_prod:
 # 如果要放在原始路径,则配置这个 以下面的名称作为切割点,新建清理数据文件夹
 etl_origin_path_contain: 收资数据
 # 如果单独保存,配置这个路径
-save_path:
+save_path:
+
+# 日志保存路径
+log_path_dir: /home/wzl/logs
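
A sketch of how the new log_path_dir key would be consumed, assuming yaml_conf and read_conf live in utils.conf.read_conf (as the imports elsewhere in this commit suggest) and that the key sits at the top level of the YAML; the fallback value is illustrative:

import os

from utils.conf.read_conf import yaml_conf, read_conf

conf = yaml_conf(os.environ.get('ETL_CONF', '/data/config/etl_config.yaml'))
log_dir = read_conf(conf, 'log_path_dir', '/tmp/etl_logs')  # fallback is an assumption
os.makedirs(log_dir, exist_ok=True)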

+ 0 - 98
etl/base/WindFarms.py

@@ -1,98 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time    : 2024/5/15
-# @Author  : 魏志亮
-import datetime
-import multiprocessing
-
-from etl.base.PathsAndTable import PathsAndTable
-from etl.base.TransParam import TransParam
-from etl.step.ClearData import ClearData
-from etl.step.ReadAndSaveTmp import ReadAndSaveTmp
-from etl.step.SaveToDb import SaveToDb
-from etl.step.StatisticsAndSaveFile import StatisticsAndSaveFile
-from etl.step.UnzipAndRemove import UnzipAndRemove
-from service.plt_service import get_all_wind, update_trans_status_running, \
-    update_trans_status_success, update_trans_transfer_progress
-from service.trans_service import batch_statistics
-from utils.df_utils.util import get_time_space
-from utils.file.trans_methods import *
-
-
-class WindFarms(object):
-
-    def __init__(self, batch_no=None, batch_name=None, field_code=None, field_name=None, params: TransParam = None,
-                 save_db=True, header=0, trans_param: TransParam = None):
-        self.batch_no = batch_no
-        self.batch_name = batch_name
-        self.field_code = field_code
-        self.field_name = field_name
-        self.save_zip = False
-        self.trans_param = params
-        self.wind_col_trans, self.rated_power_and_cutout_speed_map = get_all_wind(self.field_code)
-        self.batch_count = 50000
-        self.save_path = None
-        self.save_db = save_db
-        self.statistics_map = multiprocessing.Manager().dict()
-        self.header = header
-        self.trans_param = trans_param
-        self.trans_param.wind_col_trans = self.wind_col_trans
-        self.pathsAndTable = PathsAndTable(batch_no, batch_name, self.trans_param.read_path, self.field_name,
-                                           self.trans_param.read_type, save_db, save_zip=self.save_zip)
-
-    def run(self, step=0, end=4):
-        begin = datetime.datetime.now()
-        trans_print("开始执行")
-        update_trans_status_running(self.batch_no, self.trans_param.read_type, self.save_db)
-        if step <= 0 and end >= 0:
-            clean_data = ClearData(self.pathsAndTable)
-            clean_data.run()
-
-        if step <= 1 and end >= 1:
-            # 更新运行状态到运行中
-            unzip_and_remove = UnzipAndRemove(self.pathsAndTable)
-            unzip_and_remove.run()
-
-        if step <= 2 and end >= 2:
-            read_and_save_tmp = ReadAndSaveTmp(self.pathsAndTable, self.trans_param)
-            read_and_save_tmp.run()
-
-        if step <= 3 and end >= 3:
-            # 保存到正式文件
-            statistics_and_save_file = StatisticsAndSaveFile(self.pathsAndTable, self.trans_param, self.statistics_map,
-                                                             self.rated_power_and_cutout_speed_map)
-            statistics_and_save_file.run()
-
-        if step <= 4 and end >= 4:
-            if self.save_db:
-                save_to_db = SaveToDb(self.pathsAndTable)
-                save_to_db.run()
-
-        update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 99, self.save_db)
-        # 如果end==0 则说明只是进行了验证
-        if end >= 4:
-            all_files = read_excel_files(self.pathsAndTable.get_save_path())
-            if step <= 3:
-                update_trans_status_success(self.batch_no, self.trans_param.read_type,
-                                            len(all_files),
-                                            self.statistics_map['time_granularity'],
-                                            self.statistics_map['min_date'], self.statistics_map['max_date'],
-                                            self.statistics_map['total_count'], self.save_db)
-            else:
-                df = read_file_to_df(all_files[0], read_cols=['time_stamp'])
-                df['time_stamp'] = pd.to_datetime(df['time_stamp'])
-                time_granularity = get_time_space(df, 'time_stamp')
-                batch_data = batch_statistics("_".join([self.batch_no, self.trans_param.read_type]))
-                if batch_data is not None:
-                    update_trans_status_success(self.batch_no, self.trans_param.read_type,
-                                                len(read_excel_files(self.pathsAndTable.get_save_path())),
-                                                time_granularity,
-                                                batch_data['min_date'], batch_data['max_date'],
-                                                batch_data['total_count'], self.save_db)
-                else:
-                    update_trans_status_success(self.batch_no, self.trans_param.read_type,
-                                                len(read_excel_files(self.pathsAndTable.get_save_path())),
-                                                time_granularity,
-                                                None, None,
-                                                None, self.save_db)
-        trans_print("结束执行", self.trans_param.read_type, ",总耗时:",
-                    str(datetime.datetime.now() - begin))

+ 0 - 0
etl/base/__init__.py


+ 4 - 3
etl/step/ClearData.py → etl/common/ClearData.py

@@ -1,6 +1,6 @@
 import datetime
 
-from etl.base.PathsAndTable import PathsAndTable
+from etl.common.PathsAndTable import PathsAndTable
 from service.plt_service import update_trans_transfer_progress
 from utils.log.trans_log import trans_print
 
@@ -12,11 +12,12 @@ class ClearData(object):
 
     def clean_data(self):
         self.pathsAndTable.delete_tmp_files()
-        self.pathsAndTable.delete_batch_db()
+        if self.pathsAndTable.save_db:
+            self.pathsAndTable.delete_batch_db()
         self.pathsAndTable.delete_batch_files()
 
     def run(self):
-        trans_print("开始清理数据")
+        trans_print("开始清理数据,临时文件夹:", self.pathsAndTable.get_tmp_path())
         begin = datetime.datetime.now()
         self.clean_data()
         update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 5,

+ 15 - 10
etl/base/PathsAndTable.py → etl/common/PathsAndTable.py

@@ -4,7 +4,7 @@ import tempfile
 
 import yaml
 
-from service.trans_service import drop_table, creat_table_and_add_partition
+from service.trans_service import drop_table, creat_min_sec_table, create_warn_fault_table
 from utils.log.trans_log import trans_print
 from utils.conf.read_conf import *
 
@@ -38,21 +38,21 @@ class PathsAndTable(object):
     def get_save_path(self):
         return os.path.join(self.save_path, self.batch_no + "_" + self.batch_name, self.read_type)
 
-    def get_save_tmp_path(self):
+    def get_tmp_path(self):
         return os.path.join(tempfile.gettempdir(), self.field_name, self.batch_no + "_" + self.batch_name,
                             self.read_type)
 
     def get_excel_tmp_path(self):
-        return os.path.join(self.get_save_tmp_path(), 'excel_tmp' + os.sep)
+        return os.path.join(self.get_tmp_path(), 'excel_tmp' + os.sep)
 
     def get_read_tmp_path(self):
-        return os.path.join(self.get_save_tmp_path(), 'read_tmp')
+        return os.path.join(self.get_tmp_path(), 'read_tmp')
 
     def get_merge_tmp_path(self, wind_turbine_number=None):
         if wind_turbine_number is None:
-            return os.path.join(self.get_save_tmp_path(), 'merge_tmp')
+            return os.path.join(self.get_tmp_path(), 'merge_tmp')
         else:
-            return os.path.join(self.get_save_tmp_path(), 'merge_tmp', str(wind_turbine_number))
+            return os.path.join(self.get_tmp_path(), 'merge_tmp', str(wind_turbine_number))
 
     def get_table_name(self):
         return "_".join([self.batch_no, self.read_type])
@@ -65,8 +65,8 @@ class PathsAndTable(object):
 
     def delete_tmp_files(self):
         trans_print("开始删除临时文件夹")
-        if os.path.exists(self.get_save_tmp_path()):
-            shutil.rmtree(self.get_save_tmp_path())
+        if os.path.exists(self.get_tmp_path()):
+            shutil.rmtree(self.get_tmp_path())
         trans_print("删除临时文件夹删除成功")
 
     def delete_batch_db(self):
@@ -76,8 +76,13 @@ class PathsAndTable(object):
             drop_table(table_name, self.save_db)
             trans_print("删除表结束")
 
-    def create_batch_db(self, wind_names=list()):
+    def create_batch_db(self, wind_names: list = list()):
         if self.save_db:
             trans_print("开始创建表")
-            creat_table_and_add_partition(self.get_table_name(), wind_names, self.read_type)
+            if self.read_type in ['second', 'minute']:
+                creat_min_sec_table(self.get_table_name(), wind_names, self.read_type)
+            elif self.read_type in ['fault', 'warn']:
+                create_warn_fault_table(self.get_table_name())
+            else:
+                raise Exception("不支持的读取类型:" + self.read_type)
             trans_print("建表结束")

+ 10 - 16
etl/step/SaveToDb.py → etl/common/SaveToDb.py

@@ -1,9 +1,8 @@
-import datetime
 import multiprocessing
 import os
 import traceback
 
-from etl.base.PathsAndTable import PathsAndTable
+from etl.common.PathsAndTable import PathsAndTable
 from service.plt_service import update_trans_transfer_progress
 from service.trans_service import save_file_to_db
 from utils.file.trans_methods import read_excel_files, split_array
@@ -13,20 +12,17 @@ from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
 
 class SaveToDb(object):
 
-    def __init__(self, pathsAndTable: PathsAndTable):
+    def __init__(self, pathsAndTable: PathsAndTable, batch_count=100000):
         self.pathsAndTable = pathsAndTable
+        self.batch_count = batch_count
 
     def mutiprocessing_to_save_db(self):
         # 开始保存到SQL文件
 
         self.pathsAndTable.delete_batch_db()
-        trans_print("开始保存到数据库文件")
         all_saved_files = read_excel_files(self.pathsAndTable.get_save_path())
         wind_names = [str(os.path.basename(i)).replace(".csv", "") for i in all_saved_files]
 
-        # creat_table_and_add_partition(self.pathsAndTable.get_table_name(), wind_names,
-        #                               self.pathsAndTable.read_type)
-
         self.pathsAndTable.create_batch_db(wind_names)
 
         split_count = get_available_cpu_count_with_percent(percent=1 / 2)
@@ -35,8 +31,9 @@ class SaveToDb(object):
         try:
             for index, arr in enumerate(all_arrays):
                 with multiprocessing.Pool(split_count) as pool:
-                    pool.starmap(save_file_to_db, [(self.pathsAndTable.get_table_name(), file,) for file in
-                                                   all_saved_files])
+                    pool.starmap(save_file_to_db,
+                                 [(self.pathsAndTable.get_table_name(), file, self.batch_count) for file in
+                                  all_saved_files])
                 update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
                                                round(70 + 29 * (index + 1) / len(all_arrays), 2),
                                                self.pathsAndTable.save_db)
@@ -44,12 +41,9 @@ class SaveToDb(object):
             trans_print(traceback.format_exc())
             message = "保存到数据库错误,系统返回错误:" + str(e)
             raise ValueError(message)
-        trans_print("结束保存到数据库文件")
 
     def run(self):
-        trans_print("开始保存到数据库")
-        begin = datetime.datetime.now()
-        self.mutiprocessing_to_save_db()
-        update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 99,
-                                       self.pathsAndTable.save_db)
-        trans_print("保存到数据结束,耗时:", datetime.datetime.now() - begin)
+        if self.pathsAndTable.save_db:
+            self.mutiprocessing_to_save_db()
+            update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 99,
+                                           self.pathsAndTable.save_db)
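
One thing worth flagging in the retained loop: the starmap inside each pass submits all_saved_files rather than the current chunk arr, so every iteration over all_arrays appears to re-save the full file list. A sketch of the presumably intended per-chunk dispatch:

for index, arr in enumerate(all_arrays):
    with multiprocessing.Pool(split_count) as pool:
        # submit only this chunk's files, not the whole list
        pool.starmap(save_file_to_db,
                     [(self.pathsAndTable.get_table_name(), file, self.batch_count)
                      for file in arr])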

+ 1 - 6
etl/step/UnzipAndRemove.py → etl/common/UnzipAndRemove.py

@@ -2,9 +2,7 @@ import multiprocessing
 import os
 import traceback
 
-import datetime
-
-from etl.base.PathsAndTable import PathsAndTable
+from etl.common.PathsAndTable import PathsAndTable
 from service.plt_service import update_trans_transfer_progress
 from utils.file.trans_methods import read_files, read_excel_files, copy_to_new, split_array
 from utils.log.trans_log import trans_print
@@ -68,9 +66,6 @@ class UnzipAndRemove(object):
         return all_files
 
     def run(self):
-        trans_print("开始解压移动文件")
-        begin = datetime.datetime.now()
         self.remove_file_to_tmp_path()
         update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 20,
                                        self.pathsAndTable.save_db)
-        trans_print("解压移动文件结束:耗时:", datetime.datetime.now() - begin)

+ 0 - 0
etl/step/__init__.py


+ 0 - 2
etl/step/ClassIdentifier.py → etl/wind_power/min_sec/ClassIdentifier.py

@@ -349,9 +349,7 @@ class ClassIdentifier(object):
 
     def run(self):
         # Implement your class identification logic here
-        print_memory_usage(self.wind_turbine_number + "开始打标签")
         begin = datetime.datetime.now()
         df = self.identifier()
         trans_print("打标签结束,", df.shape, ",耗时:", datetime.datetime.now() - begin)
-        print_memory_usage(self.wind_turbine_number + "打标签结束,")
         return df

+ 119 - 0
etl/wind_power/min_sec/MinSecTrans.py

@@ -0,0 +1,119 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/5/15
+# @Author  : 魏志亮
+import multiprocessing
+
+from etl.common.BaseDataTrans import BaseDataTrans
+from etl.wind_power.min_sec.TransParam import TransParam
+from etl.wind_power.min_sec.ReadAndSaveTmp import ReadAndSaveTmp
+from etl.wind_power.min_sec.StatisticsAndSaveFile import StatisticsAndSaveFile
+from service.plt_service import update_trans_status_success, update_trans_status_error
+from service.trans_service import batch_statistics, get_min_sec_conf
+from utils.conf.read_conf import read_conf
+from utils.df_utils.util import get_time_space
+from utils.file.trans_methods import *
+
+
+class MinSecTrans(BaseDataTrans):
+
+    def __init__(self, data: dict = None, save_db=True, step=0, end=4):
+        super(MinSecTrans, self).__init__(data, save_db, step, end)
+        self.statistics_map = multiprocessing.Manager().dict()
+        self.trans_param = self.get_trans_param()
+        self.trans_param.wind_col_trans = self.wind_col_trans
+
+    def get_filed_conf(self):
+        return get_min_sec_conf(self.field_code, self.read_type)
+
+    def get_trans_param(self):
+        conf_map = self.get_filed_conf()
+        if conf_map is None or type(conf_map) == tuple or len(conf_map.keys()) == 0:
+            message = f"未找到{self.batch_no}的{self.read_type}配置"
+            trans_print(message)
+            update_trans_status_error(self.batch_no, self.read_type, message, self.save_db)
+        else:
+            resolve_col_prefix = read_conf(conf_map, 'resolve_col_prefix')
+            wind_name_exec = read_conf(conf_map, 'wind_name_exec', None)
+            is_vertical_table = read_conf(conf_map, 'is_vertical_table', False)
+            merge_columns = read_conf(conf_map, 'merge_columns', False)
+
+            vertical_cols = read_conf(conf_map, 'vertical_read_cols', '').split(',')
+            index_cols = read_conf(conf_map, 'vertical_index_cols', '').split(',')
+            vertical_key = read_conf(conf_map, 'vertical_col_key')
+            vertical_value = read_conf(conf_map, 'vertical_col_value')
+            need_valid_cols = not merge_columns
+
+            cols_trans_all = dict()
+            trans_cols = ['wind_turbine_number', 'time_stamp', 'active_power', 'rotor_speed', 'generator_speed',
+                          'wind_velocity', 'pitch_angle_blade_1', 'pitch_angle_blade_2', 'pitch_angle_blade_3',
+                          'cabin_position', 'true_wind_direction', 'yaw_error1', 'set_value_of_active_power',
+                          'gearbox_oil_temperature', 'generatordrive_end_bearing_temperature',
+                          'generatornon_drive_end_bearing_temperature', 'wind_turbine_status',
+                          'wind_turbine_status2',
+                          'cabin_temperature', 'twisted_cable_angle', 'front_back_vibration_of_the_cabin',
+                          'side_to_side_vibration_of_the_cabin', 'actual_torque', 'given_torque',
+                          'clockwise_yaw_count',
+                          'counterclockwise_yaw_count', 'unusable', 'power_curve_available',
+                          'required_gearbox_speed',
+                          'inverter_speed_master_control', 'outside_cabin_temperature', 'main_bearing_temperature',
+                          'gearbox_high_speed_shaft_bearing_temperature',
+                          'gearboxmedium_speed_shaftbearing_temperature',
+                          'gearbox_low_speed_shaft_bearing_temperature', 'generator_winding1_temperature',
+                          'generator_winding2_temperature', 'generator_winding3_temperature',
+                          'turbulence_intensity', 'param1',
+                          'param2', 'param3', 'param4', 'param5', 'param6', 'param7', 'param8', 'param9', 'param10']
+
+            for col in trans_cols:
+                cols_trans_all[col] = read_conf(conf_map, col, '')
+
+            return TransParam(read_type=self.read_type, read_path=self.read_path,
+                              cols_tran=cols_trans_all,
+                              wind_name_exec=wind_name_exec, is_vertical_table=is_vertical_table,
+                              vertical_cols=vertical_cols, vertical_key=vertical_key,
+                              vertical_value=vertical_value, index_cols=index_cols, merge_columns=merge_columns,
+                              resolve_col_prefix=resolve_col_prefix, need_valid_cols=need_valid_cols)
+
+    # 第三步 读取 并 保存到临时文件
+    def read_and_save_tmp_file(self):
+        read_and_save_tmp = ReadAndSaveTmp(self.pathsAndTable, self.trans_param)
+        read_and_save_tmp.run()
+
+    # 第四步 统计 并 保存到正式文件
+    def statistics_and_save_to_file(self):
+        # 保存到正式文件
+        statistics_and_save_file = StatisticsAndSaveFile(self.pathsAndTable, self.trans_param, self.statistics_map,
+                                                         self.rated_power_and_cutout_speed_map)
+        statistics_and_save_file.run()
+
+    # 最后更新执行程度
+    def update_exec_progress(self):
+        if self.end >= 4:
+            all_files = read_excel_files(self.pathsAndTable.get_save_path())
+            if self.step <= 3:
+                update_trans_status_success(self.batch_no, self.trans_param.read_type,
+                                            len(all_files),
+                                            self.statistics_map['time_granularity'],
+                                            self.statistics_map['min_date'], self.statistics_map['max_date'],
+                                            self.statistics_map['total_count'], self.save_db)
+            else:
+                df = read_file_to_df(all_files[0], read_cols=['time_stamp'])
+                df['time_stamp'] = pd.to_datetime(df['time_stamp'])
+                time_granularity = get_time_space(df, 'time_stamp')
+                batch_data = batch_statistics("_".join([self.batch_no, self.trans_param.read_type]))
+                if batch_data is not None:
+                    update_trans_status_success(self.batch_no, self.trans_param.read_type,
+                                                len(read_excel_files(self.pathsAndTable.get_save_path())),
+                                                time_granularity,
+                                                batch_data['min_date'], batch_data['max_date'],
+                                                batch_data['total_count'], self.save_db)
+                else:
+                    update_trans_status_success(self.batch_no, self.trans_param.read_type,
+                                                len(read_excel_files(self.pathsAndTable.get_save_path())),
+                                                time_granularity,
+                                                None, None,
+                                                None, self.save_db)
+
+
+if __name__ == '__main__':
+    test = MinSecTrans(batch_no="WOF053600062-WOB000009", read_type="minute", save_db=False)
+    test.run()
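
Two details to note here: get_trans_param implicitly returns None when no configuration is found, after flagging the error; and the __main__ smoke test passes batch_no/read_type keywords although __init__ declares a data dict, so unless BaseDataTrans also accepts those keywords this call would raise TypeError. A dict-driven call matching the visible signature, with illustrative values and the keys used in test_run_local.py:

data = {'batch_code': 'WOF053600062-WOB000009', 'batch_name': 'demo',
        'transfer_type': 'minute', 'transfer_addr': '/data/download/demo/minute',
        'field_code': 'WOF053600062', 'field_name': 'demo'}
MinSecTrans(data=data, save_db=False).run()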

+ 6 - 11
etl/step/ReadAndSaveTmp.py → etl/wind_power/min_sec/ReadAndSaveTmp.py

@@ -5,8 +5,8 @@ import traceback
 
 import pandas as pd
 
-from etl.base import TransParam
-from etl.base.PathsAndTable import PathsAndTable
+from etl.wind_power.min_sec import TransParam
+from etl.common.PathsAndTable import PathsAndTable
 from service.plt_service import update_trans_transfer_progress
 from utils.file.trans_methods import read_excel_files, split_array, del_blank, \
     create_file_path, read_file_to_df
@@ -91,7 +91,6 @@ class ReadAndSaveTmp(object):
                             else:
                                 same_col[value] = [k]
 
-                trans_print("包含转换字段,开始处理转换字段")
                 df.rename(columns=real_cols_trans, inplace=True)
 
                 # 添加使用同一个excel字段的值
@@ -206,8 +205,7 @@ class ReadAndSaveTmp(object):
 
         if self.trans_param.is_vertical_table:
             vertical_cols = self.trans_param.vertical_cols
-            df = read_file_to_df(file_path, vertical_cols, header=self.trans_param.header,
-                                 trans_cols=self.trans_param.vertical_cols)
+            df = read_file_to_df(file_path, vertical_cols, trans_cols=self.trans_param.vertical_cols)
             df = df[df[self.trans_param.vertical_key].isin(read_cols)]
             df.rename(columns={self.trans_param.cols_tran['wind_turbine_number']: 'wind_turbine_number',
                                self.trans_param.cols_tran['time_stamp']: 'time_stamp'}, inplace=True)
@@ -230,15 +228,12 @@ class ReadAndSaveTmp(object):
                     trans_cols.append(v)
             trans_cols = list(set(trans_cols))
             if self.trans_param.merge_columns:
-                df = read_file_to_df(file_path, header=self.trans_param.header,
-                                     trans_cols=trans_cols)
+                df = read_file_to_df(file_path, trans_cols=trans_cols)
             else:
                 if self.trans_param.need_valid_cols:
-                    df = read_file_to_df(file_path, read_cols, header=self.trans_param.header,
-                                         trans_cols=trans_cols)
+                    df = read_file_to_df(file_path, read_cols, trans_cols=trans_cols)
                 else:
-                    df = read_file_to_df(file_path, header=self.trans_param.header,
-                                         trans_cols=trans_cols)
+                    df = read_file_to_df(file_path, trans_cols=trans_cols)
 
             # 处理列名前缀问题
             if self.trans_param.resolve_col_prefix:

+ 4 - 47
etl/step/StatisticsAndSaveFile.py → etl/wind_power/min_sec/StatisticsAndSaveFile.py

@@ -6,15 +6,15 @@ import traceback
 import pandas as pd
 import numpy as np
 
-from etl.base import TransParam
-from etl.base.PathsAndTable import PathsAndTable
-from etl.step.ClassIdentifier import ClassIdentifier
+from etl.wind_power.min_sec import TransParam
+from etl.common.PathsAndTable import PathsAndTable
+from etl.wind_power.min_sec.ClassIdentifier import ClassIdentifier
 from service.plt_service import update_trans_transfer_progress
 from utils.conf.read_conf import read_conf
 from utils.df_utils.util import get_time_space
 from utils.file.trans_methods import create_file_path, read_excel_files, read_file_to_df, split_array
 from utils.log.trans_log import trans_print
-from utils.systeminfo.sysinfo import use_files_get_max_cpu_count, print_memory_usage
+from utils.systeminfo.sysinfo import use_files_get_max_cpu_count
 
 
 class StatisticsAndSaveFile(object):
@@ -55,21 +55,7 @@ class StatisticsAndSaveFile(object):
                 if 'time_granularity' not in self.statistics_map.keys():
                     self.statistics_map['time_granularity'] = get_time_space(df, 'time_stamp')
 
-    def save_statistics_file(self):
-        save_path = os.path.join(os.path.dirname(self.paths_and_table.get_save_path()),
-                                 self.paths_and_table.read_type + '_statistics.txt')
-        create_file_path(save_path, is_file_path=True)
-        with open(save_path, 'w', encoding='utf8') as f:
-            f.write("总数据量:" + str(self.statistics_map['total_count']) + "\n")
-            f.write("最小时间:" + str(self.statistics_map['min_date']) + "\n")
-            f.write("最大时间:" + str(self.statistics_map['max_date']) + "\n")
-            f.write("风机数量:" + str(len(read_excel_files(self.paths_and_table.get_read_tmp_path()))) + "\n")
-
-    def check_data_validity(self, df):
-        pass
-
     def save_to_csv(self, filename):
-        print_memory_usage("开始读取csv:" + os.path.basename(filename))
         df = read_file_to_df(filename)
         if self.trans_param.is_vertical_table:
             df = df.pivot_table(index=['time_stamp', 'wind_turbine_number'], columns=self.trans_param.vertical_key,
@@ -78,23 +64,17 @@ class StatisticsAndSaveFile(object):
             # 重置索引以得到普通的列
             df.reset_index(inplace=True)
 
-        print_memory_usage("结束读取csv,:" + os.path.basename(filename))
-
         # 转化风机名称
-        trans_print("开始转化风机名称")
         origin_wind_name = str(df['wind_turbine_number'].values[0])
         df['wind_turbine_number'] = df['wind_turbine_number'].astype('str')
         # df['wind_turbine_name'] = df['wind_turbine_number']
         df['wind_turbine_number'] = df['wind_turbine_number'].map(
             self.trans_param.wind_col_trans).fillna(df['wind_turbine_number'])
         wind_col_name = str(df['wind_turbine_number'].values[0])
-        print_memory_usage("转化风机名称结束:" + wind_col_name)
 
         not_double_cols = ['wind_turbine_number', 'wind_turbine_name', 'time_stamp', 'param6', 'param7', 'param8',
                            'param9', 'param10']
 
-        solve_time_begin = datetime.datetime.now()
-        trans_print(wind_col_name, "去掉非法数据前大小:", df.shape[0])
         df.replace(np.nan, -999999999, inplace=True)
         number_cols = df.select_dtypes(include=['number']).columns.tolist()
         for col in df.columns:
@@ -103,49 +83,33 @@ class StatisticsAndSaveFile(object):
                     df[col] = pd.to_numeric(df[col], errors='coerce')
                     # 删除包含NaN的行(即那些列A转换失败的行)
                     df = df.dropna(subset=[col])
-        trans_print(wind_col_name, "去掉非法数据后大小:", df.shape[0])
         df.replace(-999999999, np.nan, inplace=True)
-        print_memory_usage("处理非法数据大小结束:" + wind_col_name)
 
-        trans_print(wind_col_name, "去掉重复数据前大小:", df.shape[0])
         df.drop_duplicates(['wind_turbine_number', 'time_stamp'], keep='first', inplace=True)
-        trans_print(wind_col_name, "去掉重复数据后大小:", df.shape[0])
-        trans_print("处理非法重复数据结束,耗时:", datetime.datetime.now() - solve_time_begin)
-        print_memory_usage("处理重复数据结束:" + wind_col_name)
 
         # 添加年月日
         solve_time_begin = datetime.datetime.now()
-        trans_print(wind_col_name, "包含时间字段,开始处理时间字段,添加年月日", filename)
-        trans_print(wind_col_name, "时间原始大小:", df.shape[0])
         # df = df[(df['time_stamp'].str.find('-') > 0) & (df['time_stamp'].str.find(':') > 0)]
         # trans_print(wind_col_name, "去掉非法时间后大小:", df.shape[0])
         df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
         df.dropna(subset=['time_stamp'], inplace=True)
-        trans_print(wind_col_name, "去掉非法时间后大小:", df.shape[0])
         df.sort_values(by='time_stamp', inplace=True)
-        trans_print("处理时间字段结束,耗时:", datetime.datetime.now() - solve_time_begin)
-        print_memory_usage("处理时间结果:" + wind_col_name)
-
         df = df[[i for i in self.trans_param.cols_tran.keys() if i in df.columns]]
-        print_memory_usage("删减无用字段后内存占用:" + wind_col_name)
 
         rated_power_and_cutout_speed_tuple = read_conf(self.rated_power_and_cutout_speed_map, str(wind_col_name))
         if rated_power_and_cutout_speed_tuple is None:
             rated_power_and_cutout_speed_tuple = (None, None)
 
-        print_memory_usage("打标签前内存占用:" + wind_col_name)
         class_identifiler = ClassIdentifier(wind_turbine_number=wind_col_name, origin_df=df,
                                             rated_power=rated_power_and_cutout_speed_tuple[0],
                                             cut_out_speed=rated_power_and_cutout_speed_tuple[1])
         df = class_identifiler.run()
-        print_memory_usage("打标签后内存占用:" + wind_col_name)
 
         df['year'] = df['time_stamp'].dt.year
         df['month'] = df['time_stamp'].dt.month
         df['day'] = df['time_stamp'].dt.day
         df['time_stamp'] = df['time_stamp'].apply(
             lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
-        print_memory_usage("添加年月日后:" + wind_col_name)
 
         df['wind_turbine_name'] = str(origin_wind_name)
 
@@ -165,9 +129,7 @@ class StatisticsAndSaveFile(object):
         trans_print("保存" + str(wind_col_name) + "成功")
 
     def mutiprocessing_to_save_file(self):
-        print_memory_usage("开始执行,占用内存")
         # 开始保存到正式文件
-        trans_print("开始保存到excel文件")
         all_tmp_files = read_excel_files(self.paths_and_table.get_read_tmp_path())
         # split_count = self.pathsAndTable.multi_pool_count
         split_count = use_files_get_max_cpu_count(all_tmp_files)
@@ -186,12 +148,7 @@ class StatisticsAndSaveFile(object):
             message = "保存文件错误,系统返回错误:" + str(e)
             raise ValueError(message)
 
-        trans_print("结束保存到excel文件")
-
     def run(self):
-        trans_print("开始保存数据到正式文件")
-        begin = datetime.datetime.now()
         self.mutiprocessing_to_save_file()
         update_trans_transfer_progress(self.paths_and_table.batch_no, self.paths_and_table.read_type, 70,
                                        self.paths_and_table.save_db)
-        trans_print("保存数据到正式文件结束,耗时:", datetime.datetime.now() - begin)
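
The NaN-sentinel dance retained in save_to_csv deserves a note: pre-existing gaps are parked at -999999999 so that the NaNs produced by pd.to_numeric identify genuinely unparseable rows, which are dropped before the sentinel is restored. The pattern in isolation:

import numpy as np
import pandas as pd

SENTINEL = -999999999
df = pd.DataFrame({'wind_velocity': ['3.2', 'bad', None]})
df = df.replace(np.nan, SENTINEL)                    # park the real gaps
df['wind_velocity'] = pd.to_numeric(df['wind_velocity'], errors='coerce')
df = df.dropna(subset=['wind_velocity'])             # drops only the 'bad' row
df = df.replace(SENTINEL, np.nan)                    # restore the real gap
print(df)  # keeps the 3.2 row and the originally-missing row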

+ 1 - 2
etl/base/TransParam.py → etl/wind_power/min_sec/TransParam.py

@@ -8,7 +8,7 @@ class TransParam(object):
     def __init__(self, read_type=None, read_path=None, cols_tran=dict(),
                  wind_name_exec=str(), is_vertical_table=False, vertical_cols=list(), vertical_key=None,
                  vertical_value=None, index_cols=list(), merge_columns=False, resolve_col_prefix=None,
-                 need_valid_cols=True, header=0, wind_col_trans: dict = None):
+                 need_valid_cols=True, wind_col_trans: dict = None):
         self.read_type = read_type
         self.read_path = read_path
         self.cols_tran = cols_tran
@@ -21,5 +21,4 @@ class TransParam(object):
         self.merge_columns = merge_columns
         self.resolve_col_prefix = resolve_col_prefix
         self.need_valid_cols = need_valid_cols
-        self.header = header
         self.wind_col_trans = wind_col_trans

+ 29 - 9
service/plt_service.py

@@ -72,16 +72,16 @@ def update_trans_transfer_progress(batch_no, trans_type, transfer_progress=0, sa
 
 
 # 获取执行的数据
-def get_exec_data(run_count: int = 1) -> dict:
+def get_batch_exec_data(run_count: int = 1) -> dict:
     query_running_sql = "select count(1) as count from data_transfer where trans_sys_status = 0"
-    query_next_exdc_sql = """
+    query_next_exec_sql = """
     SELECT
         t.*,a.field_name,b.batch_name
     FROM
         data_transfer t INNER JOIN wind_field a on t.field_code = a.field_code
         inner join wind_field_batch b on t.batch_code = b.batch_code
     WHERE
-        ((t.trans_sys_status = -1 and t.transfer_state = 0) or ( t.trans_sys_status in (1,2) and t.transfer_state = 0))
+         t.trans_sys_status in (-1,1,2) and t.transfer_state = 0
     AND t.transfer_addr != ''
     ORDER BY
         t.update_time
@@ -92,12 +92,30 @@ def get_exec_data(run_count: int = 1) -> dict:
     if now_count >= run_count:
         return None
     else:
-        data = plt.execute(query_next_exdc_sql)
+        data = plt.execute(query_next_exec_sql)
         if type(data) == tuple:
             return {}
         return data[0]
 
 
+def get_data_by_batch_no_and_type(batch_no, transfer_type):
+    query_exec_sql = f"""
+    SELECT
+        t.*,a.field_name,b.batch_name
+    FROM
+        data_transfer t INNER JOIN wind_field a on t.field_code = a.field_code
+        inner join wind_field_batch b on t.batch_code = b.batch_code
+    WHERE
+         t.trans_sys_status in (-1,1,2) and t.transfer_state = 2 and t.batch_code = '{batch_no}' and t.transfer_type = '{transfer_type}'
+    AND t.transfer_addr != ''
+    """
+
+    data = plt.execute(query_exec_sql)
+    if type(data) == tuple:
+        return None
+    return data[0]
+
+
 def get_all_wind(field_code):
     query_sql = """
     SELECT t.engine_code,t.engine_name,t.rated_capacity,a.rated_cut_out_windspeed 
@@ -131,9 +149,11 @@ def get_base_wind_and_power(wind_turbine_number):
 
 
 if __name__ == '__main__':
-    print(get_exec_data(run_count=1))
-
-    print("**********************")
-    print(get_exec_data(run_count=2))
-
+    # print(get_batch_exec_data(run_count=1))
+    #
+    # print("**********************")
+    # print(get_batch_exec_data(run_count=2))
+    # print("**********************")
+    print(get_data_by_batch_no_and_type("test_", "second"))
 # print(update_trans_status_success("test_唐龙-定时任务测试", "second", 10))
+    begin = datetime.datetime.now()

+ 55 - 83
service/trans_service.py

@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # @Time    : 2024/6/7
 # @Author  : 魏志亮
+import datetime
 import os
 import traceback
 
@@ -12,7 +13,7 @@ from utils.log.trans_log import trans_print
 trans = ConnectMysql("trans")
 
 
-def get_trans_conf(field_code, wind_name, trans_type) -> dict:
+def get_min_sec_conf(field_code, trans_type) -> dict:
     query_sql = "SELECT * FROM trans_conf where wind_code = %s and type = %s"
     res = trans.execute(query_sql, (field_code, trans_type))
     print(res)
@@ -21,14 +22,27 @@ def get_trans_conf(field_code, wind_name, trans_type) -> dict:
     return res[0]
 
 
-def save_to_trans_conf(data_dict=dict()):
-    trans.save_dict(data_dict)
+def get_fault_warn_conf(field_code, trans_type) -> dict:
+    types = list()
+    if trans_type == 'fault':
+        types.append(1)
+    elif trans_type == 'warn':
+        types.append(2)
+    else:
+        trans_print(f"未找到{trans_type}告警/故障的配置")
+        raise ValueError(f"未找到{trans_type}告警/故障的配置")
 
+    types.append(3)
 
-zhishu_list = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97]
+    query_sql = "SELECT * FROM warn_fault_conf where wind_code = %s and type in %s and status = 1"
+    res = trans.execute(query_sql, (field_code, types))
+    print(res)
+    if type(res) == tuple:
+        return None
+    return res[0]
 
 
-def creat_table_and_add_partition(table_name, win_names, read_type):
+def creat_min_sec_table(table_name, win_names, read_type):
     create_sql = f"""
     CREATE TABLE
     IF NOT EXISTS `{table_name}` (
@@ -91,7 +105,7 @@ def creat_table_and_add_partition(table_name, win_names, read_type):
     ) ENGINE = myisam DEFAULT CHARSET = utf8mb4
     """
 
-    if read_type == 'second' and len(win_names) > 1:
+    if read_type == 'second' and win_names and len(win_names) > 1:
 
         create_sql = create_sql + f" PARTITION BY LIST COLUMNS(`wind_turbine_number`) ("
         partition_strs = list()
@@ -145,86 +159,44 @@ def batch_statistics(table_name):
         return None
 
 
+def create_warn_fault_table(table_name):
+    sql = f"""
+    CREATE TABLE `{table_name}` (
+      `wind_turbine_number` varchar(20) DEFAULT NULL COMMENT '风机编号',
+      `begin_time` datetime DEFAULT NULL COMMENT '开始时间',
+      `end_time` datetime DEFAULT NULL COMMENT '结束时间',
+      `time_diff` int DEFAULT NULL COMMENT '处理耗时,单位秒',
+      `fault_id` varchar(20) DEFAULT NULL COMMENT '报警或者故障ID',
+      `fault_code` varchar(50) DEFAULT NULL COMMENT '报警或者故障CODE',
+      `fault_detail` varchar(255) DEFAULT NULL COMMENT '错误描述',
+      `fault_level` varchar(20) DEFAULT NULL COMMENT '报警等级',
+      `fault_type` varchar(20) DEFAULT NULL COMMENT '报警类型',
+      `stop_status` varchar(20) DEFAULT NULL COMMENT '刹车状态',
+      KEY `wind_turbine_number` (`wind_turbine_number`),
+      KEY `begin_time` (`begin_time`),
+      KEY `end_time` (`end_time`)
+    ) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4
+    """
+
+    trans.execute(sql)
+
+
 if __name__ == '__main__':
     # path_prix = r"/data/download/collection_data/2完成/招远风电场-山东-大唐/清理数据/WOF063100040-WOB00013/second"
     # files = ["WOG00030.csv", "WOG00034.csv"]
     # for path in files:
     #     save_file_to_db("WOF063100040-WOB00013_second", path_prix + os.sep + path, batch_count=100000)
 
-    table_name = "test"
-    read_type = "second"
-    wind_names = ['WOG00030', 'WOG00034']
-
-    create_sql = f"""
-    CREATE TABLE
-    IF NOT EXISTS `{table_name}` (
-        `wind_turbine_number` VARCHAR (20) DEFAULT NULL COMMENT '风机编号',
-        `wind_turbine_name` VARCHAR(20) DEFAULT NULL COMMENT '风机原始名称',
-        `time_stamp` datetime NOT NULL COMMENT '时间戳',
-        `active_power` DOUBLE DEFAULT NULL COMMENT '有功功率',
-        `rotor_speed` DOUBLE DEFAULT NULL COMMENT '风轮转速',
-        `generator_speed` DOUBLE DEFAULT NULL COMMENT '发电机转速',
-        `wind_velocity` DOUBLE DEFAULT NULL COMMENT '风速',
-        `pitch_angle_blade_1` DOUBLE DEFAULT NULL COMMENT '桨距角1',
-        `pitch_angle_blade_2` DOUBLE DEFAULT NULL COMMENT '桨距角2',
-        `pitch_angle_blade_3` DOUBLE DEFAULT NULL COMMENT '桨距角3',
-        `cabin_position` DOUBLE DEFAULT NULL COMMENT '机舱位置',
-        `true_wind_direction` DOUBLE DEFAULT NULL COMMENT '绝对风向',
-        `yaw_error1` DOUBLE DEFAULT NULL COMMENT '对风角度',
-        `set_value_of_active_power` DOUBLE DEFAULT NULL COMMENT '有功功率设定值',
-        `gearbox_oil_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱油温',
-        `generatordrive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机驱动端轴承温度',
-        `generatornon_drive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机非驱动端轴承温度',
-        `cabin_temperature` DOUBLE DEFAULT NULL COMMENT '机舱内温度',
-        `twisted_cable_angle` DOUBLE DEFAULT NULL COMMENT '扭缆角度',
-        `front_back_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱前后振动',
-        `side_to_side_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱左右振动',
-        `actual_torque` DOUBLE DEFAULT NULL COMMENT '实际力矩',
-        `given_torque` DOUBLE DEFAULT NULL COMMENT '给定力矩',
-        `clockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '顺时针偏航次数',
-        `counterclockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '逆时针偏航次数',
-        `unusable` DOUBLE DEFAULT NULL COMMENT '不可利用',
-        `power_curve_available` DOUBLE DEFAULT NULL COMMENT '功率曲线可用',
-        `required_gearbox_speed` DOUBLE DEFAULT NULL COMMENT '齿轮箱转速',
-        `inverter_speed_master_control` DOUBLE DEFAULT NULL COMMENT '变频器转速(主控)',
-        `outside_cabin_temperature` DOUBLE DEFAULT NULL COMMENT '环境温度',
-        `main_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '主轴承轴承温度',
-        `gearbox_high_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱高速轴轴承温度',
-        `gearboxmedium_speed_shaftbearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱中速轴轴承温度',
-        `gearbox_low_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱低速轴轴承温度',
-        `generator_winding1_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组1温度',
-        `generator_winding2_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组2温度',
-        `generator_winding3_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组3温度',
-        `wind_turbine_status` DOUBLE DEFAULT NULL COMMENT '风机状态1',
-        `wind_turbine_status2` DOUBLE DEFAULT NULL COMMENT '风机状态2',
-        `turbulence_intensity` DOUBLE DEFAULT NULL COMMENT '湍流强度',
-        `lab` int DEFAULT NULL COMMENT '-1:停机 0:好点  1:欠发功率点;2:超发功率点;3:额定风速以上的超发功率点 4: 限电',
-        `year` INT (4) DEFAULT NULL COMMENT '年',
-        `month` INT (2) DEFAULT NULL COMMENT '月',
-        `day` INT (2) DEFAULT NULL COMMENT '日',
-        `param1` DOUBLE DEFAULT NULL COMMENT '预留1',
-        `param2` DOUBLE DEFAULT NULL COMMENT '预留2',
-        `param3` DOUBLE DEFAULT NULL COMMENT '预留3',
-        `param4` DOUBLE DEFAULT NULL COMMENT '预留4',
-        `param5` DOUBLE DEFAULT NULL COMMENT '预留5',
-        `param6` VARCHAR (20) DEFAULT NULL COMMENT '预留6',
-        `param7` VARCHAR (20) DEFAULT NULL COMMENT '预留7',
-        `param8` VARCHAR (20) DEFAULT NULL COMMENT '预留8',
-        `param9` VARCHAR (20) DEFAULT NULL COMMENT '预留9',
-        `param10` VARCHAR (20) DEFAULT NULL COMMENT '预留10',
-         KEY `time_stamp` (`time_stamp`),
-         KEY `wind_turbine_number` (`wind_turbine_number`)
-    ) ENGINE = myisam DEFAULT CHARSET = utf8mb4
-    """
-
-    if read_type == 'second' and len(wind_names) > 1:
-
-        create_sql = create_sql + f" PARTITION BY LIST COLUMNS(`wind_turbine_number`) ("
-        partition_strs = list()
-        for wind_name in wind_names:
-            partition_strs.append(f" PARTITION p{wind_name} VALUES IN('{wind_name}')")
-
-        create_sql = create_sql + ",".join(partition_strs) + ")"
-
-
-    print(create_sql)
+    # sql = """
+    # SELECT wind_turbine_number, time_stamp, wind_velocity, active_power
+    #                            FROM `WOF085500002-WOB000001_second`
+    #                            WHERE  time_stamp >= '2024-02-17 00:00:00' AND time_stamp <= '2024-05-14 00:00:00' AND lab = 0
+    # """
+    #
+    # begin = datetime.datetime.now()
+    # df = trans.read_sql_to_df(sql)
+    # end = datetime.datetime.now()
+    # print(df.shape)
+    # print(df.info())
+    # print("Time used:", (end - begin).seconds)
+    get_fault_warn_conf("test", "fault")

+ 0 - 151
test_app_run.py

@@ -1,151 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time    : 2024/6/11
-# @Author  : 魏志亮
-import os
-import sys
-import traceback
-
-
-def run_schedule(step=0, end=4, run_count=1):
-    # 更新超时任务
-    update_timeout_trans_data()
-
-    data = get_exec_data(run_count)
-    if data is None:
-        trans_print("当前有任务在执行")
-    elif len(data.keys()) == 0:
-        trans_print("当前无任务")
-    else:
-        batch_no = data['batch_code']
-        batch_name = data['batch_name']
-        transfer_type = data['transfer_type']
-        transfer_file_addr = data['transfer_addr']
-        field_code = data['field_code']
-        field_name = data['field_name']
-
-        __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_addr, field_name, field_code,
-                     save_db=True)
-
-
-def run_local(step=0, end=3, batch_no=None, batch_name='', transfer_type=None, transfer_file_addr=None, field_name=None,
-              field_code="测试", save_db=False):
-    if batch_no is None or str(batch_no).strip() == '':
-        return "批次编号不能为空"
-
-    if transfer_type not in ['second', 'minute', 'second_1']:
-        return "查询类型错误"
-
-    if transfer_file_addr is None or str(transfer_file_addr).strip() == '':
-        return "文件路径不能为空"
-
-    __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_addr, field_name, field_code,
-                 save_db=save_db)
-
-
-def __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_addr=None, field_name=None,
-                 field_code="测试",
-                 save_db=False):
-    trance_id = '-'.join([batch_no, field_name, transfer_type])
-    set_trance_id(trance_id)
-    conf_map = get_trans_conf(field_code, field_name, transfer_type)
-    if conf_map is None or type(conf_map) == tuple or len(conf_map.keys()) == 0:
-        message = f"未找到{field_name}的{transfer_type}配置"
-        trans_print(message)
-        update_trans_status_error(batch_no, transfer_type, message, save_db)
-    else:
-
-        resolve_col_prefix = read_conf(conf_map, 'resolve_col_prefix')
-        wind_name_exec = read_conf(conf_map, 'wind_name_exec', None)
-        is_vertical_table = read_conf(conf_map, 'is_vertical_table', False)
-        merge_columns = read_conf(conf_map, 'merge_columns', False)
-
-        vertical_cols = read_conf(conf_map, 'vertical_read_cols', '').split(',')
-        index_cols = read_conf(conf_map, 'vertical_index_cols', '').split(',')
-        vertical_key = read_conf(conf_map, 'vertical_col_key')
-        vertical_value = read_conf(conf_map, 'vertical_col_value')
-        need_valid_cols = not merge_columns
-
-        begin_header = read_conf(conf_map, 'begin_header', 0)
-
-        cols_trans_all = dict()
-        trans_cols = ['wind_turbine_number', 'time_stamp', 'active_power', 'rotor_speed', 'generator_speed',
-                      'wind_velocity', 'pitch_angle_blade_1', 'pitch_angle_blade_2', 'pitch_angle_blade_3',
-                      'cabin_position', 'true_wind_direction', 'yaw_error1', 'set_value_of_active_power',
-                      'gearbox_oil_temperature', 'generatordrive_end_bearing_temperature',
-                      'generatornon_drive_end_bearing_temperature', 'wind_turbine_status',
-                      'wind_turbine_status2',
-                      'cabin_temperature', 'twisted_cable_angle', 'front_back_vibration_of_the_cabin',
-                      'side_to_side_vibration_of_the_cabin', 'actual_torque', 'given_torque',
-                      'clockwise_yaw_count',
-                      'counterclockwise_yaw_count', 'unusable', 'power_curve_available',
-                      'required_gearbox_speed',
-                      'inverter_speed_master_control', 'outside_cabin_temperature', 'main_bearing_temperature',
-                      'gearbox_high_speed_shaft_bearing_temperature',
-                      'gearboxmedium_speed_shaftbearing_temperature',
-                      'gearbox_low_speed_shaft_bearing_temperature', 'generator_winding1_temperature',
-                      'generator_winding2_temperature', 'generator_winding3_temperature',
-                      'turbulence_intensity', 'param1',
-                      'param2', 'param3', 'param4', 'param5', 'param6', 'param7', 'param8', 'param9', 'param10']
-
-        for col in trans_cols:
-            cols_trans_all[col] = read_conf(conf_map, col, '')
-
-        params = TransParam(read_type=transfer_type, read_path=transfer_file_addr,
-                            cols_tran=cols_trans_all,
-                            wind_name_exec=wind_name_exec, is_vertical_table=is_vertical_table,
-                            vertical_cols=vertical_cols, vertical_key=vertical_key,
-                            vertical_value=vertical_value, index_cols=index_cols, merge_columns=merge_columns,
-                            resolve_col_prefix=resolve_col_prefix, need_valid_cols=need_valid_cols,
-                            header=begin_header)
-
-        try:
-            trans_subject = WindFarms(batch_no=batch_no, batch_name=batch_name, field_code=field_code,
-                                      field_name=field_name,
-                                      save_db=save_db,
-                                      header=begin_header, trans_param=params)
-            trans_subject.run(step=step, end=end)
-        except Exception as e:
-            trans_print(traceback.format_exc())
-            message = "系统返回错误:" + str(e)
-            update_trans_status_error(batch_no, transfer_type, message, save_db)
-        finally:
-            set_trance_id("")
-            # trans_subject.pathsAndTable.delete_tmp_files()
-
-
-if __name__ == '__main__':
-    env = None
-    if len(sys.argv) >= 2:
-        env = sys.argv[1]
-    else:
-        env = 'dev'
-    print(sys.argv)
-    if env is None:
-        raise Exception("请配置运行环境")
-
-    os.environ['env'] = env
-
-    run_count = 1
-    if len(sys.argv) >= 3:
-        run_count = int(sys.argv[2])
-
-    from utils.log.trans_log import trans_print, set_trance_id
-    from etl.base.TransParam import TransParam
-    from etl.base.WindFarms import WindFarms
-    from service.plt_service import get_exec_data, update_trans_status_error, update_timeout_trans_data
-    from service.trans_service import get_trans_conf
-    from utils.conf.read_conf import read_conf
-
-    # run_schedule(run_count=run_count)
-
-    # run_local(0, 3, batch_no='test_11', batch_name='test', transfer_type='minute',
-    #           transfer_file_addr=r'D:\trans_data\密马风电场\收资数据\minute', field_name='密马风电场',
-    #           field_code="WOF035200003", save_db=False)
-
-    run_local(4, 4, batch_no='WOF053600062-WOB000010', batch_name='ZYFDC000013', transfer_type='second',
-              transfer_file_addr=r'/data/download/collection_data/2完成/招远风电场-山东-大唐/收资数据/招远秒级数据', field_name='招远风电场',
-              field_code="WOF053600062", save_db=True)
-
-    # run_local(0, 3, batch_no='WOF043600007-WOB000001', batch_name='XALFDC0814', transfer_type='second',
-    #           transfer_file_addr=r'D:\trans_data\新艾里风电场\收资数据\1号风机', field_name='新艾里风电场',
-    #           field_code="WOF043600007", save_db=False)

+ 74 - 0
test_run_local.py

@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/6/11
+# @Author  : 魏志亮
+import os
+import sys
+
+
+def get_exec_data(batch_no=None, read_type=None, run_count=1):
+    if batch_no and read_type:
+        data = get_data_by_batch_no_and_type(batch_no, read_type)
+        if data is None:
+            raise ValueError(f"未找到批次号:{batch_no},类型:{read_type}")
+
+    else:
+        data = get_batch_exec_data(run_count)
+        if data is None:
+            trans_print("当前有任务在执行")
+            sys.exit(0)
+        elif len(data.keys()) == 0:
+            trans_print("当前无任务")
+            sys.exit(0)
+
+    return data
+
+
+def run(data: dict, save_db=False):
+    exec_process = None
+    if data['transfer_type'] in ['second', 'minute']:
+        exec_process = MinSecTrans(data=data, save_db=save_db)
+    elif data['transfer_type'] in ['fault', 'warn']:
+        exec_process = FaultWarnTrans(data=data, save_db=save_db)
+
+    if exec_process is None:
+        raise Exception("No exec process for transfer_type: " + str(data['transfer_type']))
+    exec_process.run()
+
+
+if __name__ == '__main__':
+    env = sys.argv[1] if len(sys.argv) >= 2 else 'dev'
+    print(sys.argv)
+
+    os.environ['env'] = env
+
+    run_count = 1
+    if len(sys.argv) >= 3:
+        run_count = int(sys.argv[2])
+
+    conf_path = '/data/config/etl_config.yaml'
+    if len(sys.argv) >= 4:
+        conf_path = sys.argv[3]
+
+    os.environ['ETL_CONF'] = conf_path
+
+    from utils.log.trans_log import trans_print
+    from service.plt_service import get_batch_exec_data, get_data_by_batch_no_and_type
+    from etl.wind_power.fault_warn.FaultWarnTrans import FaultWarnTrans
+    from etl.wind_power.min_sec.MinSecTrans import MinSecTrans
+
+    data = dict()
+    data['batch_code'] = "test"
+    data['batch_name'] = "test"
+    data['transfer_type'] = "fault"
+    data['transfer_addr'] = r"D:\报警\唐珍风电2023年报警信息.xlsx"
+    data['field_code'] = "test"
+    data['field_name'] = "test"
+
+    run(data=data, save_db=False)
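
For quick verification, a minimal sketch of driving the same dispatch by hand (paths and batch values are placeholders; env and ETL_CONF are set before the etl imports because trans_log reads them at import time):

    # sketch: build a task dict and dispatch it the way run() does
    import os
    os.environ['env'] = 'dev'
    os.environ['ETL_CONF'] = '/data/config/etl_config.yaml'

    from etl.wind_power.min_sec.MinSecTrans import MinSecTrans

    data = {
        'batch_code': 'test', 'batch_name': 'test',
        'transfer_type': 'minute',                # 'second'/'minute' -> MinSecTrans; 'fault'/'warn' -> FaultWarnTrans
        'transfer_addr': r'D:\data\demo\minute',  # placeholder path
        'field_code': 'test', 'field_name': 'test',
    }
    MinSecTrans(data=data, save_db=False).run()   # save_db=False: dry run, nothing written back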

+ 1 - 1
tmp_file/read_and_draw_png.py

@@ -1,7 +1,7 @@
 import multiprocessing
 import os
 
-from etl.step.ClassIdentifier import ClassIdentifier
+from etl.wind_power.min_sec.ClassIdentifier import ClassIdentifier
 from utils.draw.draw_file import scatter
 from utils.file.trans_methods import read_file_to_df
 

+ 13 - 1
utils/db/ConnectMysql.py

@@ -1,6 +1,7 @@
 import os
 import traceback
 
+import pandas as pd
 import pymysql
 from pymysql.cursors import DictCursor
 from sqlalchemy import create_engine
@@ -12,7 +13,7 @@ from utils.log.trans_log import trans_print
 class ConnectMysql:
 
     def __init__(self, connet_name):
-        self.yaml_data = yaml_conf("/data/config/etl_config.yaml")
+        self.yaml_data = yaml_conf(os.environ.get('ETL_CONF', "/data/config/etl_config.yaml"))
         self.connet_name = connet_name
         if 'env' in os.environ:
             self.env = os.environ['env']
@@ -49,3 +50,14 @@ class ConnectMysql:
         dbname = config['database']
         engine = create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}')
         df.to_sql(table_name, engine, index=False, if_exists='append')
+
+    def read_sql_to_df(self, sql):
+        config = self.yaml_data[self.connet_name + "_" + self.env]
+        username = config['user']
+        password = config['password']
+        host = config['host']
+        port = config['port']
+        dbname = config['database']
+        engine = create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}')
+        df = pd.read_sql_query(sql, engine)
+        return df
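
A short usage sketch for the new read_sql_to_df (the connection name 'plt' and the query are illustrative; as with the other methods, the yaml must contain a matching <name>_<env> section):

    # sketch: query a configured connection straight into a DataFrame
    import os
    os.environ['env'] = 'dev'  # selects the <name>_dev section of etl_config.yaml

    from utils.db.ConnectMysql import ConnectMysql

    conn = ConnectMysql('plt')  # assumes a 'plt_dev' entry with user/password/host/port/database
    df = conn.read_sql_to_df('SELECT 1 AS ok')
    print(df)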

+ 33 - 23
utils/file/trans_methods.py

@@ -3,7 +3,6 @@
 # @Author  : 魏志亮
 import datetime
 import os
-import re
 import shutil
 import warnings
 
@@ -47,39 +46,52 @@ def split_array(array, num):
 
 
 def find_read_header(file_path, trans_cols):
-    print(trans_cols)
     df = read_file_to_df(file_path, nrows=20)
+    df.reset_index(inplace=True)
     count = 0
+    header = None
+    values = list()
     for col in trans_cols:
         if col in df.columns:
             count = count + 1
             if count >= 2:
-                return 0
+                header = 0
+                values = list(df.columns)
+                break
 
-    count = 0
-
-    for index, row in df.iterrows():
-        for col in trans_cols:
-            if col in row.values:
-                count = count + 1
-                if count > 2:
-                    return index + 1
-
-    return None
+    if header is None:
+        count = 0
+        for index, row in df.iterrows():
+            for col in trans_cols:
+                if col in row.values:
+                    count = count + 1
+                    if count > 2:
+                        header = index + 1
+                        # keep the matched row's values so the caller reads only known columns
+                        values = list(row.values)
+                        break
+            if header is not None:
+                break
+
+    # only keep the columns that actually appear in trans_cols
+    read_cols = [col for col in values if col in trans_cols]
+
+    return header, read_cols
 
 
 # 读取数据到df
-def read_file_to_df(file_path, read_cols=list(), header=0, trans_cols=None, nrows=None):
+def read_file_to_df(file_path, read_cols=list(), trans_cols=None, nrows=None):
     begin = datetime.datetime.now()
     trans_print('开始读取文件', file_path)
+    header = 0
+    find_cols = list()
     if trans_cols:
-        header = find_read_header(file_path, trans_cols)
+        header, find_cols = find_read_header(file_path, trans_cols)
         trans_print(os.path.basename(file_path), "读取第", header, "行")
         if header is None:
             message = '未匹配到开始行,请检查并重新指定'
             trans_print(message)
             raise Exception(message)
 
+    # copy before merging so the mutable default argument is not mutated across calls
+    read_cols = list(read_cols) + find_cols
+
     try:
         df = pd.DataFrame()
         if str(file_path).lower().endswith("csv") or str(file_path).lower().endswith("gz"):
@@ -100,7 +112,7 @@ def read_file_to_df(file_path, read_cols=list(), header=0, trans_cols=None, nrow
                     df = pd.read_csv(file_path, encoding=encoding, header=header, on_bad_lines='warn', nrows=nrows)
 
         else:
-            xls = pd.ExcelFile(file_path)
+            xls = pd.ExcelFile(file_path, engine="calamine")
             # 获取所有的sheet名称
             sheet_names = xls.sheet_names
             for sheet_name in sheet_names:
@@ -110,10 +122,9 @@ def read_file_to_df(file_path, read_cols=list(), header=0, trans_cols=None, nrow
                     now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, nrows=nrows)
 
                 now_df['sheet_name'] = sheet_name
-
                 df = pd.concat([df, now_df])
-
-        trans_print('文件读取成功', file_path, '文件数量', df.shape, '耗时', datetime.datetime.now() - begin)
+            xls.close()
+        trans_print('文件读取成功:', file_path, '数据数量:', df.shape, '耗时:', datetime.datetime.now() - begin)
     except Exception as e:
         trans_print('读取文件出错', file_path, str(e))
         message = '文件:' + os.path.basename(file_path) + ',' + str(e)
@@ -141,6 +152,9 @@ def __build_directory_dict(directory_dict, path, filter_types=None):
 
 # 读取路径下所有的excel文件
 def read_excel_files(read_path):
+    if os.path.isfile(read_path):
+        return [read_path]
+
     directory_dict = {}
     __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])
 
@@ -174,17 +188,13 @@ def create_file_path(path, is_file_path=False):
         os.makedirs(path, exist_ok=True)
 
 
-# 格式化风机名称
-def generate_turbine_name(turbine_name='F0001', prefix='F'):
-    strinfo = re.compile(r"[\D*]")
-    name = strinfo.sub('', str(turbine_name))
-    return prefix + str(int(name)).zfill(3)
+if __name__ == '__main__':
+    datas = read_excel_files(r"D:\data\清理数据\招远风电场\WOF053600062-WOB000009_ZYFDC000012\minute")
+    for data in datas:
+        print(data)
 
+    print("*" * 20)
 
-if __name__ == '__main__':
-    # files = read_excel_files(r'D:\trans_data\10.xls')
-    # for file in files:
-    file = r'D:\trans_data\新艾里风电场10号风机.csv'
-    read_file_to_df(file, trans_cols=
-    ['', '风向', '时间', '设备号', '机舱方向总角度', '$folder[2]', '发电机转速30秒平均值', '机组运行模式', '机舱旋转角度', '主轴转速', '变桨角度30秒平均值', '记录时间',
-     '发电机功率30秒平均值', '风速30秒平均值'])
+    datas = read_excel_files(r"D:\data\清理数据\招远风电场\WOF053600062-WOB000009_ZYFDC000012\minute\WOG00066.csv.gz")
+    for data in datas:
+        print(data)
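
With find_read_header reworked to return both the header row and the matched columns, read_file_to_df can locate a mid-file header on its own; a sketch (the file path is a placeholder, the column names follow the test data above):

    # sketch: auto-detect the header row via trans_cols and read the file
    from utils.file.trans_methods import read_file_to_df

    df = read_file_to_df(
        r'D:\data\demo\WOG00066.csv.gz',  # placeholder file
        trans_cols=['时间', '风速30秒平均值', '发电机功率30秒平均值'],
    )
    # two hits among the column names (or three within a data row) mark the header
    print(df.shape, list(df.columns))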

+ 8 - 3
utils/log/trans_log.py

@@ -7,6 +7,8 @@ import logging
 import os
 import sys
 
+from utils.conf.read_conf import read_conf, yaml_conf
+
 
 def set_trance_id(trace_id):
     """设置当前线程的链路ID"""
@@ -28,12 +30,15 @@ logger = logging.getLogger("etl_tools")
 logger.setLevel(logging.INFO)
 stout_handle = logging.StreamHandler(sys.stdout)
 stout_handle.setFormatter(
-    logging.Formatter("%(asctime)s-%(trace_id)s-%(levelname)s-%(filename)-8s:%(lineno)s: %(message)s"))
+    logging.Formatter("%(asctime)s-%(trace_id)s: %(message)s"))
 stout_handle.setLevel(logging.INFO)
 stout_handle.addFilter(ContextFilter())
 logger.addHandler(stout_handle)
 
-log_path = r'/data/logs/etl_tools_' + (os.environ['env'] if 'env' in os.environ else 'dev')
+config = yaml_conf(os.environ.get('ETL_CONF', '/data/config/etl_config.yaml'))
+log_path_dir = read_conf(config, 'log_path_dir', "/data/logs")
+
+log_path = os.path.join(log_path_dir, 'etl_tools_' + os.environ.get('env', 'dev'))
 file_path = os.path.join(log_path)
 
 if not os.path.exists(file_path):
@@ -42,7 +47,7 @@ file_name = file_path + os.sep + str(datetime.date.today()) + '.log'
 
 file_handler = logging.FileHandler(file_name, encoding='utf-8')
 file_handler.setFormatter(
-    logging.Formatter("%(asctime)s-%(trace_id)s-%(levelname)s-%(filename)-8s:%(lineno)s: %(message)s"))
+    logging.Formatter("%(asctime)s-%(trace_id)s: %(message)s"))
 file_handler.setLevel(logging.INFO)
 file_handler.addFilter(ContextFilter())
 logger.addHandler(file_handler)
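
Since the logger now resolves its output directory from ETL_CONF, a minimal sketch of the import-time contract (paths are examples; log_path_dir is optional and falls back to /data/logs):

    # sketch: configure env + ETL_CONF before importing trans_log
    import os
    os.environ['env'] = 'dev'
    os.environ['ETL_CONF'] = '/data/config/etl_config.yaml'  # yaml may contain: log_path_dir: /data/logs

    from utils.log.trans_log import trans_print, set_trance_id

    set_trance_id('demo-trace')
    trans_print('goes to <log_path_dir>/etl_tools_dev/<YYYY-MM-DD>.log')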