
Merge branch 'fault_warn' into dev

# Conflicts:
#	app_run.py
#	conf/etl_config.yaml
#	conf/etl_config_datang.yaml
#	etl/common/PathsAndTable.py
#	test_app_run.py
#	utils/db/ConnectMysql.py
#	utils/file/trans_methods.py
#	utils/log/trans_log.py
wzl, 9 months ago
commit fe0fd29755
45 changed files with 1476 additions and 811 deletions
  1. .gitignore (+6 -1)
  2. app_run.py (+43 -128)
  3. conf/db.py (+0 -52)
  4. conf/etl_config.yaml (+0 -51)
  5. conf/etl_config_datang.yaml (+5 -31)
  6. conf/etl_config_dev.yaml (+26 -0)
  7. conf/etl_config_prod.yaml (+26 -0)
  8. etl/base/WindFarms.py (+0 -98)
  9. etl/common/BaseDataTrans.py (+116 -0)
  10. etl/common/ClearData.py (+4 -3)
  11. etl/common/PathsAndTable.py (+22 -15)
  12. etl/common/SaveToDb.py (+12 -19)
  13. etl/common/UnzipAndRemove.py (+1 -6)
  14. etl/common/__init__.py (+0 -0)
  15. etl/wind_power/__init__.py (+0 -0)
  16. etl/wind_power/fault_warn/FaultWarnTrans.py (+124 -0)
  17. etl/wind_power/fault_warn/__init__.py (+0 -0)
  18. etl/wind_power/min_sec/ClassIdentifier.py (+0 -2)
  19. etl/wind_power/min_sec/MinSecTrans.py (+119 -0)
  20. etl/wind_power/min_sec/ReadAndSaveTmp.py (+17 -22)
  21. etl/wind_power/min_sec/StatisticsAndSaveFile.py (+12 -48)
  22. etl/wind_power/min_sec/TransParam.py (+1 -2)
  23. etl/wind_power/min_sec/__init__.py (+0 -0)
  24. ge_requirement.sh (+2 -0)
  25. package.sh (+2 -1)
  26. requirements.txt (+19 -1)
  27. service/plt_service.py (+33 -11)
  28. service/trans_service.py (+56 -84)
  29. test_app_run.py (+0 -157)
  30. test_run_local.py (+93 -0)
  31. tmp_file/baiyushan_20240906.py (+197 -0)
  32. tmp_file/cp_online_data_to_other.py (+94 -0)
  33. tmp_file/error_ms_data.py (+40 -0)
  34. tmp_file/filter_lose_data.py (+48 -0)
  35. tmp_file/hebing_matlib_result.py (+27 -0)
  36. tmp_file/queshi_bili.py (+38 -0)
  37. tmp_file/read_and_draw_png.py (+1 -1)
  38. tmp_file/zibo_guzhang_select_time.py (+55 -0)
  39. tmp_file/对比文件夹列名差值.py (+87 -0)
  40. tmp_file/白玉山每月限电损失.py (+35 -0)
  41. utils/db/ConnectMysql.py (+14 -11)
  42. utils/df_utils/util.py (+1 -1)
  43. utils/file/trans_methods.py (+94 -62)
  44. utils/log/trans_log.py (+2 -4)
  45. utils/systeminfo/sysinfo.py (+4 -0)

+ 6 - 1
.gitignore

@@ -3,4 +3,9 @@ logs
 *.iml
 .idea
 test
-tmp
+tmp
+venv
+wheels
+build
+dist
+etl_tool.spec

+ 43 - 128
app_run.py

@@ -3,146 +3,61 @@
 # @Author  : 魏志亮
 import os
 import sys
-import traceback
 
+from utils.conf.read_conf import yaml_conf, read_conf
 
-def run_schedule(step=0, end=4, run_count=1):
-    # 更新超时任务
-    update_timeout_trans_data()
 
-    data = get_exec_data(run_count)
-    if data is None:
-        trans_print("当前有任务在执行")
-    elif len(data.keys()) == 0:
-        trans_print("当前无任务")
-    else:
-        batch_no = data['batch_code']
-        batch_name = data['batch_name']
-        transfer_type = data['transfer_type']
-        transfer_file_addr = data['transfer_addr']
-        field_code = data['field_code']
-        field_name = data['field_name']
-
-        __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_addr, field_name, field_code,
-                     save_db=True)
-
-
-def run_local(step=0, end=3, batch_no=None, batch_name='', transfer_type=None, transfer_file_addr=None, field_name=None,
-              field_code="测试", save_db=False):
-    if batch_no is None or str(batch_no).strip() == '':
-        return "批次编号不能为空"
-
-    if transfer_type not in ['second', 'minute', 'second_1']:
-        return "查询类型错误"
-
-    if transfer_file_addr is None or str(transfer_file_addr).strip() == '':
-        return "文件路径不能为空"
-
-    __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_addr, field_name, field_code,
-                 save_db=save_db)
-
-
-def __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_addr=None, field_name=None,
-                 field_code="测试",
-                 save_db=False):
-    trance_id = '-'.join([batch_no, field_name, transfer_type])
-    set_trance_id(trance_id)
-    conf_map = get_trans_conf(field_code, field_name, transfer_type)
-    if conf_map is None or type(conf_map) == tuple or len(conf_map.keys()) == 0:
-        message = f"未找到{field_name}的{transfer_type}配置"
-        trans_print(message)
-        update_trans_status_error(batch_no, transfer_type, message, save_db)
+def get_exec_data(batch_no=None, read_type=None, run_count=1):
+    if batch_no and read_type:
+        data = get_data_by_batch_no_and_type(batch_no, read_type)
+        if data is None:
+            raise ValueError(f"未找到批次号:{batch_no},类型:{read_type}")
+
     else:
+        data = get_batch_exec_data(run_count)
+        if data is None:
+            trans_print("当前有任务在执行")
+            sys.exit(0)
+        elif len(data.keys()) == 0:
+            trans_print("当前无任务")
+            sys.exit(0)
 
-        resolve_col_prefix = read_conf(conf_map, 'resolve_col_prefix')
-        wind_name_exec = read_conf(conf_map, 'wind_name_exec', None)
-        is_vertical_table = read_conf(conf_map, 'is_vertical_table', False)
-        merge_columns = read_conf(conf_map, 'merge_columns', False)
-
-        vertical_cols = read_conf(conf_map, 'vertical_read_cols', '').split(',')
-        index_cols = read_conf(conf_map, 'vertical_index_cols', '').split(',')
-        vertical_key = read_conf(conf_map, 'vertical_col_key')
-        vertical_value = read_conf(conf_map, 'vertical_col_value')
-        need_valid_cols = not merge_columns
-
-        begin_header = read_conf(conf_map, 'begin_header', 0)
-
-        cols_trans_all = dict()
-        trans_cols = ['wind_turbine_number', 'time_stamp', 'active_power', 'rotor_speed', 'generator_speed',
-                      'wind_velocity', 'pitch_angle_blade_1', 'pitch_angle_blade_2', 'pitch_angle_blade_3',
-                      'cabin_position', 'true_wind_direction', 'yaw_error1', 'set_value_of_active_power',
-                      'gearbox_oil_temperature', 'generatordrive_end_bearing_temperature',
-                      'generatornon_drive_end_bearing_temperature', 'wind_turbine_status',
-                      'wind_turbine_status2',
-                      'cabin_temperature', 'twisted_cable_angle', 'front_back_vibration_of_the_cabin',
-                      'side_to_side_vibration_of_the_cabin', 'actual_torque', 'given_torque',
-                      'clockwise_yaw_count',
-                      'counterclockwise_yaw_count', 'unusable', 'power_curve_available',
-                      'required_gearbox_speed',
-                      'inverter_speed_master_control', 'outside_cabin_temperature', 'main_bearing_temperature',
-                      'gearbox_high_speed_shaft_bearing_temperature',
-                      'gearboxmedium_speed_shaftbearing_temperature',
-                      'gearbox_low_speed_shaft_bearing_temperature', 'generator_winding1_temperature',
-                      'generator_winding2_temperature', 'generator_winding3_temperature',
-                      'turbulence_intensity', 'param1',
-                      'param2', 'param3', 'param4', 'param5', 'param6', 'param7', 'param8', 'param9', 'param10']
-
-        for col in trans_cols:
-            cols_trans_all[col] = read_conf(conf_map, col, '')
-
-        params = TransParam(read_type=transfer_type, read_path=transfer_file_addr,
-                            cols_tran=cols_trans_all,
-                            wind_name_exec=wind_name_exec, is_vertical_table=is_vertical_table,
-                            vertical_cols=vertical_cols, vertical_key=vertical_key,
-                            vertical_value=vertical_value, index_cols=index_cols, merge_columns=merge_columns,
-                            resolve_col_prefix=resolve_col_prefix, need_valid_cols=need_valid_cols, header=begin_header)
-
-        try:
-            trans_subject = WindFarms(batch_no=batch_no, batch_name=batch_name, field_code=field_code,
-                                      field_name=field_name,
-                                      save_db=save_db,
-                                      header=begin_header, trans_param=params)
-            trans_subject.run(step=step, end=end)
-        except Exception as e:
-            trans_print(traceback.format_exc())
-            message = "系统返回错误:" + str(e)
-            update_trans_status_error(batch_no, transfer_type, message, save_db)
-        finally:
-            set_trance_id("")
-            trans_subject.pathsAndTable.delete_tmp_files()
+    return data
 
 
-if __name__ == '__main__':
-    env = None
-    if len(sys.argv) >= 2:
-        env = sys.argv[1]
-    else:
-        env = 'dev'
-    print(sys.argv)
-    if env is None:
-        raise Exception("请配置运行环境")
+def run(batch_no=None, read_type=None, save_db=True, run_count=1):
+    data = get_exec_data(batch_no, read_type, run_count)
 
-    os.environ['env'] = env
+    exec_process = None
+    if data['transfer_type'] in ['second', 'minute']:
+        exec_process = MinSecTrans(data=data, save_db=save_db)
 
-    run_count = 1
-    if len(sys.argv) >= 3:
-        run_count = int(sys.argv[2])
+    if data['transfer_type'] in ['fault', 'warn']:
+        exec_process = FaultWarnTrans(data=data, save_db=save_db)
 
-    conf_path = '/data/config/etl_config.yaml'
-    if len(sys.argv) >= 4:
-        conf_path = sys.argv[3]
+    if exec_process is None:
+        raise Exception("No exec process")
+    exec_process.run()
 
+
+if __name__ == '__main__':
+
+    env = 'dev'
+    if len(sys.argv) >= 2:
+        env = sys.argv[1]
+
+    conf_path = os.path.abspath(f"./conf/etl_config_{env}.yaml")
     os.environ['ETL_CONF'] = conf_path
+    yaml_config = yaml_conf(conf_path)
+    os.environ['env'] = env
+    run_count = int(read_conf(yaml_config, "run_batch_count", 1))
 
-    from utils.log.trans_log import trans_print, set_trance_id
-    from etl.base.TransParam import TransParam
-    from etl.base.WindFarms import WindFarms
-    from service.plt_service import get_exec_data, update_trans_status_error, update_timeout_trans_data
-    from service.trans_service import get_trans_conf
-    from utils.conf.read_conf import read_conf
+    from utils.log.trans_log import trans_print
+    from service.plt_service import get_batch_exec_data, get_data_by_batch_no_and_type
+    from etl.wind_power.fault_warn.FaultWarnTrans import FaultWarnTrans
+    from etl.wind_power.min_sec.MinSecTrans import MinSecTrans
 
-    run_schedule(run_count=run_count)
+    trans_print("所有请求参数:", sys.argv, "env:", env, "最大可执行个数:", run_count)
+    trans_print("配置文件路径:", os.environ.get("ETL_CONF"))
 
-    # run_local(4, 4, batch_no='WOF035200003-WOB000005', batch_name='MM14号机组0719', transfer_type='second',
-    #            transfer_file_addr=r'/data/download/collection_data/1进行中/密马风电场-山西-大唐/收资数据/scada/14号/sec', field_name='密马风电场',
-    #            field_code="WOF035200003", save_db=True)
+    run(run_count=run_count)
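
The reworked entrypoint takes only the environment name and derives everything else from conf/etl_config_{env}.yaml (database settings, paths, run_batch_count). A minimal usage sketch, assuming the script is launched from the repository root so the relative conf path resolves:

    # development run; the batch count comes from run_batch_count in the YAML
    python app_run.py dev

    # production run
    python app_run.py prod

Calling run(batch_no=..., read_type=...) instead fetches one specific batch via get_data_by_batch_no_and_type and dispatches it to MinSecTrans or FaultWarnTrans according to transfer_type.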

+ 0 - 52
conf/db.py

@@ -1,52 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time    : 2024/6/19
-# @Author  : 魏志亮
-
-mysql_config = \
-    {'plt_dev': {'database': 'energy',
-                 'host': '192.168.50.233',
-                 'password': 'admin123456',
-                 'port': 3306,
-                 'user': 'admin'},
-
-     'plt_prod': {'database': 'energy_prod',
-                  'host': '192.168.50.233',
-                  'password': 'admin123456',
-                  'port': 3306,
-                  'user': 'admin'},
-
-     'plt_connect_pool_config':
-         {'blocking': True,
-          'charset': 'utf8mb4',
-          'maxcached': 5,
-          'maxconnections': 20,
-          'maxshared': 0,
-          'mincached': 2,
-          'setsession': []},
-
-     'trans_dev': {'database': 'energy_data',
-                   'host': '192.168.50.235',
-                   'password': 'admin123456',
-                   'port': 30306,
-                   'user': 'root'},
-
-     'trans_prod': {'database': 'energy_data_prod',
-                    'host': '192.168.50.235',
-                    'password': 'admin123456',
-                    'port': 30306,
-                    'user': 'root'},
-
-     'trans_connect_pool_config':
-         {'blocking': True,
-          'charset': 'utf8',
-          'maxcached': 20,
-          'maxconnections': 20,
-          'maxshared': 0,
-          'mincached': 1,
-          'setsession': []}
-     }
-
-
-if __name__ == '__main__':
-    import yaml
-    print(yaml.dump(mysql_config, indent=4))

+ 0 - 51
conf/etl_config.yaml

@@ -1,51 +0,0 @@
-plt_connect_pool_config:
-  blocking: true
-  charset: utf8mb4
-  maxcached: 5
-  maxconnections: 20
-  maxshared: 0
-  mincached: 2
-  setsession: [ ]
-plt_dev:
-  database: energy
-  host: 192.168.50.233
-  password: admin123456
-  port: 3306
-  user: admin
-plt_prod:
-  database: energy_prod
-  host: 192.168.50.233
-  password: admin123456
-  port: 3306
-  user: admin
-trans_connect_pool_config:
-  blocking: true
-  charset: utf8
-  maxcached: 20
-  maxconnections: 20
-  maxshared: 0
-  mincached: 1
-  setsession: [ ]
-trans_dev:
-  database: energy_data
-  host: 192.168.50.235
-  password: admin123456
-  port: 30306
-  user: root
-trans_prod:
-  database: energy_data_prod
-  host: 192.168.50.235
-  password: admin123456
-  port: 30306
-  user: root
-
-# 如果要放在原始路径,则配置这个 以下面的名称作为切割点,新建清理数据文件夹
-etl_origin_path_contain: 收资数据
-# 如果单独保存,配置这个路径
-save_path:
-
-
-log_path_dir: /data/logs
-
-# 临时文件存放处,有些甲方公司隔得tmp太小,只好自己配置
-tmp_base_path: /tmp

+ 5 - 31
conf/etl_config_datang.yaml

@@ -1,38 +1,10 @@
-plt_connect_pool_config:
-  blocking: true
-  charset: utf8mb4
-  maxcached: 5
-  maxconnections: 20
-  maxshared: 0
-  mincached: 2
-  setsession: [ ]
-plt_dev:
+plt:
   database: energy
   host: 172.16.37.22
   password: admin123456
   port: 3306
   user: root
-plt_prod:
-  database: energy_prod
-  host: 172.16.37.22
-  password: admin123456
-  port: 3306
-  user: root
-trans_connect_pool_config:
-  blocking: true
-  charset: utf8
-  maxcached: 20
-  maxconnections: 20
-  maxshared: 0
-  mincached: 1
-  setsession: [ ]
-trans_dev:
-  database: energy_data
-  host: 172.16.37.24
-  password: admin123456
-  port: 3306
-  user: root
-trans_prod:
+trans:
   database: energy_data_prod
   host: 172.16.37.24
   password: admin123456
@@ -47,4 +19,6 @@ save_path:
 log_path_dir: /data/collection_data/logs
 
 # 临时文件存放处,有些甲方公司给的tmp太小,只好自己配置
-tmp_base_path: /data/collection_data/tmp
+tmp_base_path: /data/collection_data/tmp
+
+run_batch_count: 1

+ 26 - 0
conf/etl_config_dev.yaml

@@ -0,0 +1,26 @@
+plt:
+  database: energy
+  host: 192.168.50.233
+  password: admin123456
+  port: 3306
+  user: admin
+
+trans:
+  database: energy_data
+  host: 192.168.50.235
+  password: admin123456
+  port: 30306
+  user: root
+
+# 如果要放在原始路径,则配置这个 以下面的名称作为切割点,新建清理数据文件夹
+etl_origin_path_contain: 收资数据
+# 如果单独保存,配置这个路径
+save_path:
+
+# 日志保存路径
+log_path_dir: /data/logs
+
+# 临时文件存放处,有些甲方公司给的tmp太小,只好自己配置
+tmp_base_path: /tmp
+
+run_batch_count: 1
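
For reference, a short sketch of how this file is consumed, assuming yaml_conf(path) returns the parsed mapping and read_conf(conf, key, default=None) is a keyed lookup with a fallback, as the calls in app_run.py and PathsAndTable.py suggest:

    import os

    from utils.conf.read_conf import yaml_conf, read_conf

    os.environ['ETL_CONF'] = './conf/etl_config_dev.yaml'
    conf = yaml_conf(os.environ['ETL_CONF'])
    tmp_base_path = read_conf(conf, 'tmp_base_path', '/tmp')  # falls back to /tmp
    run_count = int(read_conf(conf, 'run_batch_count', 1))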

+ 26 - 0
conf/etl_config_prod.yaml

@@ -0,0 +1,26 @@
+plt:
+  database: energy_prod
+  host: 192.168.50.233
+  password: admin123456
+  port: 3306
+  user: admin
+
+trans:
+  database: energy_data_prod
+  host: 192.168.50.235
+  password: admin123456
+  port: 30306
+  user: root
+
+# 如果要放在原始路径,则配置这个 以下面的名称作为切割点,新建清理数据文件夹
+etl_origin_path_contain: 收资数据
+# 如果单独保存,配置这个路径
+save_path:
+
+# 日志保存路径
+log_path_dir: /data/logs
+
+# 临时文件存放处,有些甲方公司给的tmp太小,只好自己配置
+tmp_base_path: /tmp
+
+run_batch_count: 2

+ 0 - 98
etl/base/WindFarms.py

@@ -1,98 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time    : 2024/5/15
-# @Author  : 魏志亮
-import datetime
-import multiprocessing
-
-from etl.base.PathsAndTable import PathsAndTable
-from etl.base.TransParam import TransParam
-from etl.step.ClearData import ClearData
-from etl.step.ReadAndSaveTmp import ReadAndSaveTmp
-from etl.step.SaveToDb import SaveToDb
-from etl.step.StatisticsAndSaveFile import StatisticsAndSaveFile
-from etl.step.UnzipAndRemove import UnzipAndRemove
-from service.plt_service import get_all_wind, update_trans_status_running, \
-    update_trans_status_success, update_trans_transfer_progress
-from service.trans_service import batch_statistics
-from utils.df_utils.util import get_time_space
-from utils.file.trans_methods import *
-
-
-class WindFarms(object):
-
-    def __init__(self, batch_no=None, batch_name=None, field_code=None, field_name=None, params: TransParam = None,
-                 save_db=True, header=0, trans_param: TransParam = None):
-        self.batch_no = batch_no
-        self.batch_name = batch_name
-        self.field_code = field_code
-        self.field_name = field_name
-        self.save_zip = False
-        self.trans_param = params
-        self.wind_col_trans, self.rated_power_and_cutout_speed_map = get_all_wind(self.field_code)
-        self.batch_count = 50000
-        self.save_path = None
-        self.save_db = save_db
-        self.statistics_map = multiprocessing.Manager().dict()
-        self.header = header
-        self.trans_param = trans_param
-        self.trans_param.wind_col_trans = self.wind_col_trans
-        self.pathsAndTable = PathsAndTable(batch_no, batch_name, self.trans_param.read_path, self.field_name,
-                                           self.trans_param.read_type, save_db, save_zip=self.save_zip)
-
-    def run(self, step=0, end=4):
-        begin = datetime.datetime.now()
-        trans_print("开始执行")
-        update_trans_status_running(self.batch_no, self.trans_param.read_type, self.save_db)
-        if step <= 0 and end >= 0:
-            clean_data = ClearData(self.pathsAndTable)
-            clean_data.run()
-
-        if step <= 1 and end >= 1:
-            # 更新运行状态到运行中
-            unzip_and_remove = UnzipAndRemove(self.pathsAndTable)
-            unzip_and_remove.run()
-
-        if step <= 2 and end >= 2:
-            read_and_save_tmp = ReadAndSaveTmp(self.pathsAndTable, self.trans_param)
-            read_and_save_tmp.run()
-
-        if step <= 3 and end >= 3:
-            # 保存到正式文件
-            statistics_and_save_file = StatisticsAndSaveFile(self.pathsAndTable, self.trans_param, self.statistics_map,
-                                                             self.rated_power_and_cutout_speed_map)
-            statistics_and_save_file.run()
-
-        if step <= 4 and end >= 4:
-            if self.save_db:
-                save_to_db = SaveToDb(self.pathsAndTable)
-                save_to_db.run()
-
-        update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 99, self.save_db)
-        # 如果end==0 则说明只是进行了验证
-        if end >= 4:
-            all_files = read_excel_files(self.pathsAndTable.get_save_path())
-            if step <= 3:
-                update_trans_status_success(self.batch_no, self.trans_param.read_type,
-                                            len(all_files),
-                                            self.statistics_map['time_granularity'],
-                                            self.statistics_map['min_date'], self.statistics_map['max_date'],
-                                            self.statistics_map['total_count'], self.save_db)
-            else:
-                df = read_file_to_df(all_files[0], read_cols=['time_stamp'])
-                df['time_stamp'] = pd.to_datetime(df['time_stamp'])
-                time_granularity = get_time_space(df, 'time_stamp')
-                batch_data = batch_statistics("_".join([self.batch_no, self.trans_param.read_type]))
-                if batch_data is not None:
-                    update_trans_status_success(self.batch_no, self.trans_param.read_type,
-                                                len(read_excel_files(self.pathsAndTable.get_save_path())),
-                                                time_granularity,
-                                                batch_data['min_date'], batch_data['max_date'],
-                                                batch_data['total_count'], self.save_db)
-                else:
-                    update_trans_status_success(self.batch_no, self.trans_param.read_type,
-                                                len(read_excel_files(self.pathsAndTable.get_save_path())),
-                                                time_granularity,
-                                                None, None,
-                                                None, self.save_db)
-        trans_print("结束执行", self.trans_param.read_type, ",总耗时:",
-                    str(datetime.datetime.now() - begin))

+ 116 - 0
etl/common/BaseDataTrans.py

@@ -0,0 +1,116 @@
+import datetime
+import traceback
+
+from etl.common.ClearData import ClearData
+from etl.common.PathsAndTable import PathsAndTable
+from etl.common.SaveToDb import SaveToDb
+from etl.common.UnzipAndRemove import UnzipAndRemove
+from service.plt_service import get_all_wind, update_trans_status_success, update_trans_status_error, \
+    update_trans_status_running
+from utils.file.trans_methods import read_excel_files
+from utils.log.trans_log import trans_print, set_trance_id
+
+
+class BaseDataTrans(object):
+    def __init__(self, data: dict = None, save_db=True, step=0, end=4):
+
+        self.batch_no = data['batch_code']
+        self.batch_name = data['batch_name']
+        self.read_type = data['transfer_type']
+        self.read_path = data['transfer_addr']
+        self.field_code = data['field_code']
+        self.field_name = data['field_name']
+        self.save_zip = False
+        self.step = step
+        self.end = end
+        self.wind_col_trans, self.rated_power_and_cutout_speed_map = get_all_wind(self.field_code)
+        self.batch_count = 100000
+        self.save_db = save_db
+        self.filed_conf = self.get_filed_conf()
+        self.pathsAndTable = PathsAndTable(self.batch_no, self.batch_name, self.read_path, self.field_name,
+                                           self.read_type, save_db, self.save_zip)
+
+    def get_filed_conf(self):
+        raise NotImplementedError("需要实现 获取点检表 方法")
+
+    # 第一步 清理数据
+    def clean_file_and_db(self):
+        clean_data = ClearData(self.pathsAndTable)
+        clean_data.run()
+
+    # 第二步 解压 移动到临时文件
+    def unzip_or_remove_to_tmp_dir(self):
+        # 解压并删除
+        unzip_and_remove = UnzipAndRemove(self.pathsAndTable)
+        unzip_and_remove.run()
+
+    # 第三步 读取 并 保存到临时文件
+    def read_and_save_tmp_file(self):
+        raise NotImplementedError("第三步未做实现")
+
+    # 第四步 统计 并 保存到正式文件
+    def statistics_and_save_to_file(self):
+        raise NotImplementedError("第四步未做实现")
+
+    # 第五步 保存到数据库
+    def save_to_db(self):
+        save_to_db = SaveToDb(self.pathsAndTable, self.batch_count)
+        save_to_db.run()
+
+    # 最后更新执行程度
+    def update_exec_progress(self):
+        update_trans_status_success(self.batch_no, self.read_type,
+                                    len(read_excel_files(self.pathsAndTable.get_save_path())),
+                                    None, None, None, None, self.save_db)
+
+    def run(self):
+        total_begin = datetime.datetime.now()
+        try:
+            trance_id = '-'.join([self.batch_no, self.field_name, self.read_type])
+            set_trance_id(trance_id)
+            update_trans_status_running(self.batch_no, self.read_type, self.save_db)
+
+            if self.step <= 0 and self.end >= 0:
+                begin = datetime.datetime.now()
+                trans_print("开始清理数据,临时文件夹:", self.pathsAndTable.get_tmp_path())
+                self.clean_file_and_db()
+                trans_print("清理数据结束,耗时:", datetime.datetime.now() - begin, "总耗时:", datetime.datetime.now() - total_begin)
+
+            if self.step <= 1 and self.end >= 1:
+                begin = datetime.datetime.now()
+                trans_print("开始解压移动文件")
+                self.unzip_or_remove_to_tmp_dir()
+                trans_print("解压移动文件结束,耗时:", datetime.datetime.now() - begin, "总耗时:", datetime.datetime.now() - total_begin)
+
+            if self.step <= 2 and self.end >= 2:
+                begin = datetime.datetime.now()
+                trans_print("开始保存数据到临时文件")
+                self.read_and_save_tmp_file()
+                trans_print("保存数据到临时文件结束,耗时:", datetime.datetime.now() - begin, "总耗时:", datetime.datetime.now() - total_begin)
+
+            if self.step <= 3 and self.end >= 3:
+                begin = datetime.datetime.now()
+                trans_print("开始保存数据到正式文件")
+                self.statistics_and_save_to_file()
+                trans_print("保存数据到正式文件结束,耗时:", datetime.datetime.now() - begin, "总耗时:", datetime.datetime.now() - total_begin)
+
+            if self.step <= 4 and self.end >= 4:
+                begin = datetime.datetime.now()
+                trans_print("开始保存到数据库,是否存库:", self.pathsAndTable.save_db)
+                self.save_to_db()
+                trans_print("保存到数据库结束,耗时:", datetime.datetime.now() - begin, "总耗时:", datetime.datetime.now() - total_begin)
+
+            self.update_exec_progress()
+        except Exception as e:
+            trans_print(traceback.format_exc())
+            update_trans_status_error(self.batch_no, self.read_type, str(e), self.save_db)
+            raise e
+        finally:
+            self.pathsAndTable.delete_tmp_files()
+            trans_print("执行结束,总耗时:", str(datetime.datetime.now() - total_begin))
+
+
+if __name__ == '__main__':
+    # __init__ takes a batch record dict; keys mirror those unpacked above (illustrative values)
+    test = BaseDataTrans(data={'batch_code': 'WOF053600062-WOB000010', 'batch_name': 'test',
+                               'transfer_type': 'fault', 'transfer_addr': '/tmp/data',
+                               'field_code': 'TEST', 'field_name': 'test'}, save_db=False)
+
+    test.run()
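
BaseDataTrans is a template-method base class: cleanup (step 1), unzip (step 2) and DB persistence (step 5) are shared, while get_filed_conf and steps 3 and 4 raise NotImplementedError and must be overridden. A minimal subclass sketch, with illustrative bodies only:

    from etl.common.BaseDataTrans import BaseDataTrans

    class DemoTrans(BaseDataTrans):
        def get_filed_conf(self):
            return {}  # would normally look up the field's transfer configuration

        def read_and_save_tmp_file(self):
            pass  # step 3: parse raw files into per-turbine temp csv files

        def statistics_and_save_to_file(self):
            pass  # step 4: aggregate temp files into the final csv files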

+ 4 - 3
etl/step/ClearData.py → etl/common/ClearData.py

@@ -1,6 +1,6 @@
 import datetime
 
-from etl.base.PathsAndTable import PathsAndTable
+from etl.common.PathsAndTable import PathsAndTable
 from service.plt_service import update_trans_transfer_progress
 from utils.log.trans_log import trans_print
 
@@ -12,11 +12,12 @@ class ClearData(object):
 
     def clean_data(self):
         self.pathsAndTable.delete_tmp_files()
-        self.pathsAndTable.delete_batch_db()
+        if self.pathsAndTable.save_db:
+            self.pathsAndTable.delete_batch_db()
         self.pathsAndTable.delete_batch_files()
 
     def run(self):
-        trans_print("开始清理数据")
+        trans_print("开始清理数据,临时文件夹:", self.pathsAndTable.get_tmp_path())
         begin = datetime.datetime.now()
         self.clean_data()
         update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 5,

+ 22 - 15
etl/base/PathsAndTable.py → etl/common/PathsAndTable.py

@@ -1,7 +1,7 @@
 import os
 import shutil
 
-from service.trans_service import drop_table, creat_table_and_add_partition
+from service.trans_service import drop_table, creat_min_sec_table, create_warn_fault_table
 from utils.conf.read_conf import *
 from utils.log.trans_log import trans_print
 
@@ -17,12 +17,12 @@ class PathsAndTable(object):
         self.save_db = save_db
         self.save_zip = save_zip
         self.multi_pool_count = 6
+        self.is_delete_db = False
 
-        yaml_config = yaml_conf(os.environ.get('ETL_CONF', "/data/config/etl_config.yaml"))
-
-        self.tmp_base_path = read_conf(yaml_config, "tmp_base_path", "/tmp")
+        yaml_config = yaml_conf(os.environ.get('ETL_CONF'))
 
         save_path_conf = read_conf(yaml_config, "save_path")
+        self.tmp_base_path = read_conf(yaml_config, "tmp_base_path", "/tmp")
         if save_path_conf:
             self.save_path = save_path_conf + os.sep + self.field_name
         else:
@@ -37,21 +37,21 @@ class PathsAndTable(object):
     def get_save_path(self):
         return os.path.join(self.save_path, self.batch_no + "_" + self.batch_name, self.read_type)
 
-    def get_save_tmp_path(self):
+    def get_tmp_path(self):
         return os.path.join(self.tmp_base_path, self.field_name, self.batch_no + "_" + self.batch_name,
                             self.read_type)
 
     def get_excel_tmp_path(self):
-        return os.path.join(self.get_save_tmp_path(), 'excel_tmp' + os.sep)
+        return os.path.join(self.get_tmp_path(), 'excel_tmp' + os.sep)
 
     def get_read_tmp_path(self):
-        return os.path.join(self.get_save_tmp_path(), 'read_tmp')
+        return os.path.join(self.get_tmp_path(), 'read_tmp')
 
     def get_merge_tmp_path(self, wind_turbine_number=None):
         if wind_turbine_number is None:
-            return os.path.join(self.get_save_tmp_path(), 'merge_tmp')
+            return os.path.join(self.get_tmp_path(), 'merge_tmp')
         else:
-            return os.path.join(self.get_save_tmp_path(), 'merge_tmp', str(wind_turbine_number))
+            return os.path.join(self.get_tmp_path(), 'merge_tmp', str(wind_turbine_number))
 
     def get_table_name(self):
         return "_".join([self.batch_no, self.read_type])
@@ -64,19 +64,26 @@ class PathsAndTable(object):
 
     def delete_tmp_files(self):
         trans_print("开始删除临时文件夹")
-        if os.path.exists(self.get_save_tmp_path()):
-            shutil.rmtree(self.get_save_tmp_path())
+        if os.path.exists(self.get_tmp_path()):
+            shutil.rmtree(self.get_tmp_path())
         trans_print("删除临时文件夹删除成功")
 
     def delete_batch_db(self):
         if self.save_db:
             trans_print("开始删除表")
-            table_name = self.get_table_name()
-            drop_table(table_name, self.save_db)
+            if not self.is_delete_db:
+                table_name = self.get_table_name()
+                drop_table(table_name, self.save_db)
+                self.is_delete_db = True
             trans_print("删除表结束")
 
-    def create_batch_db(self, wind_names=list()):
+    def create_batch_db(self, wind_names: list = list()):
         if self.save_db:
             trans_print("开始创建表")
-            creat_table_and_add_partition(self.get_table_name(), wind_names, self.read_type)
+            if self.read_type in ['second', 'minute']:
+                creat_min_sec_table(self.get_table_name(), wind_names, self.read_type)
+            elif self.read_type in ['fault', 'warn']:
+                create_warn_fault_table(self.get_table_name())
+            else:
+                raise Exception("不支持的读取类型:" + self.read_type)
             trans_print("建表结束")
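
For reference, a sketch of the names this class derives, assuming ETL_CONF points at a readable config and using the constructor order seen in BaseDataTrans (batch values illustrative):

    from etl.common.PathsAndTable import PathsAndTable

    pt = PathsAndTable('WOF000001-WOB000001', 'demo', '/data/raw', 'FieldA',
                       'minute', save_db=False, save_zip=False)
    pt.get_table_name()     # 'WOF000001-WOB000001_minute'
    pt.get_tmp_path()       # <tmp_base_path>/FieldA/WOF000001-WOB000001_demo/minute
    pt.get_read_tmp_path()  # read_tmp directory under the tmp path above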

+ 12 - 19
etl/step/SaveToDb.py → etl/common/SaveToDb.py

@@ -1,9 +1,7 @@
-import datetime
 import multiprocessing
-import os
 import traceback
-
-from etl.base.PathsAndTable import PathsAndTable
+from os import path
+from etl.common.PathsAndTable import PathsAndTable
 from service.plt_service import update_trans_transfer_progress
 from service.trans_service import save_file_to_db
 from utils.file.trans_methods import read_excel_files, split_array
@@ -13,19 +11,16 @@ from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
 
 class SaveToDb(object):
 
-    def __init__(self, pathsAndTable: PathsAndTable):
+    def __init__(self, pathsAndTable: PathsAndTable, batch_count=100000):
         self.pathsAndTable = pathsAndTable
+        self.batch_count = batch_count
 
     def mutiprocessing_to_save_db(self):
         # 开始保存到SQL文件
 
         self.pathsAndTable.delete_batch_db()
-        trans_print("开始保存到数据库文件")
         all_saved_files = read_excel_files(self.pathsAndTable.get_save_path())
-        wind_names = [str(os.path.basename(i)).replace(".csv", "") for i in all_saved_files]
-
-        # creat_table_and_add_partition(self.pathsAndTable.get_table_name(), wind_names,
-        #                               self.pathsAndTable.read_type)
+        wind_names = [str(path.basename(i)).replace(".csv", "") for i in all_saved_files]
 
         self.pathsAndTable.create_batch_db(wind_names)
 
@@ -35,8 +30,9 @@ class SaveToDb(object):
         try:
             for index, arr in enumerate(all_arrays):
                 with multiprocessing.Pool(split_count) as pool:
-                    pool.starmap(save_file_to_db, [(self.pathsAndTable.get_table_name(), file,) for file in
-                                                   all_saved_files])
+                    pool.starmap(save_file_to_db,
+                                 [(self.pathsAndTable.get_table_name(), file, self.batch_count)
+                                  for file in arr])
                 update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
                                                round(70 + 29 * (index + 1) / len(all_arrays), 2),
                                                self.pathsAndTable.save_db)
@@ -44,12 +40,9 @@ class SaveToDb(object):
             trans_print(traceback.format_exc())
             message = "保存到数据库错误,系统返回错误:" + str(e)
             raise ValueError(message)
-        trans_print("结束保存到数据库文件")
 
     def run(self):
-        trans_print("开始保存到数据库")
-        begin = datetime.datetime.now()
-        self.mutiprocessing_to_save_db()
-        update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 99,
-                                       self.pathsAndTable.save_db)
-        trans_print("保存到数据结束,耗时:", datetime.datetime.now() - begin)
+        if self.pathsAndTable.save_db:
+            self.mutiprocessing_to_save_db()
+            update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 99,
+                                           self.pathsAndTable.save_db)
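
SaveToDb now runs only when save_db is set and fans the saved csv files out over a process pool in fixed-size groups. A standalone sketch of the pattern, assuming split_array(items, n) chunks a list into groups of at most n, as its use above suggests (table name and files hypothetical):

    import multiprocessing

    from service.trans_service import save_file_to_db
    from utils.file.trans_methods import split_array

    files = ['/tmp/a.csv', '/tmp/b.csv', '/tmp/c.csv']
    for group in split_array(files, 2):
        with multiprocessing.Pool(2) as pool:
            pool.starmap(save_file_to_db,
                         [('demo_table', f, 100000) for f in group])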

+ 1 - 6
etl/step/UnzipAndRemove.py → etl/common/UnzipAndRemove.py

@@ -2,9 +2,7 @@ import multiprocessing
 import os
 import traceback
 
-import datetime
-
-from etl.base.PathsAndTable import PathsAndTable
+from etl.common.PathsAndTable import PathsAndTable
 from service.plt_service import update_trans_transfer_progress
 from utils.file.trans_methods import read_files, read_excel_files, copy_to_new, split_array
 from utils.log.trans_log import trans_print
@@ -68,9 +66,6 @@ class UnzipAndRemove(object):
         return all_files
 
     def run(self):
-        trans_print("开始解压移动文件")
-        begin = datetime.datetime.now()
         self.remove_file_to_tmp_path()
         update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 20,
                                        self.pathsAndTable.save_db)
-        trans_print("解压移动文件结束:耗时:", datetime.datetime.now() - begin)

+ 0 - 0
etl/base/__init__.py → etl/common/__init__.py


+ 0 - 0
etl/step/__init__.py → etl/wind_power/__init__.py


+ 124 - 0
etl/wind_power/fault_warn/FaultWarnTrans.py

@@ -0,0 +1,124 @@
+import os
+
+import numpy as np
+import pandas as pd
+
+from etl.common.BaseDataTrans import BaseDataTrans
+from service.plt_service import update_trans_status_error
+from service.trans_service import get_fault_warn_conf
+from utils.conf.read_conf import read_conf
+from utils.file.trans_methods import read_excel_files, read_file_to_df, create_file_path, valid_eval
+from utils.log.trans_log import trans_print
+
+
+class FaultWarnTrans(BaseDataTrans):
+
+    def __init__(self, data: dict = None, save_db=True, step=0, end=4):
+        super(FaultWarnTrans, self).__init__(data, save_db, step, end)
+
+    def get_filed_conf(self):
+        return get_fault_warn_conf(self.field_code, self.read_type)
+
+    # 第三步 读取 并 保存到临时文件
+    def read_and_save_tmp_file(self):
+        trans_print("无需保存临时文件")
+
+    # 第四步 统计 并 保存到正式文件
+    def statistics_and_save_to_file(self):
+        conf_map = self.get_filed_conf()
+        if conf_map is None or type(conf_map) == tuple or len(conf_map.keys()) == 0:
+            message = f"未找到{self.batch_no}的{self.read_type}配置"
+            trans_print(message)
+            update_trans_status_error(self.batch_no, self.read_type, message, self.save_db)
+        else:
+
+            for key, v in conf_map.items():
+                if v and type(v) == str:
+                    v = v.replace("\r\n", "").replace("\n", "")
+                    conf_map[key] = v
+
+            read_fields_keys = [i for i in conf_map.keys() if i.startswith('field_')]
+            # 需要执行 exec的字段
+            # exec_fields = [(k.replace("exec_", ""), v) for k, v in conf_map.items() if k.startswith('exec_')]
+            # 读取需要执行 筛选的字段
+            select_fields = [(k.replace("select_", ""), v) for k, v in conf_map.items() if
+                             k.startswith('select_') and v]
+            time_format = read_conf(conf_map, 'time_format')
+
+            trans_map = dict()
+            trans_cols = []
+            for key in read_fields_keys:
+                field_value = read_conf(conf_map, key)
+                if field_value:
+                    vas = str(field_value).split('|')
+                    trans_cols.extend(vas)
+                    field_key = key.replace("field_", "")
+                    for v in vas:
+                        trans_map[v] = field_key
+
+            all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())
+
+            df = pd.DataFrame()
+            for file in all_files:
+                now_df = read_file_to_df(file, trans_cols=trans_cols)
+                df = pd.concat([df, now_df], ignore_index=True)
+
+            df.rename(columns=trans_map, inplace=True)
+
+            for col in df.columns:
+                if 'field_' + col not in read_fields_keys:
+                    del df[col]
+
+            if time_format:
+                if valid_eval(time_format):
+                    eval_str = f"df['begin_time'].apply(lambda error_time: {time_format} )"
+                    df['begin_time'] = eval(eval_str)
+                    eval_str = f"df['end_time'].apply(lambda error_time: {time_format} )"
+                    df['end_time'] = eval(eval_str)
+
+            df['begin_time'] = pd.to_datetime(df['begin_time'], errors='coerce')
+            df['end_time'] = pd.to_datetime(df['end_time'], errors='coerce')
+
+            exec_wind_turbine_number = read_conf(conf_map, 'exec_wind_turbine_number')
+            if exec_wind_turbine_number:
+                if valid_eval(exec_wind_turbine_number):
+                    exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {exec_wind_turbine_number} )"
+                    df['wind_turbine_number'] = eval(exec_str)
+
+            for field, select_str in select_fields:
+                use_in = True
+                if str(select_str).strip().startswith("!"):
+                    use_in = False
+                    select_str = select_str[1:]
+
+                select_str = select_str.replace("'", "").replace("[", "").replace("]", "")
+                values = select_str.split(',')
+
+                if df[field].dtype == int:
+                    values = [int(i) for i in values]
+
+                if use_in:
+                    df = df[df[field].isin(values)]
+                else:
+                    df = df[~df[field].isin(values)]
+
+            df['wind_turbine_name'] = df['wind_turbine_number']
+            df['wind_turbine_number'] = df['wind_turbine_number'].map(
+                self.wind_col_trans).fillna(df['wind_turbine_number'])
+
+            df['time_diff'] = (df['end_time'] - df['begin_time']).dt.total_seconds()
+            df.loc[df['time_diff'] < 0, 'time_diff'] = np.nan
+
+            # 根据开始时间进行排序
+            df.sort_values(by=['wind_turbine_number', 'begin_time', 'end_time'], inplace=True)
+
+            if self.save_zip:
+                save_path = os.path.join(self.pathsAndTable.get_save_path(), str(self.batch_name) + '.csv.gz')
+            else:
+                save_path = os.path.join(self.pathsAndTable.get_save_path(), str(self.batch_name) + '.csv')
+
+            create_file_path(save_path, is_file_path=True)
+            if self.save_zip:
+                df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8', date_format='%Y-%m-%d %H:%M:%S')
+            else:
+                df.to_csv(save_path, index=False, encoding='utf-8', date_format='%Y-%m-%d %H:%M:%S')
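
The select_ configuration convention above keeps rows whose field value is in the configured list; a leading '!' negates the match. A standalone pandas sketch of that filter (column name and values hypothetical):

    import pandas as pd

    df = pd.DataFrame({'fault_level': ['1', '2', '3']})
    select_str = "!['1','2']"  # leading '!' means NOT IN
    use_in = not select_str.strip().startswith('!')
    values = select_str.lstrip('!').replace("'", '').strip('[]').split(',')
    df = df[df['fault_level'].isin(values)] if use_in else df[~df['fault_level'].isin(values)]
    # -> keeps only the row with fault_level '3'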

+ 0 - 0
tmp_file/__init__.py → etl/wind_power/fault_warn/__init__.py


+ 0 - 2
etl/step/ClassIdentifier.py → etl/wind_power/min_sec/ClassIdentifier.py

@@ -349,9 +349,7 @@ class ClassIdentifier(object):
 
     def run(self):
         # Implement your class identification logic here
-        print_memory_usage(self.wind_turbine_number + "开始打标签")
         begin = datetime.datetime.now()
         df = self.identifier()
         trans_print("打标签结束,", df.shape, ",耗时:", datetime.datetime.now() - begin)
-        print_memory_usage(self.wind_turbine_number + "打标签结束,")
         return df

+ 119 - 0
etl/wind_power/min_sec/MinSecTrans.py

@@ -0,0 +1,119 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/5/15
+# @Author  : 魏志亮
+import multiprocessing
+
+from etl.common.BaseDataTrans import BaseDataTrans
+from etl.wind_power.min_sec.TransParam import TransParam
+from etl.wind_power.min_sec.ReadAndSaveTmp import ReadAndSaveTmp
+from etl.wind_power.min_sec.StatisticsAndSaveFile import StatisticsAndSaveFile
+from service.plt_service import update_trans_status_success, update_trans_status_error
+from service.trans_service import batch_statistics, get_min_sec_conf
+from utils.conf.read_conf import read_conf
+from utils.df_utils.util import get_time_space
+from utils.file.trans_methods import *
+
+
+class MinSecTrans(BaseDataTrans):
+
+    def __init__(self, data: dict = None, save_db=True, step=0, end=4):
+        super(MinSecTrans, self).__init__(data, save_db, step, end)
+        self.statistics_map = multiprocessing.Manager().dict()
+        self.trans_param = self.get_trans_param()
+        self.trans_param.wind_col_trans = self.wind_col_trans
+
+    def get_filed_conf(self):
+        return get_min_sec_conf(self.field_code, self.read_type)
+
+    def get_trans_param(self):
+        conf_map = self.get_filed_conf()
+        if conf_map is None or type(conf_map) == tuple or len(conf_map.keys()) == 0:
+            message = f"未找到{self.batch_no}的{self.read_type}配置"
+            trans_print(message)
+            update_trans_status_error(self.batch_no, self.read_type, message, self.save_db)
+        else:
+            resolve_col_prefix = read_conf(conf_map, 'resolve_col_prefix')
+            wind_name_exec = read_conf(conf_map, 'wind_name_exec', None)
+            is_vertical_table = read_conf(conf_map, 'is_vertical_table', False)
+            merge_columns = read_conf(conf_map, 'merge_columns', False)
+
+            vertical_cols = read_conf(conf_map, 'vertical_read_cols', '').split(',')
+            index_cols = read_conf(conf_map, 'vertical_index_cols', '').split(',')
+            vertical_key = read_conf(conf_map, 'vertical_col_key')
+            vertical_value = read_conf(conf_map, 'vertical_col_value')
+            need_valid_cols = not merge_columns
+
+            cols_trans_all = dict()
+            trans_cols = ['wind_turbine_number', 'time_stamp', 'active_power', 'rotor_speed', 'generator_speed',
+                          'wind_velocity', 'pitch_angle_blade_1', 'pitch_angle_blade_2', 'pitch_angle_blade_3',
+                          'cabin_position', 'true_wind_direction', 'yaw_error1', 'set_value_of_active_power',
+                          'gearbox_oil_temperature', 'generatordrive_end_bearing_temperature',
+                          'generatornon_drive_end_bearing_temperature', 'wind_turbine_status',
+                          'wind_turbine_status2',
+                          'cabin_temperature', 'twisted_cable_angle', 'front_back_vibration_of_the_cabin',
+                          'side_to_side_vibration_of_the_cabin', 'actual_torque', 'given_torque',
+                          'clockwise_yaw_count',
+                          'counterclockwise_yaw_count', 'unusable', 'power_curve_available',
+                          'required_gearbox_speed',
+                          'inverter_speed_master_control', 'outside_cabin_temperature', 'main_bearing_temperature',
+                          'gearbox_high_speed_shaft_bearing_temperature',
+                          'gearboxmedium_speed_shaftbearing_temperature',
+                          'gearbox_low_speed_shaft_bearing_temperature', 'generator_winding1_temperature',
+                          'generator_winding2_temperature', 'generator_winding3_temperature',
+                          'turbulence_intensity', 'param1',
+                          'param2', 'param3', 'param4', 'param5', 'param6', 'param7', 'param8', 'param9', 'param10']
+
+            for col in trans_cols:
+                cols_trans_all[col] = read_conf(conf_map, col, '')
+
+            return TransParam(read_type=self.read_type, read_path=self.read_path,
+                              cols_tran=cols_trans_all,
+                              wind_name_exec=wind_name_exec, is_vertical_table=is_vertical_table,
+                              vertical_cols=vertical_cols, vertical_key=vertical_key,
+                              vertical_value=vertical_value, index_cols=index_cols, merge_columns=merge_columns,
+                              resolve_col_prefix=resolve_col_prefix, need_valid_cols=need_valid_cols)
+
+    # 第三步 读取 并 保存到临时文件
+    def read_and_save_tmp_file(self):
+        read_and_save_tmp = ReadAndSaveTmp(self.pathsAndTable, self.trans_param)
+        read_and_save_tmp.run()
+
+    # 第四步 统计 并 保存到正式文件
+    def statistics_and_save_to_file(self):
+        # 保存到正式文件
+        statistics_and_save_file = StatisticsAndSaveFile(self.pathsAndTable, self.trans_param, self.statistics_map,
+                                                         self.rated_power_and_cutout_speed_map)
+        statistics_and_save_file.run()
+
+    # 最后更新执行程度
+    def update_exec_progress(self):
+        if self.end >= 4:
+            all_files = read_excel_files(self.pathsAndTable.get_save_path())
+            if self.step <= 3:
+                update_trans_status_success(self.batch_no, self.trans_param.read_type,
+                                            len(all_files),
+                                            self.statistics_map['time_granularity'],
+                                            self.statistics_map['min_date'], self.statistics_map['max_date'],
+                                            self.statistics_map['total_count'], self.save_db)
+            else:
+                df = read_file_to_df(all_files[0], read_cols=['time_stamp'])
+                df['time_stamp'] = pd.to_datetime(df['time_stamp'])
+                time_granularity = get_time_space(df, 'time_stamp')
+                batch_data = batch_statistics("_".join([self.batch_no, self.trans_param.read_type]))
+                if batch_data is not None:
+                    update_trans_status_success(self.batch_no, self.trans_param.read_type,
+                                                len(read_excel_files(self.pathsAndTable.get_save_path())),
+                                                time_granularity,
+                                                batch_data['min_date'], batch_data['max_date'],
+                                                batch_data['total_count'], self.save_db)
+                else:
+                    update_trans_status_success(self.batch_no, self.trans_param.read_type,
+                                                len(read_excel_files(self.pathsAndTable.get_save_path())),
+                                                time_granularity,
+                                                None, None,
+                                                None, self.save_db)
+
+
+if __name__ == '__main__':
+    # __init__ takes a batch record dict; keys mirror BaseDataTrans (illustrative values)
+    test = MinSecTrans(data={'batch_code': 'WOF053600062-WOB000009', 'batch_name': 'test',
+                             'transfer_type': 'minute', 'transfer_addr': '/tmp/data',
+                             'field_code': 'TEST', 'field_name': 'test'}, save_db=False)
+    test.run()

+ 17 - 22
etl/step/ReadAndSaveTmp.py → etl/wind_power/min_sec/ReadAndSaveTmp.py

@@ -5,11 +5,11 @@ import traceback
 
 import pandas as pd
 
-from etl.base import TransParam
-from etl.base.PathsAndTable import PathsAndTable
+from etl.wind_power.min_sec import TransParam
+from etl.common.PathsAndTable import PathsAndTable
 from service.plt_service import update_trans_transfer_progress
 from utils.file.trans_methods import read_excel_files, split_array, del_blank, \
-    create_file_path, read_file_to_df
+    create_file_path, read_file_to_df, valid_eval
 from utils.log.trans_log import trans_print
 from utils.systeminfo.sysinfo import use_files_get_max_cpu_count, get_dir_size, max_file_size_get_max_cpu_count
 
@@ -45,8 +45,9 @@ class ReadAndSaveTmp(object):
         df = self.read_excel_to_df(file_path)
 
         if self.trans_param.wind_name_exec:
-            exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
-            df['wind_turbine_number'] = eval(exec_str)
+            if valid_eval(self.trans_param.wind_name_exec):
+                exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
+                df['wind_turbine_number'] = eval(exec_str)
 
         names = set(df['wind_turbine_number'].values)
         cols = list(df.columns)
@@ -91,7 +92,6 @@ class ReadAndSaveTmp(object):
                             else:
                                 same_col[value] = [k]
 
-                trans_print("包含转换字段,开始处理转换字段")
                 df.rename(columns=real_cols_trans, inplace=True)
 
                 # 添加使用同一个excel字段的值
@@ -107,11 +107,9 @@ class ReadAndSaveTmp(object):
         df = del_blank(df, ['wind_turbine_number'])
         df = df[df['time_stamp'].isna() == False]
         if self.trans_param.wind_name_exec and not self.trans_param.merge_columns:
-            exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
-            df['wind_turbine_number'] = eval(exec_str)
-
-        # 删除 有功功率 和 风速均为空的情况
-        df.dropna(subset=['active_power', 'wind_velocity'], how='all', inplace=True)
+            if valid_eval(self.trans_param.wind_name_exec):
+                exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
+                df['wind_turbine_number'] = eval(exec_str)
 
         self.save_to_tmp_csv(df)
 
@@ -206,8 +204,7 @@ class ReadAndSaveTmp(object):
 
         if self.trans_param.is_vertical_table:
             vertical_cols = self.trans_param.vertical_cols
-            df = read_file_to_df(file_path, vertical_cols, header=self.trans_param.header,
-                                 trans_cols=self.trans_param.vertical_cols)
+            df = read_file_to_df(file_path, vertical_cols, trans_cols=self.trans_param.vertical_cols)
             df = df[df[self.trans_param.vertical_key].isin(read_cols)]
             df.rename(columns={self.trans_param.cols_tran['wind_turbine_number']: 'wind_turbine_number',
                                self.trans_param.cols_tran['time_stamp']: 'time_stamp'}, inplace=True)
@@ -230,22 +227,20 @@ class ReadAndSaveTmp(object):
                     trans_cols.append(v)
             trans_cols = list(set(trans_cols))
             if self.trans_param.merge_columns:
-                df = read_file_to_df(file_path, header=self.trans_param.header,
-                                     trans_cols=trans_cols)
+                df = read_file_to_df(file_path, trans_cols=trans_cols, not_find_header='ignore')
             else:
                 if self.trans_param.need_valid_cols:
-                    df = read_file_to_df(file_path, read_cols, header=self.trans_param.header,
-                                         trans_cols=trans_cols)
+                    df = read_file_to_df(file_path, read_cols, trans_cols=trans_cols)
                 else:
-                    df = read_file_to_df(file_path, header=self.trans_param.header,
-                                         trans_cols=trans_cols)
+                    df = read_file_to_df(file_path, trans_cols=trans_cols)
 
             # 处理列名前缀问题
             if self.trans_param.resolve_col_prefix:
                 columns_dict = dict()
-                for column in df.columns:
-                    columns_dict[column] = eval(self.trans_param.resolve_col_prefix)
-                df.rename(columns=columns_dict, inplace=True)
+                if valid_eval(self.trans_param.resolve_col_prefix):
+                    for column in df.columns:
+                        columns_dict[column] = eval(self.trans_param.resolve_col_prefix)
+                    df.rename(columns=columns_dict, inplace=True)
 
             if self.trans_param.merge_columns:
                 select_cols = [self.trans_param.cols_tran['wind_turbine_number'],
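
wind_name_exec and resolve_col_prefix are single Python expressions stored in the field configuration and applied via eval; valid_eval (added in trans_methods) is assumed to reject unsafe expressions before they run. A sketch of the turbine-name transform, with a hypothetical expression:

    import pandas as pd

    wind_name_exec = "wind_name.replace('#', '')"  # from configuration
    df = pd.DataFrame({'wind_turbine_number': ['#01', '#02']})
    df['wind_turbine_number'] = df['wind_turbine_number'].apply(
        lambda wind_name: eval(wind_name_exec))
    # -> '01', '02'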

+ 12 - 48
etl/step/StatisticsAndSaveFile.py → etl/wind_power/min_sec/StatisticsAndSaveFile.py

@@ -6,15 +6,15 @@ import traceback
 import pandas as pd
 import numpy as np
 
-from etl.base import TransParam
-from etl.base.PathsAndTable import PathsAndTable
-from etl.step.ClassIdentifier import ClassIdentifier
+from etl.wind_power.min_sec import TransParam
+from etl.common.PathsAndTable import PathsAndTable
+from etl.wind_power.min_sec.ClassIdentifier import ClassIdentifier
 from service.plt_service import update_trans_transfer_progress
 from utils.conf.read_conf import read_conf
 from utils.df_utils.util import get_time_space
 from utils.file.trans_methods import create_file_path, read_excel_files, read_file_to_df, split_array
 from utils.log.trans_log import trans_print
-from utils.systeminfo.sysinfo import use_files_get_max_cpu_count, print_memory_usage
+from utils.systeminfo.sysinfo import use_files_get_max_cpu_count
 
 
 class StatisticsAndSaveFile(object):
@@ -55,21 +55,7 @@ class StatisticsAndSaveFile(object):
                 if 'time_granularity' not in self.statistics_map.keys():
                     self.statistics_map['time_granularity'] = get_time_space(df, 'time_stamp')
 
-    def save_statistics_file(self):
-        save_path = os.path.join(os.path.dirname(self.paths_and_table.get_save_path()),
-                                 self.paths_and_table.read_type + '_statistics.txt')
-        create_file_path(save_path, is_file_path=True)
-        with open(save_path, 'w', encoding='utf8') as f:
-            f.write("总数据量:" + str(self.statistics_map['total_count']) + "\n")
-            f.write("最小时间:" + str(self.statistics_map['min_date']) + "\n")
-            f.write("最大时间:" + str(self.statistics_map['max_date']) + "\n")
-            f.write("风机数量:" + str(len(read_excel_files(self.paths_and_table.get_read_tmp_path()))) + "\n")
-
-    def check_data_validity(self, df):
-        pass
-
     def save_to_csv(self, filename):
-        print_memory_usage("开始读取csv:" + os.path.basename(filename))
         df = read_file_to_df(filename)
         if self.trans_param.is_vertical_table:
             df = df.pivot_table(index=['time_stamp', 'wind_turbine_number'], columns=self.trans_param.vertical_key,
@@ -78,23 +64,20 @@ class StatisticsAndSaveFile(object):
             # 重置索引以得到普通的列
             df.reset_index(inplace=True)
 
-        print_memory_usage("结束读取csv,:" + os.path.basename(filename))
-
         # 转化风机名称
-        trans_print("开始转化风机名称")
         origin_wind_name = str(df['wind_turbine_number'].values[0])
         df['wind_turbine_number'] = df['wind_turbine_number'].astype('str')
         # df['wind_turbine_name'] = df['wind_turbine_number']
         df['wind_turbine_number'] = df['wind_turbine_number'].map(
             self.trans_param.wind_col_trans).fillna(df['wind_turbine_number'])
         wind_col_name = str(df['wind_turbine_number'].values[0])
-        print_memory_usage("转化风机名称结束:" + wind_col_name)
 
         not_double_cols = ['wind_turbine_number', 'wind_turbine_name', 'time_stamp', 'param6', 'param7', 'param8',
                            'param9', 'param10']
 
-        solve_time_begin = datetime.datetime.now()
-        trans_print(wind_col_name, "去掉非法数据前大小:", df.shape[0])
+        # 删除 有功功率 和 风速均为空的情况
+        df.dropna(subset=['active_power', 'wind_velocity'], how='all', inplace=True)
+
         df.replace(np.nan, -999999999, inplace=True)
         number_cols = df.select_dtypes(include=['number']).columns.tolist()
         for col in df.columns:
@@ -103,49 +86,37 @@ class StatisticsAndSaveFile(object):
                     df[col] = pd.to_numeric(df[col], errors='coerce')
                     # 删除包含NaN的行(即那些列A转换失败的行)
                     df = df.dropna(subset=[col])
-        trans_print(wind_col_name, "去掉非法数据后大小:", df.shape[0])
         df.replace(-999999999, np.nan, inplace=True)
-        print_memory_usage("处理非法数据大小结束:" + wind_col_name)
 
-        trans_print(wind_col_name, "去掉重复数据前大小:", df.shape[0])
         df.drop_duplicates(['wind_turbine_number', 'time_stamp'], keep='first', inplace=True)
-        trans_print(wind_col_name, "去掉重复数据后大小:", df.shape[0])
-        trans_print("处理非法重复数据结束,耗时:", datetime.datetime.now() - solve_time_begin)
-        print_memory_usage("处理重复数据结束:" + wind_col_name)
 
         # 添加年月日
         solve_time_begin = datetime.datetime.now()
-        trans_print(wind_col_name, "包含时间字段,开始处理时间字段,添加年月日", filename)
-        trans_print(wind_col_name, "时间原始大小:", df.shape[0])
         # df = df[(df['time_stamp'].str.find('-') > 0) & (df['time_stamp'].str.find(':') > 0)]
         # trans_print(wind_col_name, "去掉非法时间后大小:", df.shape[0])
-        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
+        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce", format='%d-%m-%Y %H:%M:%S')
         df.dropna(subset=['time_stamp'], inplace=True)
-        trans_print(wind_col_name, "去掉非法时间后大小:", df.shape[0])
         df.sort_values(by='time_stamp', inplace=True)
-        trans_print("处理时间字段结束,耗时:", datetime.datetime.now() - solve_time_begin)
-        print_memory_usage("处理时间结果:" + wind_col_name)
-
         df = df[[i for i in self.trans_param.cols_tran.keys() if i in df.columns]]
-        print_memory_usage("删减无用字段后内存占用:" + wind_col_name)
+
+        df['active_power'] = df['active_power'] / 1000
+        # 做数据检测前,先强行处理有功功率
+        df = df[df['active_power'] < 5000]
 
         rated_power_and_cutout_speed_tuple = read_conf(self.rated_power_and_cutout_speed_map, str(wind_col_name))
         if rated_power_and_cutout_speed_tuple is None:
             rated_power_and_cutout_speed_tuple = (None, None)
 
-        print_memory_usage("打标签前内存占用:" + wind_col_name)
         class_identifier = ClassIdentifier(wind_turbine_number=wind_col_name, origin_df=df,
                                            rated_power=rated_power_and_cutout_speed_tuple[0],
                                            cut_out_speed=rated_power_and_cutout_speed_tuple[1])
         df = class_identifier.run()
-        print_memory_usage("打标签后内存占用:" + wind_col_name)
 
         df['year'] = df['time_stamp'].dt.year
         df['month'] = df['time_stamp'].dt.month
         df['day'] = df['time_stamp'].dt.day
         df['time_stamp'] = df['time_stamp'].apply(
             lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
-        print_memory_usage("添加年月日后:" + wind_col_name)
 
         df['wind_turbine_name'] = str(origin_wind_name)
 
@@ -165,9 +136,7 @@ class StatisticsAndSaveFile(object):
         trans_print("保存" + str(wind_col_name) + "成功")
 
     def mutiprocessing_to_save_file(self):
-        print_memory_usage("开始执行,占用内存")
         # 开始保存到正式文件
-        trans_print("开始保存到excel文件")
         all_tmp_files = read_excel_files(self.paths_and_table.get_read_tmp_path())
         # split_count = self.pathsAndTable.multi_pool_count
         split_count = use_files_get_max_cpu_count(all_tmp_files)
@@ -186,12 +155,7 @@ class StatisticsAndSaveFile(object):
             message = "保存文件错误,系统返回错误:" + str(e)
             raise ValueError(message)
 
-        trans_print("结束保存到excel文件")
-
     def run(self):
-        trans_print("开始保存数据到正式文件")
-        begin = datetime.datetime.now()
         self.mutiprocessing_to_save_file()
         update_trans_transfer_progress(self.paths_and_table.batch_no, self.paths_and_table.read_type, 70,
                                        self.paths_and_table.save_db)
-        trans_print("保存数据到正式文件结束,耗时:", datetime.datetime.now() - begin)

+ 1 - 2
etl/base/TransParam.py → etl/wind_power/min_sec/TransParam.py

@@ -8,7 +8,7 @@ class TransParam(object):
     def __init__(self, read_type=None, read_path=None, cols_tran=dict(),
                  wind_name_exec=str(), is_vertical_table=False, vertical_cols=list(), vertical_key=None,
                  vertical_value=None, index_cols=list(), merge_columns=False, resolve_col_prefix=None,
-                 need_valid_cols=True, header=0, wind_col_trans: dict = None):
+                 need_valid_cols=True, wind_col_trans: dict = None):
         self.read_type = read_type
         self.read_path = read_path
         self.cols_tran = cols_tran
@@ -21,5 +21,4 @@ class TransParam(object):
         self.merge_columns = merge_columns
         self.resolve_col_prefix = resolve_col_prefix
         self.need_valid_cols = need_valid_cols
-        self.header = header
         self.wind_col_trans = wind_col_trans
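
TransParam is now a plain holder for the per-batch parsing options; `header` was removed because the read header is auto-detected (see find_read_header further down). A hypothetical construction, with made-up column mappings, might look like:

    from etl.wind_power.min_sec.TransParam import TransParam

    # path and mapping values are hypothetical; real ones come from the trans_conf table
    params = TransParam(read_type='minute', read_path='/data/download/some_field/minute',
                        cols_tran={'wind_turbine_number': '风机号', 'time_stamp': '时间', 'active_power': '有功功率'},
                        wind_col_trans={'F01': 'WOG00001'})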

+ 0 - 0
etl/wind_power/min_sec/__init__.py


+ 2 - 0
ge_requirement.sh

@@ -0,0 +1,2 @@
+#!/bin/bash
+pip freeze > requirements.txt

+ 2 - 1
package.sh

@@ -1,2 +1,3 @@
-pyinstaller -F -n etl_tool app_run.py
+pyinstaller --clean -F -n etl_tool app_run.py
+
 #python -m nuitka --onefile --remove-output app_run.py

+ 19 - 1
requirements.txt

@@ -1,10 +1,28 @@
 chardet==5.2.0
+contourpy==1.3.0
+cycler==0.12.1
 DBUtils==3.1.0
+et-xmlfile==1.1.0
+fonttools==4.53.1
+greenlet==3.0.3
+importlib_resources==6.4.5
+kiwisolver==1.4.7
+matplotlib==3.9.2
 numpy==2.0.0
+openpyxl==3.1.5
+packaging==24.1
 pandas==2.2.2
+pillow==10.4.0
 psutil==6.0.0
 PyMySQL==1.1.0
-PyYAML==6.0.1
+pyparsing==3.1.4
+python-calamine==0.2.3
+python-dateutil==2.9.0.post0
+pytz==2024.1
 PyYAML==6.0.1
 rarfile==4.2
+six==1.16.0
 SQLAlchemy==2.0.30
+typing_extensions==4.12.2
+tzdata==2024.1
+zipp==3.20.1

+ 33 - 11
service/plt_service.py

@@ -26,7 +26,9 @@ def update_timeout_trans_data():
 def update_trans_status_running(batch_no, trans_type, schedule_exec=True):
     if schedule_exec:
         exec_sql = """
-        update data_transfer set transfer_state = 0,trans_sys_status = 0 ,transfer_start_time = now()
+        update data_transfer set transfer_state = 0,trans_sys_status = 0 ,transfer_start_time = now(),err_info='',
+        engine_count =0,time_granularity=0,transfer_finish_time=null,
+        data_min_time= null,data_max_time= null,transfer_data_count=null
         where batch_code = %s  and transfer_type = %s
         """
         plt.execute(exec_sql, (batch_no, trans_type))
@@ -72,16 +74,16 @@ def update_trans_transfer_progress(batch_no, trans_type, transfer_progress=0, sa
 
 
 # 获取执行的数据
-def get_exec_data(run_count: int = 1) -> dict:
+def get_batch_exec_data(run_count: int = 1) -> dict:
     query_running_sql = "select count(1) as count from data_transfer where trans_sys_status = 0"
-    query_next_exdc_sql = """
+    query_next_exec_sql = """
     SELECT
         t.*,a.field_name,b.batch_name
     FROM
         data_transfer t INNER JOIN wind_field a on t.field_code = a.field_code
         inner join wind_field_batch b on t.batch_code = b.batch_code
     WHERE
-        ((t.trans_sys_status = -1 and t.transfer_state = 0) or ( t.trans_sys_status in (1,2) and t.transfer_state = 0))
+         t.trans_sys_status in (-1,1,2) and t.transfer_state = 0
     AND t.transfer_addr != ''
     ORDER BY
         t.update_time
@@ -92,12 +94,30 @@ def get_exec_data(run_count: int = 1) -> dict:
     if now_count >= run_count:
         return None
     else:
-        data = plt.execute(query_next_exdc_sql)
+        data = plt.execute(query_next_exec_sql)
         if type(data) == tuple:
             return {}
         return data[0]
 
 
+def get_data_by_batch_no_and_type(batch_no, transfer_type):
+    query_exec_sql = f"""
+    SELECT
+        t.*,a.field_name,b.batch_name
+    FROM
+        data_transfer t INNER JOIN wind_field a on t.field_code = a.field_code
+        inner join wind_field_batch b on t.batch_code = b.batch_code
+    WHERE
+         t.trans_sys_status in (-1,1,2) and t.transfer_state = 2 and t.batch_code = '{batch_no}' and t.transfer_type = '{transfer_type}'
+    AND t.transfer_addr != ''
+    """
+
+    data = plt.execute(query_exec_sql)
+    if type(data) == tuple:
+        return None
+    return data[0]
+
+
 def get_all_wind(field_code):
     query_sql = """
     SELECT t.engine_code,t.engine_name,t.rated_capacity,a.rated_cut_out_windspeed 
@@ -131,9 +151,11 @@ def get_base_wind_and_power(wind_turbine_number):
 
 
 if __name__ == '__main__':
-    print(get_exec_data(run_count=1))
-
-    print("**********************")
-    print(get_exec_data(run_count=2))
-
-# print(update_trans_status_success("test_唐龙-定时任务测试", "second", 10))
+    # print(get_batch_exec_data(run_count=1))
+    #
+    # print("**********************")
+    # print(get_batch_exec_data(run_count=2))
+    # print("**********************")
+    print(get_data_by_batch_no_and_type("test_", "second"))
+    # print(update_trans_status_success("test_唐龙-定时任务测试", "second", 10))
+    begin = datetime.datetime.now()
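
get_data_by_batch_no_and_type fetches a single finished batch (`transfer_state = 2`) as a dict row, or None when nothing matches. A usage sketch with a hypothetical batch code:

    data = get_data_by_batch_no_and_type('WOF053600062-WOB000011', 'second')  # batch code is hypothetical
    if data is not None:
        print(data['field_name'], data['transfer_addr'])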

+ 56 - 84
service/trans_service.py

@@ -12,8 +12,8 @@ from utils.log.trans_log import trans_print
 trans = ConnectMysql("trans")
 
 
-def get_trans_conf(field_code, wind_name, trans_type) -> dict:
-    query_sql = "SELECT * FROM trans_conf where wind_code = %s and type = %s"
+def get_min_sec_conf(field_code, trans_type) -> dict:
+    query_sql = "SELECT * FROM trans_conf where wind_code = %s and type = %s and status = 1"
     res = trans.execute(query_sql, (field_code, trans_type))
     print(res)
     if type(res) == tuple:
@@ -21,14 +21,27 @@ def get_trans_conf(field_code, wind_name, trans_type) -> dict:
     return res[0]
 
 
-def save_to_trans_conf(data_dict=dict()):
-    trans.save_dict(data_dict)
+def get_fault_warn_conf(field_code, trans_type) -> dict:
+    types = list()
+    if trans_type == 'fault':
+        types.append(1)
+    elif trans_type == 'warn':
+        types.append(2)
+    else:
+        trans_print(f"未找到{trans_type}告警/故障的配置")
+        raise ValueError(f"未找到{trans_type}告警/故障的配置")
 
+    types.append(3)
 
-zhishu_list = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97]
+    query_sql = "SELECT * FROM warn_fault_conf where wind_code = %s and type in %s and status = 1"
+    res = trans.execute(query_sql, (field_code, types))
+    print(res)
+    if type(res) == tuple:
+        return None
+    return res[0]
 
 
-def creat_table_and_add_partition(table_name, win_names, read_type):
+def creat_min_sec_table(table_name, win_names, read_type):
     create_sql = f"""
     CREATE TABLE
     IF NOT EXISTS `{table_name}` (
@@ -91,7 +104,7 @@ def creat_table_and_add_partition(table_name, win_names, read_type):
     ) ENGINE = myisam DEFAULT CHARSET = utf8mb4
     """
 
-    if read_type == 'second' and len(win_names) > 1:
+    if read_type == 'second' and win_names and len(win_names) > 1:
 
         create_sql = create_sql + f" PARTITION BY LIST COLUMNS(`wind_turbine_number`) ("
         partition_strs = list()
@@ -145,86 +158,45 @@ def batch_statistics(table_name):
         return None
 
 
+def create_warn_fault_table(table_name):
+    sql = f"""
+    CREATE TABLE `{table_name}` (
+      `wind_turbine_number` varchar(20) DEFAULT NULL COMMENT '风机编号',
+      `wind_turbine_name` varchar(20) DEFAULT NULL COMMENT '原始风机编号',
+      `begin_time` datetime DEFAULT NULL COMMENT '开始时间',
+      `end_time` datetime DEFAULT NULL COMMENT '结束时间',
+      `time_diff` int DEFAULT NULL COMMENT '处理耗时,单位秒',
+      `fault_id` varchar(20) DEFAULT NULL COMMENT '报警或者故障ID',
+      `fault_code` varchar(50) DEFAULT NULL COMMENT '报警或者故障CODE',
+      `fault_detail` varchar(255) DEFAULT NULL COMMENT '错误描述',
+      `fault_level` varchar(20) DEFAULT NULL COMMENT '报警等级',
+      `fault_type` varchar(20) DEFAULT NULL COMMENT '报警类型',
+      `stop_status` varchar(20) DEFAULT NULL COMMENT '刹车状态',
+      KEY `wind_turbine_number` (`wind_turbine_number`),
+      KEY `begin_time` (`begin_time`),
+      KEY `end_time` (`end_time`)
+    ) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4
+    """
+
+    trans.execute(sql)
+
+
 if __name__ == '__main__':
     # path_prix = r"/data/download/collection_data/2完成/招远风电场-山东-大唐/清理数据/WOF063100040-WOB00013/second"
     # files = ["WOG00030.csv", "WOG00034.csv"]
     # for path in files:
     #     save_file_to_db("WOF063100040-WOB00013_second", path_prix + os.sep + path, batch_count=100000)
 
-    table_name = "test"
-    read_type = "second"
-    wind_names = ['WOG00030', 'WOG00034']
-
-    create_sql = f"""
-    CREATE TABLE
-    IF NOT EXISTS `{table_name}` (
-        `wind_turbine_number` VARCHAR (20) DEFAULT NULL COMMENT '风机编号',
-        `wind_turbine_name` VARCHAR(20) DEFAULT NULL COMMENT '风机原始名称',
-        `time_stamp` datetime NOT NULL COMMENT '时间戳',
-        `active_power` DOUBLE DEFAULT NULL COMMENT '有功功率',
-        `rotor_speed` DOUBLE DEFAULT NULL COMMENT '风轮转速',
-        `generator_speed` DOUBLE DEFAULT NULL COMMENT '发电机转速',
-        `wind_velocity` DOUBLE DEFAULT NULL COMMENT '风速',
-        `pitch_angle_blade_1` DOUBLE DEFAULT NULL COMMENT '桨距角1',
-        `pitch_angle_blade_2` DOUBLE DEFAULT NULL COMMENT '桨距角2',
-        `pitch_angle_blade_3` DOUBLE DEFAULT NULL COMMENT '桨距角3',
-        `cabin_position` DOUBLE DEFAULT NULL COMMENT '机舱位置',
-        `true_wind_direction` DOUBLE DEFAULT NULL COMMENT '绝对风向',
-        `yaw_error1` DOUBLE DEFAULT NULL COMMENT '对风角度',
-        `set_value_of_active_power` DOUBLE DEFAULT NULL COMMENT '有功功率设定值',
-        `gearbox_oil_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱油温',
-        `generatordrive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机驱动端轴承温度',
-        `generatornon_drive_end_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '发电机非驱动端轴承温度',
-        `cabin_temperature` DOUBLE DEFAULT NULL COMMENT '机舱内温度',
-        `twisted_cable_angle` DOUBLE DEFAULT NULL COMMENT '扭缆角度',
-        `front_back_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱前后振动',
-        `side_to_side_vibration_of_the_cabin` DOUBLE DEFAULT NULL COMMENT '机舱左右振动',
-        `actual_torque` DOUBLE DEFAULT NULL COMMENT '实际力矩',
-        `given_torque` DOUBLE DEFAULT NULL COMMENT '给定力矩',
-        `clockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '顺时针偏航次数',
-        `counterclockwise_yaw_count` DOUBLE DEFAULT NULL COMMENT '逆时针偏航次数',
-        `unusable` DOUBLE DEFAULT NULL COMMENT '不可利用',
-        `power_curve_available` DOUBLE DEFAULT NULL COMMENT '功率曲线可用',
-        `required_gearbox_speed` DOUBLE DEFAULT NULL COMMENT '齿轮箱转速',
-        `inverter_speed_master_control` DOUBLE DEFAULT NULL COMMENT '变频器转速(主控)',
-        `outside_cabin_temperature` DOUBLE DEFAULT NULL COMMENT '环境温度',
-        `main_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '主轴承轴承温度',
-        `gearbox_high_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱高速轴轴承温度',
-        `gearboxmedium_speed_shaftbearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱中速轴轴承温度',
-        `gearbox_low_speed_shaft_bearing_temperature` DOUBLE DEFAULT NULL COMMENT '齿轮箱低速轴轴承温度',
-        `generator_winding1_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组1温度',
-        `generator_winding2_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组2温度',
-        `generator_winding3_temperature` DOUBLE DEFAULT NULL COMMENT '发电机绕组3温度',
-        `wind_turbine_status` DOUBLE DEFAULT NULL COMMENT '风机状态1',
-        `wind_turbine_status2` DOUBLE DEFAULT NULL COMMENT '风机状态2',
-        `turbulence_intensity` DOUBLE DEFAULT NULL COMMENT '湍流强度',
-        `lab` int DEFAULT NULL COMMENT '-1:停机 0:好点  1:欠发功率点;2:超发功率点;3:额定风速以上的超发功率点 4: 限电',
-        `year` INT (4) DEFAULT NULL COMMENT '年',
-        `month` INT (2) DEFAULT NULL COMMENT '月',
-        `day` INT (2) DEFAULT NULL COMMENT '日',
-        `param1` DOUBLE DEFAULT NULL COMMENT '预留1',
-        `param2` DOUBLE DEFAULT NULL COMMENT '预留2',
-        `param3` DOUBLE DEFAULT NULL COMMENT '预留3',
-        `param4` DOUBLE DEFAULT NULL COMMENT '预留4',
-        `param5` DOUBLE DEFAULT NULL COMMENT '预留5',
-        `param6` VARCHAR (20) DEFAULT NULL COMMENT '预留6',
-        `param7` VARCHAR (20) DEFAULT NULL COMMENT '预留7',
-        `param8` VARCHAR (20) DEFAULT NULL COMMENT '预留8',
-        `param9` VARCHAR (20) DEFAULT NULL COMMENT '预留9',
-        `param10` VARCHAR (20) DEFAULT NULL COMMENT '预留10',
-         KEY `time_stamp` (`time_stamp`),
-         KEY `wind_turbine_number` (`wind_turbine_number`)
-    ) ENGINE = myisam DEFAULT CHARSET = utf8mb4
-    """
-
-    if read_type == 'second' and len(wind_names) > 1:
-
-        create_sql = create_sql + f" PARTITION BY LIST COLUMNS(`wind_turbine_number`) ("
-        partition_strs = list()
-        for wind_name in wind_names:
-            partition_strs.append(f" PARTITION p{wind_name} VALUES IN('{wind_name}')")
-
-        create_sql = create_sql + ",".join(partition_strs) + ")"
-
-
-    print(create_sql)
+    # sql = """
+    # SELECT wind_turbine_number, time_stamp, wind_velocity, active_power
+    #                            FROM `WOF085500002-WOB000001_second`
+    #                            WHERE  time_stamp >= '2024-02-17 00:00:00' AND time_stamp <= '2024-05-14 00:00:00' AND lab = 0
+    # """
+    #
+    # begin = datetime.datetime.now()
+    # df = trans.read_sql_to_df(sql)
+    # end = datetime.datetime.now()
+    # print(df.shape)
+    # print(df.info())
+    # print("Time used:", (end - begin).seconds)
+    get_fault_warn_conf("test", "fault")
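
get_fault_warn_conf builds the `type in (...)` filter from the transfer type: 'fault' queries types (1, 3) and 'warn' queries (2, 3), where type 3 is assumed to be a shared fault-and-warn configuration. A sketch with a hypothetical field code:

    conf = get_fault_warn_conf('WOF063100040', 'warn')  # matches type in (2, 3)
    if conf is None:
        print('no active warn/fault config for this field')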

+ 0 - 157
test_app_run.py

@@ -1,157 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time    : 2024/6/11
-# @Author  : 魏志亮
-import os
-import sys
-import traceback
-
-
-def run_schedule(step=0, end=4, run_count=1):
-    # 更新超时任务
-    update_timeout_trans_data()
-
-    data = get_exec_data(run_count)
-    if data is None:
-        trans_print("当前有任务在执行")
-    elif len(data.keys()) == 0:
-        trans_print("当前无任务")
-    else:
-        batch_no = data['batch_code']
-        batch_name = data['batch_name']
-        transfer_type = data['transfer_type']
-        transfer_file_addr = data['transfer_addr']
-        field_code = data['field_code']
-        field_name = data['field_name']
-
-        __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_addr, field_name, field_code,
-                     save_db=True)
-
-
-def run_local(step=0, end=3, batch_no=None, batch_name='', transfer_type=None, transfer_file_addr=None, field_name=None,
-              field_code="测试", save_db=False):
-    if batch_no is None or str(batch_no).strip() == '':
-        return "批次编号不能为空"
-
-    if transfer_type not in ['second', 'minute', 'second_1']:
-        return "查询类型错误"
-
-    if transfer_file_addr is None or str(transfer_file_addr).strip() == '':
-        return "文件路径不能为空"
-
-    __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_addr, field_name, field_code,
-                 save_db=save_db)
-
-
-def __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_addr=None, field_name=None,
-                 field_code="测试",
-                 save_db=False):
-    trance_id = '-'.join([batch_no, field_name, transfer_type])
-    set_trance_id(trance_id)
-    conf_map = get_trans_conf(field_code, field_name, transfer_type)
-    if conf_map is None or type(conf_map) == tuple or len(conf_map.keys()) == 0:
-        message = f"未找到{field_name}的{transfer_type}配置"
-        trans_print(message)
-        update_trans_status_error(batch_no, transfer_type, message, save_db)
-    else:
-
-        resolve_col_prefix = read_conf(conf_map, 'resolve_col_prefix')
-        wind_name_exec = read_conf(conf_map, 'wind_name_exec', None)
-        is_vertical_table = read_conf(conf_map, 'is_vertical_table', False)
-        merge_columns = read_conf(conf_map, 'merge_columns', False)
-
-        vertical_cols = read_conf(conf_map, 'vertical_read_cols', '').split(',')
-        index_cols = read_conf(conf_map, 'vertical_index_cols', '').split(',')
-        vertical_key = read_conf(conf_map, 'vertical_col_key')
-        vertical_value = read_conf(conf_map, 'vertical_col_value')
-        need_valid_cols = not merge_columns
-
-        begin_header = read_conf(conf_map, 'begin_header', 0)
-
-        cols_trans_all = dict()
-        trans_cols = ['wind_turbine_number', 'time_stamp', 'active_power', 'rotor_speed', 'generator_speed',
-                      'wind_velocity', 'pitch_angle_blade_1', 'pitch_angle_blade_2', 'pitch_angle_blade_3',
-                      'cabin_position', 'true_wind_direction', 'yaw_error1', 'set_value_of_active_power',
-                      'gearbox_oil_temperature', 'generatordrive_end_bearing_temperature',
-                      'generatornon_drive_end_bearing_temperature', 'wind_turbine_status',
-                      'wind_turbine_status2',
-                      'cabin_temperature', 'twisted_cable_angle', 'front_back_vibration_of_the_cabin',
-                      'side_to_side_vibration_of_the_cabin', 'actual_torque', 'given_torque',
-                      'clockwise_yaw_count',
-                      'counterclockwise_yaw_count', 'unusable', 'power_curve_available',
-                      'required_gearbox_speed',
-                      'inverter_speed_master_control', 'outside_cabin_temperature', 'main_bearing_temperature',
-                      'gearbox_high_speed_shaft_bearing_temperature',
-                      'gearboxmedium_speed_shaftbearing_temperature',
-                      'gearbox_low_speed_shaft_bearing_temperature', 'generator_winding1_temperature',
-                      'generator_winding2_temperature', 'generator_winding3_temperature',
-                      'turbulence_intensity', 'param1',
-                      'param2', 'param3', 'param4', 'param5', 'param6', 'param7', 'param8', 'param9', 'param10']
-
-        for col in trans_cols:
-            cols_trans_all[col] = read_conf(conf_map, col, '')
-
-        params = TransParam(read_type=transfer_type, read_path=transfer_file_addr,
-                            cols_tran=cols_trans_all,
-                            wind_name_exec=wind_name_exec, is_vertical_table=is_vertical_table,
-                            vertical_cols=vertical_cols, vertical_key=vertical_key,
-                            vertical_value=vertical_value, index_cols=index_cols, merge_columns=merge_columns,
-                            resolve_col_prefix=resolve_col_prefix, need_valid_cols=need_valid_cols,
-                            header=begin_header)
-
-        try:
-            trans_subject = WindFarms(batch_no=batch_no, batch_name=batch_name, field_code=field_code,
-                                      field_name=field_name,
-                                      save_db=save_db,
-                                      header=begin_header, trans_param=params)
-            trans_subject.run(step=step, end=end)
-        except Exception as e:
-            trans_print(traceback.format_exc())
-            message = "系统返回错误:" + str(e)
-            update_trans_status_error(batch_no, transfer_type, message, save_db)
-        finally:
-            set_trance_id("")
-            # trans_subject.pathsAndTable.delete_tmp_files()
-
-
-if __name__ == '__main__':
-    env = None
-    if len(sys.argv) >= 2:
-        env = sys.argv[1]
-    else:
-        env = 'prod'
-    print(sys.argv)
-    if env is None:
-        raise Exception("请配置运行环境")
-
-    os.environ['env'] = env
-
-    run_count = 1
-    if len(sys.argv) >= 3:
-        run_count = int(sys.argv[2])
-
-    conf_path = '/data/config/etl_config.yaml'
-    if len(sys.argv) >= 4:
-        conf_path = sys.argv[3]
-
-    os.environ['ETL_CONF'] = conf_path
-
-    from utils.log.trans_log import trans_print, set_trance_id
-    from etl.base.TransParam import TransParam
-    from etl.base.WindFarms import WindFarms
-    from service.plt_service import get_exec_data, update_trans_status_error, update_timeout_trans_data
-    from service.trans_service import get_trans_conf
-    from utils.conf.read_conf import read_conf
-
-    # run_schedule(run_count=run_count)
-
-    # run_local(0, 3, batch_no='test_11', batch_name='test', transfer_type='minute',
-    #           transfer_file_addr=r'D:\trans_data\密马风电场\收资数据\minute', field_name='密马风电场',
-    #           field_code="WOF035200003", save_db=False)
-
-    #run_local(4, 4, batch_no='WOF053600062-WOB0000111-test', batch_name='ZYFDC000013-test', transfer_type='second',
-    #          transfer_file_addr=r'/data/download/collection_data/2完成/招远风电场-山东-大唐/收资数据/招远秒级数据1111', field_name='招远风电场',
-    #          field_code="WOF053600062", save_db=False)
-
-    run_local(0, 4, batch_no='WOF079700019-WOB000004', batch_name='JHS标签min-after', transfer_type='minute',
-               transfer_file_addr=r'/data/download/collection_data/2完成/金华山风电场-江西-大唐/收资数据/调整后/min', field_name='金华山风电场',
-               field_code="WOF079700019", save_db=True)

+ 93 - 0
test_run_local.py

@@ -0,0 +1,93 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/6/11
+# @Author  : 魏志亮
+import datetime
+import os
+import sys
+import traceback
+
+import pandas as pd
+
+from utils.conf.read_conf import yaml_conf, read_conf
+
+
+def get_exec_data(batch_no=None, read_type=None, run_count=1):
+    if batch_no and read_type:
+        data = get_data_by_batch_no_and_type(batch_no, read_type)
+        if data is None:
+            raise ValueError(f"未找到批次号:{batch_no},类型:{read_type}")
+
+    else:
+        data = get_batch_exec_data(run_count)
+        if data is None:
+            trans_print("当前有任务在执行")
+            sys.exit(0)
+        elif len(data.keys()) == 0:
+            trans_print("当前无任务")
+            sys.exit(0)
+
+    return data
+
+
+def run(data: dict = dict(), save_db=False):
+    exec_process = None
+    if data['transfer_type'] in ['second', 'minute']:
+        exec_process = MinSecTrans(data=data, save_db=save_db)
+
+    if data['transfer_type'] in ['fault', 'warn']:
+        exec_process = FaultWarnTrans(data=data, save_db=save_db)
+
+    if exec_process is None:
+        raise Exception("No exec process")
+    exec_process.run()
+
+
+if __name__ == '__main__':
+    env = 'dev'
+    if len(sys.argv) >= 2:
+        env = sys.argv[1]
+
+    conf_path = os.path.abspath(f"./conf/etl_config_{env}.yaml")
+    os.environ['ETL_CONF'] = conf_path
+    yaml_config = yaml_conf(conf_path)
+    os.environ['env'] = env
+    run_count = int(read_conf(yaml_config, "run_batch_count", 1))
+
+
+    from utils.log.trans_log import trans_print
+    from service.plt_service import get_batch_exec_data, get_data_by_batch_no_and_type
+    from etl.wind_power.fault_warn.FaultWarnTrans import FaultWarnTrans
+    from etl.wind_power.min_sec.MinSecTrans import MinSecTrans
+    from utils.file.trans_methods import read_file_to_df
+
+    df = read_file_to_df("tmp_file/rebuild_data.csv")
+    results = list()
+    begin = datetime.datetime.now()
+    data = dict()
+    for batch_code, batch_name, transfer_type, transfer_addr, field_code, field_name \
+            in zip(df['batch_code'], df['batch_name'], df['transfer_type'], df['transfer_addr'], df['field_code'],
+                   df['field_name']):
+        batch_begin = datetime.datetime.now()
+        transfer_addr = transfer_addr.replace(r"/data/download/collection_data",
+                                              r"/data/download/datang_shangxian")
+        trans_print("开始执行批次:", batch_code, batch_name, transfer_type, field_code, field_name)
+        trans_print("批次路径:", transfer_addr)
+
+        data['batch_code'] = batch_code
+        data['batch_name'] = batch_name
+        data['transfer_type'] = transfer_type
+        data['transfer_addr'] = transfer_addr
+        data['field_code'] = field_code
+        data['field_name'] = field_name
+        try:
+            run(data=data, save_db=True)
+            results.append((batch_code, batch_name, transfer_type, field_code, field_name, 'success'))
+        except Exception as e:
+            results.append((batch_code, batch_name, transfer_type, field_code, field_name, 'error'))
+            trans_print(traceback.format_exc())
+        finally:
+            trans_print("执行结束,耗时:", datetime.datetime.now() - batch_begin, "总耗时:", datetime.datetime.now() - begin)
+
+    for data in results:
+        trans_print(data)
+
+    trans_print("执行结束,总耗时:", datetime.datetime.now() - begin)

+ 197 - 0
tmp_file/baiyushan_20240906.py

@@ -0,0 +1,197 @@
+import datetime
+import os
+from multiprocessing import Pool
+
+import chardet
+import pandas as pd
+
+
+# 获取文件编码
+def detect_file_encoding(filename):
+    # 读取文件的前1000个字节(足够用于大多数编码检测)
+    with open(filename, 'rb') as f:
+        rawdata = f.read(1000)
+    result = chardet.detect(rawdata)
+    encoding = result['encoding']
+
+    if encoding is None:
+        encoding = 'gb18030'
+
+    if encoding.lower() == 'gb2312' or encoding.lower().startswith("windows"):
+        encoding = 'gb18030'
+    return encoding
+
+
+# 读取数据到df
+def read_file_to_df(file_path, read_cols=list(), header=0):
+    df = pd.DataFrame()
+    if str(file_path).lower().endswith("csv") or str(file_path).lower().endswith("gz"):
+        encoding = detect_file_encoding(file_path)
+        end_with_gz = str(file_path).lower().endswith("gz")
+        if read_cols:
+            if end_with_gz:
+                df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip', header=header)
+            else:
+                df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, header=header, on_bad_lines='warn')
+        else:
+
+            if end_with_gz:
+                df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header)
+            else:
+                df = pd.read_csv(file_path, encoding=encoding, header=header, on_bad_lines='warn')
+
+    else:
+        xls = pd.ExcelFile(file_path)
+        # 获取所有的sheet名称
+        sheet_names = xls.sheet_names
+        for sheet in sheet_names:
+            if read_cols:
+                df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet, header=header, usecols=read_cols)])
+            else:
+                df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet, header=header)])
+
+    return df
+
+
+def __build_directory_dict(directory_dict, path, filter_types=None):
+    # 遍历目录下的所有项
+    for item in os.listdir(path):
+        item_path = os.path.join(path, item)
+        if os.path.isdir(item_path):
+            __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
+        elif os.path.isfile(item_path):
+            if path not in directory_dict:
+                directory_dict[path] = []
+
+            if filter_types is None or len(filter_types) == 0:
+                directory_dict[path].append(item_path)
+            elif str(item_path).split(".")[-1] in filter_types:
+                if str(item_path).count("~$") == 0:
+                    directory_dict[path].append(item_path)
+
+    # 读取所有文件
+
+
+# 读取路径下所有的excel文件
+def read_excel_files(read_path):
+    directory_dict = {}
+    __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])
+
+    return [path for paths in directory_dict.values() for path in paths if path]
+
+
+# 创建路径
+def create_file_path(path, is_file_path=False):
+    if is_file_path:
+        path = os.path.dirname(path)
+
+    if not os.path.exists(path):
+        os.makedirs(path, exist_ok=True)
+
+
+def read_status(status_path):
+    all_files = read_excel_files(status_path)
+
+    with Pool(20) as pool:
+        dfs = pool.starmap(read_file_to_df, [(file, ['设备名称', '状态码', '开始时间'], 2) for file in all_files])
+
+    df = pd.concat(dfs)
+    df = df[df['状态码'].isin([3, 5])]
+    df['开始时间'] = pd.to_datetime(df['开始时间'])
+
+    df['处理后时间'] = (df['开始时间'] + pd.Timedelta(minutes=10)).apply(
+        lambda x: f"{x.year}-{str(x.month).zfill(2)}-{str(x.day).zfill(2)} {str(x.hour).zfill(2)}:{x.minute // 10}0:00")
+
+    df['处理后时间'] = pd.to_datetime(df['处理后时间'])
+    df = df[(df['处理后时间'] >= '2023-09-01 00:00:00')]
+    df[df['处理后时间'] >= '2024-09-01 00:00:00'] = '2024-09-01 00:00:00'
+    df.sort_values(by=['设备名称', '处理后时间'], inplace=True)
+
+    return df
+
+
+def read_fault_data(fault_path):
+    all_files = read_excel_files(fault_path)
+
+    with Pool(20) as pool:
+        dfs = pool.starmap(read_file_to_df, [(file, ['设备名称', '故障开始时间'], 2) for file in all_files])
+
+    df = pd.concat(dfs)
+    df = df[df['设备名称'].str.startswith("#")]
+    df['故障开始时间'] = pd.to_datetime(df['故障开始时间'])
+
+    df['处理后故障开始时间'] = (df['故障开始时间'] + pd.Timedelta(minutes=10)).apply(
+        lambda x: f"{x.year}-{str(x.month).zfill(2)}-{str(x.day).zfill(2)} {str(x.hour).zfill(2)}:{x.minute // 10}0:00")
+
+    df['处理后故障开始时间'] = pd.to_datetime(df['处理后故障开始时间'])
+    df = df[(df['处理后故障开始时间'] >= '2023-09-01 00:00:00') & (df['处理后故障开始时间'] < '2024-09-01 00:00:00')]
+    df.sort_values(by=['设备名称', '处理后故障开始时间'], inplace=True)
+
+    return df
+
+
+def read_10min_data(data_path):
+    all_files = read_excel_files(data_path)
+
+    with Pool(20) as pool:
+        dfs = pool.starmap(read_file_to_df,
+                           [(file, ['设备名称', '时间', '平均风速(m/s)', '平均网侧有功功率(kW)'], 1) for file in all_files])
+
+    df = pd.concat(dfs)
+    df['时间'] = pd.to_datetime(df['时间'])
+
+    df = df[(df['时间'] >= '2023-09-01 00:00:00') & (df['时间'] < '2024-09-01 00:00:00')]
+    df.sort_values(by=['设备名称', '时间'], inplace=True)
+    return df
+
+
+def select_data_and_save(name, fault_df, origin_df):
+    df = pd.DataFrame()
+    for i in range(fault_df.shape[0]):
+        fault = fault_df.iloc[i]
+        con1 = origin_df['时间'] >= fault['处理后故障开始时间']
+        con2 = origin_df['时间'] <= fault['结束时间']
+        df = pd.concat([df, origin_df[con1 & con2]])
+
+    name = name.replace('#', 'F')
+    df.drop_duplicates(inplace=True)
+    df.to_csv(save_path + os.sep + name + '.csv', index=False, encoding='utf8')
+
+
+if __name__ == '__main__':
+    base_path = r'/data/download/白玉山/需要整理的数据'
+    save_path = base_path + os.sep + 'sele_data_202409261135'
+    create_file_path(save_path)
+    status_df = read_status(base_path + os.sep + '设备状态')
+    fault_df = read_fault_data(base_path + os.sep + '故障')
+    data_df = read_10min_data(base_path + os.sep + '十分钟')
+
+    status_df.to_csv(base_path + os.sep + '设备状态' + '.csv', index=False, encoding='utf8')
+    fault_df.to_csv(base_path + os.sep + '故障' + '.csv', index=False, encoding='utf8')
+    data_df.to_csv(base_path + os.sep + '十分钟' + '.csv', index=False, encoding='utf8')
+
+    print(status_df.shape)
+    print(fault_df.shape)
+    print(data_df.shape)
+
+    fault_list = list()
+    for i in range(fault_df.shape[0]):
+        data = fault_df.iloc[i]
+        con1 = status_df['设备名称'] == data['设备名称']
+        con2 = status_df['处理后时间'] >= data['处理后故障开始时间']
+        fault_list.append(status_df[con1 & con2]['处理后时间'].min())
+    fault_df['结束时间'] = fault_list
+
+    status_df.to_csv(base_path + os.sep + '设备状态' + '.csv', index=False, encoding='utf8')
+    fault_df.to_csv(base_path + os.sep + '故障' + '.csv', index=False, encoding='utf8')
+    data_df.to_csv(base_path + os.sep + '十分钟' + '.csv', index=False, encoding='utf8')
+
+    names = set(fault_df['设备名称'])
+    fault_map = dict()
+    data_map = dict()
+    for name in names:
+        fault_map[name] = fault_df[fault_df['设备名称'] == name]
+        data_map[name] = data_df[data_df['设备名称'] == name]
+
+    with Pool(20) as pool:
+        pool.starmap(select_data_and_save, [(name, fault_map[name], data_map[name]) for name in names])
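
The `(t + Timedelta(minutes=10))` plus `minute // 10` expression used in read_status and read_fault_data rounds a timestamp up to the next 10-minute boundary; a quick check of the exact f-string above:

    import pandas as pd

    x = pd.Timestamp('2023-09-01 12:34:56') + pd.Timedelta(minutes=10)  # 12:44:56
    print(f"{x.year}-{str(x.month).zfill(2)}-{str(x.day).zfill(2)} "
          f"{str(x.hour).zfill(2)}:{x.minute // 10}0:00")               # 2023-09-01 12:40:00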

+ 94 - 0
tmp_file/cp_online_data_to_other.py

@@ -0,0 +1,94 @@
+import datetime
+import multiprocessing
+import os
+import shutil
+
+not_move_dir = ["乌梅山风电场-江西-大唐",
+                "诺木洪风电场-甘肃-华电",
+                "平陆风电场-山西-中广核",
+                "泗洪协合风电场-安徽-深能南控",
+                "诺木洪风电场-青海-华电",
+                "长清风电场-山东-国电"
+                ]
+
+read_dir = r"/data/download/collection_data"
+# read_dir = r'Z:\collection_data'
+save_base_dir = r"/data/download/datang_shangxian"
+
+
+def __build_directory_dict(directory_dict, path, filter_types=None):
+    # 遍历目录下的所有项
+    for item in os.listdir(path):
+        if item not in not_move_dir:
+            item_path = os.path.join(path, item)
+            if os.path.isdir(item_path):
+                __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
+            elif os.path.isfile(item_path):
+                if path not in directory_dict:
+                    directory_dict[path] = []
+
+                if filter_types is None or len(filter_types) == 0:
+                    directory_dict[path].append(item_path)
+                elif str(item_path).split(".")[-1] in filter_types:
+                    if str(item_path).count("~$") == 0:
+                        directory_dict[path].append(item_path)
+
+
+# 读取路径下所有的excel文件
+def read_excel_files(read_path):
+    if os.path.isfile(read_path):
+        return [read_path]
+
+    directory_dict = {}
+    __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])
+
+    return [path for paths in directory_dict.values() for path in paths if path]
+
+
+# 读取路径下所有的文件
+def read_files(read_path):
+    if os.path.isfile(read_path):
+        return [read_path]
+    directory_dict = {}
+    __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz', 'zip', 'rar'])
+
+    return [path for paths in directory_dict.values() for path in paths if path]
+
+
+# 创建路径
+def create_file_path(path, is_file_path=False):
+    """
+    创建路径
+    :param path:创建文件夹的路径
+    :param is_file_path: 传入的path是否包含具体的文件名
+    """
+    if is_file_path:
+        path = os.path.dirname(path)
+
+    if not os.path.exists(path):
+        os.makedirs(path, exist_ok=True)
+
+
+def copy_to_new(from_path):
+    to_path = from_path.replace(read_dir, save_base_dir)
+    is_file = False
+    if to_path.count('.') > 0:
+        is_file = True
+
+    create_file_path(to_path, is_file_path=is_file)
+
+    shutil.copy(from_path, to_path)
+
+
+print("开始:", datetime.datetime.now())
+begin = datetime.datetime.now()
+read_all_files = [i for i in read_files(read_dir) if i.find("收资数据") > -1]
+print(len(read_all_files))
+print("统计耗时:", datetime.datetime.now() - begin)
+cp_begin = datetime.datetime.now()
+
+with multiprocessing.Pool(40) as pool:
+    pool.starmap(copy_to_new, [(path,) for path in read_all_files])
+
+print(len(read_all_files), "耗时:", datetime.datetime.now() - cp_begin, "总耗时:", datetime.datetime.now() - begin)
+print("结束:", datetime.datetime.now())

+ 40 - 0
tmp_file/error_ms_data.py

@@ -0,0 +1,40 @@
+from datetime import datetime
+
+import pandas as pd
+
+
+def convert_date(date_str):
+    cut_index = str(date_str).rfind("_")
+    date = date_str[0:cut_index].replace("_", "-")
+    time = date_str[cut_index + 1:].replace(":", ".")
+
+    return datetime.strptime(f"{date} {time}", '%Y-%m-%d %H.%M.%S.%f')
+
+
+df = pd.read_csv(r"d:/data/b2_240828_2324_Err 1.csv", header=1)
+df.dropna(subset='TimeStamp', inplace=True)
+df.drop_duplicates(subset='TimeStamp', keep="first", inplace=True)
+
+origin_columns = list(df.columns)
+
+df['TimeStamp1'] = df['TimeStamp'].apply(convert_date)
+df.sort_values(by='TimeStamp1', inplace=True)
+
+# df['DateTime'] = pd.to_datetime(df['TimeStamp'], format="%Y-%m-%d %H:%M:%S")
+df['DateTime'] = df['TimeStamp1'].apply(lambda x: x.strftime("%Y-%m-%d %H:%M:%S"))
+
+print(df.shape)
+
+dateTime_count = df['DateTime'].value_counts()
+
+dateTime_count_1 = dateTime_count[dateTime_count == 1]
+dateTime_count_gt1 = dateTime_count[dateTime_count > 1]
+
+df1 = df[df['DateTime'].isin(dateTime_count_1.index.values)]
+df2 = df[df['DateTime'].isin(dateTime_count_gt1.index.values)]
+
+print(df1.shape)
+print(df2.shape)
+origin_columns.insert(0, 'DateTime')
+df1.to_csv("1秒数据.csv", encoding='utf-8', index=False, columns=origin_columns, date_format="%Y-%m-%d %H:%M:%S.%f")
+df2.to_csv("毫秒数据.csv", encoding='utf-8', index=False, columns=origin_columns, date_format="%Y-%m-%d %H:%M:%S.%f")

+ 48 - 0
tmp_file/filter_lose_data.py

@@ -0,0 +1,48 @@
+import datetime
+
+import pandas as pd
+
+df = pd.read_csv("D:\data\白玉山后评估数据资料\十分钟.csv", encoding='utf8')
+
+df['时间'] = pd.to_datetime(df['时间'])
+df['plus_10min'] = df['时间'] + pd.Timedelta(minutes=10)
+
+names = set(df['设备名称'])
+
+
+def get_time_space_count(start_time: datetime.datetime, end_time: datetime.datetime, time_space=1):
+    """
+    获取俩个时间之间的个数
+    :return: 查询时间间隔
+    """
+    delta = end_time - start_time
+    total_seconds = delta.days * 24 * 60 * 60 + delta.seconds
+
+    return abs(int(total_seconds / time_space))
+
+
+result_dict = dict()
+for name in names:
+    q_df = df[df['设备名称'] == name].copy()
+    q_df['unshift'] = q_df['时间'].shift(-1)
+    q_df['unshift'] = q_df['unshift'].fillna(pd.Timestamp('2024-09-01 00:00:00'))
+    result_df = q_df[~(q_df['plus_10min'] == q_df['unshift'])]
+    result_df.reset_index(inplace=True)
+    q_list = list()
+    count = 0
+    result_df.to_csv('test.csv', encoding='utf8')
+    for i in range(result_df.shape[0]):
+        data = result_df.iloc[i]
+        begin = data['时间']
+        end = data['unshift']
+        count = count + get_time_space_count(begin, end, 600) - 1
+        # if end is not None and end != np.nan:
+        #     q_list.append(f"{begin} ~ {end}")
+
+    result_dict[name] = count
+
+with open("缺失_数量.csv", 'w', encoding='utf8') as f:
+    for k, v in result_dict.items():
+        # v.insert(0, k)
+        # f.write(",".join(v) + "\n")
+        f.write(f"{k},{v}\n")

+ 27 - 0
tmp_file/hebing_matlib_result.py

@@ -0,0 +1,27 @@
+import os
+import pandas as pd
+
+read_path = r"D:\data\电量损失及散点图"
+df = pd.DataFrame()
+
+cols = ['风机', '应发电量', '实发电量', '停机损失电量', '坏点+限电损失电量', '性能损失电量', '坏点损失电量', '限电损失电量', '超发电量', '应发电量百分比', '实发电量百分比',
+        '停机损失电量百分比', '坏点+限电损失电量百分比', '性能损失电量百分比', '坏点损失电量百分比', '限电损失电量百分比', '超发电量百分比', '平均风速', '可利用率']
+
+for root, dir, files in os.walk(read_path):
+    if files:
+        base_name = os.path.basename(root)
+        wind_df = pd.DataFrame()
+        print(root)
+        df1 = pd.read_excel(os.path.join(root, "EPPer.xls"), usecols=['应发电量百分比', '实发电量百分比',
+                                                                     '停机损失电量百分比', '坏点+限电损失电量百分比', '性能损失电量百分比',
+                                                                     '坏点损失电量百分比',
+                                                                     '限电损失电量百分比', '超发电量百分比', '平均风速', '可利用率'])
+        df2 = pd.read_excel(os.path.join(root, "EPKW.xls"),
+                            usecols=['应发电量', '实发电量', '停机损失电量', '坏点+限电损失电量', '性能损失电量', '坏点损失电量', '限电损失电量', '超发电量'])
+        wind_df = pd.concat([df1, df2], axis=1)
+        wind_df['风机'] = base_name
+        wind_df.reset_index(inplace=True)
+        print(wind_df.columns)
+        df = pd.concat([df, wind_df], ignore_index=True)
+
+df.to_csv("合并结果.csv", index=False, encoding='utf8', columns=cols)

+ 38 - 0
tmp_file/queshi_bili.py

@@ -0,0 +1,38 @@
+import datetime
+
+import pandas as pd
+
+
+def get_time_space_count(start_time: datetime.datetime, end_time: datetime.datetime, time_space=1):
+    """
+    获取俩个时间之间的个数
+    :return: 查询时间间隔
+    """
+    delta = end_time - start_time
+    total_seconds = delta.days * 24 * 60 * 60 + delta.seconds
+
+    return abs(int(total_seconds / time_space))
+
+
+df = pd.read_csv("D:\data\白玉山后评估数据资料\十分钟.csv", encoding='utf8')
+
+df['时间'] = pd.to_datetime(df['时间'])
+df['plus_10min'] = df['时间'] + pd.Timedelta(minutes=10)
+
+names = list(set(df['设备名称']))
+names.sort()
+
+count = get_time_space_count(datetime.datetime.strptime('2023-09-01 00:00:00', '%Y-%m-%d %H:%M:%S'),
+                             datetime.datetime.strptime('2024-09-01 00:00:00', '%Y-%m-%d %H:%M:%S'), 600)
+
+result_df = pd.DataFrame(df['设备名称'].value_counts())
+result_df.reset_index(inplace=True)
+result_df.columns = ['风机', '数量']
+
+result_df['总数'] = count
+
+result_df['完整度'] = result_df['数量'].apply(lambda x: round(x * 100 / count, 2))
+
+result_df.sort_values(by=['风机'], inplace=True)
+
+print(result_df)
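
As a sanity check on `count`: 2023-09-01 to 2024-09-01 spans 366 days (2024 is a leap year), and each day holds 144 ten-minute points, so the denominator of the completeness ratio should be:

    print(366 * 24 * 6)  # 52704 expected rows per turbine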

+ 1 - 1
tmp_file/read_and_draw_png.py

@@ -1,7 +1,7 @@
 import multiprocessing
 import os
 
-from etl.step.ClassIdentifier import ClassIdentifier
+from etl.wind_power.min_sec.ClassIdentifier import ClassIdentifier
 from utils.draw.draw_file import scatter
 from utils.file.trans_methods import read_file_to_df
 

+ 55 - 0
tmp_file/zibo_guzhang_select_time.py

@@ -0,0 +1,55 @@
+import datetime
+from datetime import timedelta
+
+from utils.file.trans_methods import *
+
+
+def convert_and_calculate_time_range(time_str):
+    # 解析原始字符串
+    date_part = time_str[:6]
+    time_part = time_str[7:]
+
+    # 将短日期格式转换为完整年份
+    year = '20' + date_part[:2]
+    month = date_part[2:4]
+    day = date_part[4:]
+
+    hour = time_part[:2]
+    minute = time_part[2:]
+
+    # 创建 datetime 对象
+    base_time = datetime.datetime.strptime(f"{year}-{month}-{day} {hour}:{minute}", "%Y-%m-%d %H:%M")
+
+    # 计算时间区间
+    start_time = base_time.replace(second=0, microsecond=0) - timedelta(minutes=2)
+    end_time = base_time.replace(second=0, microsecond=0) + timedelta(minutes=3)
+
+    return base_time.strftime("%Y-%m-%d %H:%M"), start_time.strftime("%Y-%m-%d %H:%M:%S"), end_time.strftime(
+        "%Y-%m-%d %H:%M:%S")
+
+
+all_df = read_file_to_df(r"D:\data\淄博\故障记录_filtered.csv")
+all_df['激活时间'] = pd.to_datetime(all_df['激活时间'])
+
+all_files = read_excel_files(r"D:\data\淄博\淄博风场buffer文件(1)")
+
+dfs = pd.DataFrame()
+
+for file in all_files:
+    base_name = os.path.basename(file)
+    if base_name.startswith("b"):
+        try:
+            turbnine_no = int(base_name.split("_")[0].replace("b", ""))
+            base_time, start_time, end_time = convert_and_calculate_time_range(
+                base_name.replace(base_name.split("_")[0] + "_", "")[0:11])
+        except Exception as e:
+            print("error:", file)
+            raise e
+
+        condation1 = (all_df['激活时间'] >= start_time) & (all_df['风机名'] == turbnine_no)
+        condation2 = (all_df['激活时间'] < end_time) & (all_df['风机名'] == turbnine_no)
+        condation = condation1 & condation2
+        dfs = pd.concat([dfs, all_df[condation]])
+
+dfs.drop_duplicates(inplace=True)
+
+dfs.to_csv(r"D:\data\淄博\result.csv", encoding='utf8', index=False)
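
convert_and_calculate_time_range parses file-name stems like '240828_2324' (yymmdd_HHMM) and returns the base minute plus a [-2 min, +3 min) search window. A worked example:

    base, start, end = convert_and_calculate_time_range('240828_2324')
    print(base)   # 2024-08-28 23:24
    print(start)  # 2024-08-28 23:22:00
    print(end)    # 2024-08-28 23:27:00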

+ 87 - 0
tmp_file/对比文件夹列名差值.py

@@ -0,0 +1,87 @@
+import multiprocessing
+
+from utils.file.trans_methods import *
+
+
+def boolean_is_check_data(df_vas):
+    fault_list = ['Checked', 'Indeterminate', 'Unchecked']
+    for fault in fault_list:
+        if fault in df_vas:
+            return True
+
+    return False
+
+
+def compareTwoFolders(df1s, df2s):
+    for is_falut in [False, True]:
+        list1 = list()
+        for df in df1s:
+            tmp_list = [str(i).split('_')[-1] for i in list(df.columns) if i != 'sheet_name']
+            if is_falut:
+                if boolean_is_check_data(df.values):
+                    list1.extend(tmp_list)
+            else:
+                if not boolean_is_check_data(df.values):
+                    list1.extend(tmp_list)
+
+        list2 = list()
+        for df in df2s:
+            tmp_list = [str(i).split('_')[-1] for i in list(df.columns) if i != 'sheet_name']
+            if is_falut:
+                if boolean_is_check_data(df.values):
+                    list2.extend(tmp_list)
+            else:
+                if not boolean_is_check_data(df.values):
+                    list2.extend(tmp_list)
+
+        set1 = set(list1)
+        set2 = set(list2)
+
+        list1 = list(set1)
+        list2 = list(set2)
+        list1.sort()
+        list2.sort()
+
+        print(list1)
+        print(list2)
+
+        list3 = list(set1 - set2)
+        list3.sort()
+
+        list4 = list(set2 - set1)
+        list4.sort()
+        print(list3)
+        print(list4)
+
+        max_count = max(len(list1), len(list2), len(list3), len(list4))
+        list1.extend([''] * (max_count - len(list1)))
+        list2.extend([''] * (max_count - len(list2)))
+        list3.extend([''] * (max_count - len(list3)))
+        list4.extend([''] * (max_count - len(list4)))
+
+        file_name = 'col_compare.csv' if not is_falut else 'col_compare_falut.csv'
+        with open(file_name, 'w', encoding='utf8') as f:
+            f.write(",".join(["对方提供", "自己获取", "对方提供多的字段", "自己提供多的字段"]))
+            f.write('\n')
+            for a, b, c, d in zip(list1, list2, list3, list4):
+                f.write(",".join([a, b, c, d]))
+                f.write('\n')
+
+            f.flush()
+
+
+if __name__ == '__main__':
+    begin = datetime.datetime.now()
+    dir1 = r'D:\data\新华水电\风机SCADA数据\9月风机数据_对方复制'
+    dir2 = r'D:\data\新华水电\风机SCADA数据\自己复制'
+    files1 = read_excel_files(dir1)
+    files2 = read_excel_files(dir2)
+    with multiprocessing.Pool(10) as pool:
+        df1s = pool.starmap(read_file_to_df, [(file, list(), None, 1) for file in files1])
+
+    with multiprocessing.Pool(10) as pool:
+        df2s = pool.starmap(read_file_to_df, [(file, list(), None, 1) for file in files2])
+
+    compareTwoFolders(df1s, df2s)
+
+    print(datetime.datetime.now() - begin)

+ 35 - 0
tmp_file/白玉山每月限电损失.py

@@ -0,0 +1,35 @@
+import os
+
+import pandas as pd
+
+read_path = r'D:\data\白玉山后评估数据资料\需要整理的数据\每月发电量和限电量、限电率'
+
+all_paths = list()
+for root, dirs, files in os.walk(read_path):
+    if files:
+        for file in files:
+            year_mont = int(file.split("(")[1].split("_")[0])
+            if year_mont >= 20230901 and year_mont < 20240901:
+                all_paths.append(os.path.join(root, file))
+
+df = pd.DataFrame()
+
+for path in all_paths:
+    now_df = pd.read_excel(path, usecols=['设备名称', '统计时间', '限电损失电量(kWh)'], header=2)
+    now_df = now_df[now_df['设备名称'].str.startswith("#")]
+    df = pd.concat([df, now_df])
+
+## 人工验证 看一看
+print(df[df['设备名称'] == '#34'])
+
+df = df[['设备名称', '限电损失电量(kWh)']]
+group_df = df.groupby('设备名称').sum()
+
+result_df = pd.DataFrame(group_df)
+result_df.reset_index(inplace=True)
+result_df.columns = ['设备名称', '总限电损失电量(kWh)']
+result_df.sort_values(by=['设备名称'], inplace=True)
+
+print(result_df)
+
+result_df.to_csv("设备总限电损失.csv", encoding='utf-8', index=False)

+ 14 - 11
utils/db/ConnectMysql.py

@@ -1,6 +1,7 @@
 import os
 import traceback
 
+import pandas as pd
 import pymysql
 from pymysql.cursors import DictCursor
 from sqlalchemy import create_engine
@@ -12,17 +13,13 @@ from utils.log.trans_log import trans_print
 class ConnectMysql:
 
     def __init__(self, connet_name):
-        #self.yaml_data = yaml_conf("/data/config/etl_config.yaml")
-        self.yaml_data = yaml_conf(os.environ.get('ETL_CONF', "/data/config/etl_config.yaml"))
+        self.yaml_data = yaml_conf(os.environ.get('ETL_CONF'))
         self.connet_name = connet_name
-        if 'env' in os.environ:
-            self.env = os.environ['env']
-        else:
-            self.env = 'dev'
+        self.config = self.yaml_data[self.connet_name]
 
     # 从连接池中获取一个连接
     def get_conn(self):
-        return pymysql.connect(**self.yaml_data[self.connet_name + "_" + self.env])
+        return pymysql.connect(**self.config)
 
     # 使用连接执行sql
     def execute(self, sql, params=tuple()):
@@ -41,12 +38,18 @@ class ConnectMysql:
                     conn.rollback()
                     raise e
 
-    def execute_df_save(self, df, table_name):
-        config = self.yaml_data[self.connet_name + "_" + self.env]
+    def get_engine(self):
+        config = self.config
         username = config['user']
         password = config['password']
         host = config['host']
         port = config['port']
         dbname = config['database']
-        engine = create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}')
-        df.to_sql(table_name, engine, index=False, if_exists='append')
+        return create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}')
+
+    def execute_df_save(self, df, table_name):
+        df.to_sql(table_name, self.get_engine(), index=False, if_exists='append')
+
+    def read_sql_to_df(self, sql):
+        df = pd.read_sql_query(sql, self.get_engine())
+        return df
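
Both helpers now share get_engine(), so reads and writes go through the same SQLAlchemy URL built from the config section named by connet_name. A usage sketch, assuming ETL_CONF points at a YAML file whose `trans` section holds the pymysql connection kwargs:

    trans = ConnectMysql('trans')
    df = trans.read_sql_to_df('SELECT 1 AS ok')  # query into a DataFrame
    trans.execute_df_save(df, 'tmp_check')       # append the frame to a table (table name is hypothetical)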

+ 1 - 1
utils/df_utils/util.py

@@ -44,7 +44,7 @@ def calculate_time_difference(now: datetime.datetime, date: datetime.datetime):
 
 
 if __name__ == '__main__':
-    df = pd.read_csv(r"D:\trans_data\01.csv")
+    df = pd.read_csv(r"D:\data\清理数据\密马风电场\test_11_test\minute\WOG00469.csv")
     df['time_stamp'] = pd.to_datetime(df['time_stamp'])
     space = get_time_space(df, 'time_stamp')
     min = df['time_stamp'].min()

+ 94 - 62
utils/file/trans_methods.py

@@ -1,9 +1,9 @@
 # -*- coding: utf-8 -*-
 # @Time    : 2024/5/16
 # @Author  : 魏志亮
+import ast
 import datetime
 import os
-import re
 import shutil
 import warnings
 
@@ -47,83 +47,94 @@ def split_array(array, num):
 
 
 def find_read_header(file_path, trans_cols):
-    print(trans_cols)
     df = read_file_to_df(file_path, nrows=20)
     df.reset_index(inplace=True)
     count = 0
+    header = None
     for col in trans_cols:
         if col in df.columns:
             count = count + 1
             if count >= 2:
-                return 0
+                header = 0
+                break
 
     count = 0
+
     values = list()
     for index, row in df.iterrows():
-        values = list(row.values)
-        if type(row.name) == tuple:
-            values.extend(list(row.name))
         for col in trans_cols:
-            if col in values:
+            if col in row.values:
                 count = count + 1
-                if count >= 2:
-                    return index + 1
-                    
+                if count > 2:
+                    header = index + 1
+                    break
 
+    read_cols = []
+    for col in values:
+        if col in trans_cols:
+            read_cols.append(col)
 
-    return None
+    return header, read_cols
 
 
 # 读取数据到df
-def read_file_to_df(file_path, read_cols=list(), header=0, trans_cols=None, nrows=None):
+def read_file_to_df(file_path, read_cols=list(), trans_cols=None, nrows=None, not_find_header='raise'):
     begin = datetime.datetime.now()
     trans_print('开始读取文件', file_path)
+    header = 0
+    find_cols = list()
     if trans_cols:
-        header = find_read_header(file_path, trans_cols)
+        header, find_cols = find_read_header(file_path, trans_cols)
         trans_print(os.path.basename(file_path), "读取第", header, "行")
         if header is None:
-            message = '未匹配到开始行,请检查并重新指定'
-            trans_print(message)
-            raise Exception(message)
-
-    try:
-        df = pd.DataFrame()
-        if str(file_path).lower().endswith("csv") or str(file_path).lower().endswith("gz"):
-            encoding = detect_file_encoding(file_path)
-            end_with_gz = str(file_path).lower().endswith("gz")
-            if read_cols:
-                if end_with_gz:
-                    df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip', header=header,
-                                     nrows=nrows)
-                else:
-                    df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, header=header,
-                                     on_bad_lines='warn', nrows=nrows)
-            else:
-
-                if end_with_gz:
-                    df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header, nrows=nrows)
-                else:
-                    df = pd.read_csv(file_path, encoding=encoding, header=header, on_bad_lines='warn', nrows=nrows)
-
-        else:
-            xls = pd.ExcelFile(file_path)
-            # 获取所有的sheet名称
-            sheet_names = xls.sheet_names
-            for sheet_name in sheet_names:
+            if not_find_header == 'raise':
+                message = '未匹配到开始行,请检查并重新指定'
+                trans_print(message)
+                raise Exception(message)
+            elif not_find_header == 'ignore':
+                pass
+
+    # copy before extending so the shared mutable default argument is never modified
+    read_cols = list(read_cols) + find_cols
+    df = pd.DataFrame()
+    if header is not None:
+        try:
+            if str(file_path).lower().endswith("csv") or str(file_path).lower().endswith("gz"):
+                encoding = detect_file_encoding(file_path)
+                end_with_gz = str(file_path).lower().endswith("gz")
                 if read_cols:
-                    now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, usecols=read_cols, nrows=nrows)
+                    if end_with_gz:
+                        df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip',
+                                         header=header,
+                                         nrows=nrows)
+                    else:
+                        df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, header=header,
+                                         on_bad_lines='warn', nrows=nrows)
                 else:
-                    now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, nrows=nrows)
-
-                now_df['sheet_name'] = sheet_name
 
-                df = pd.concat([df, now_df])
+                    if end_with_gz:
+                        df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header, nrows=nrows)
+                    else:
+                        df = pd.read_csv(file_path, encoding=encoding, header=header, on_bad_lines='warn', nrows=nrows)
 
-        trans_print('文件读取成功', file_path, '文件数量', df.shape, '耗时', datetime.datetime.now() - begin)
-    except Exception as e:
-        trans_print('读取文件出错', file_path, str(e))
-        message = '文件:' + os.path.basename(file_path) + ',' + str(e)
-        raise ValueError(message)
+            else:
+                xls = pd.ExcelFile(file_path)
+                # 获取所有的sheet名称
+                sheet_names = xls.sheet_names
+                for sheet_name in sheet_names:
+                    if read_cols:
+                        now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, usecols=read_cols,
+                                               nrows=nrows)
+                    else:
+                        now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, nrows=nrows)
+
+                    now_df['sheet_name'] = sheet_name
+                    df = pd.concat([df, now_df])
+                xls.close()
+            trans_print('file read OK:', file_path, 'data shape:', df.shape, 'elapsed:', datetime.datetime.now() - begin)
+        except Exception as e:
+            trans_print('error reading file', file_path, str(e))
+            message = 'file: ' + os.path.basename(file_path) + ',' + str(e)
+            raise ValueError(message)
 
     return df
 
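Note: with this rework the file is only parsed after a header row has been located, and a caller can now opt out of the hard failure. A minimal usage sketch, assuming the signature implied by this diff (the path and trans_cols values below are hypothetical):

    from utils.file.trans_methods import read_file_to_df

    # not_find_header='raise' keeps the old behaviour: no matching header row fails the file.
    df = read_file_to_df(r"D:\data\demo.csv", trans_cols=['时间', '风速30秒平均值'],
                         not_find_header='raise')

    # not_find_header='ignore' leaves header as None, the parse is skipped,
    # and an empty DataFrame comes back, so a batch job can log and move on.
    df = read_file_to_df(r"D:\data\demo.csv", trans_cols=['时间', '风速30秒平均值'],
                         not_find_header='ignore')
    if df.empty:
        pass  # skip this file
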
@@ -147,6 +158,9 @@ def __build_directory_dict(directory_dict, path, filter_types=None):
 
 # read all excel files under the path
 def read_excel_files(read_path):
+    if os.path.isfile(read_path):
+        return [read_path]
+
     directory_dict = {}
     __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])
 
@@ -155,6 +169,8 @@ def read_excel_files(read_path):
 
 # read all files under the path
 def read_files(read_path):
+    if os.path.isfile(read_path):
+        return [read_path]
     directory_dict = {}
     __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz', 'zip', 'rar'])
 
@@ -173,6 +189,11 @@ def copy_to_new(from_path, to_path):
 
 # create a directory path
 def create_file_path(path, is_file_path=False):
+    """
+    创建路径
+    :param path:创建文件夹的路径
+    :param is_file_path: 传入的path是否包含具体的文件名
+    """
     if is_file_path:
         path = os.path.dirname(path)
 
@@ -180,17 +201,28 @@ def create_file_path(path, is_file_path=False):
         os.makedirs(path, exist_ok=True)
 
 
-# format the turbine name
-def generate_turbine_name(turbine_name='F0001', prefix='F'):
-    strinfo = re.compile(r"[\D*]")
-    name = strinfo.sub('', str(turbine_name))
-    return prefix + str(int(name)).zfill(3)
+def valid_eval(eval_str):
+    """
+    验证 eval 是否包含非法的参数
+    """
+    safe_param = ["column", "wind_name", "df", "error_time", "str", "int"]
+    eval_str_names = [node.id for node in ast.walk(ast.parse(eval_str)) if isinstance(node, ast.Name)]
+    if not set(eval_str_names).issubset(safe_param):
+        raise NameError(
+            eval_str + " contains unsafe name: " + str(','.join(list(set(eval_str_names) - set(safe_param)))))
+    return True
 
 
 if __name__ == '__main__':
-    # files = read_excel_files(r'D:\trans_data\10.xls')
-    # for file in files:
-    file = r'D:\trans_data\新艾里风电场10号风机.csv'
-    read_file_to_df(file, trans_cols=
-    ['', '风向', '时间', '设备号', '机舱方向总角度', '$folder[2]', '发电机转速30秒平均值', '机组运行模式', '机舱旋转角度', '主轴转速', '变桨角度30秒平均值', '记录时间',
-     '发电机功率30秒平均值', '风速30秒平均值'])
+    # aa = valid_eval("column[column.find('_')+1:]")
+    # print(aa)
+    #
+    # aa = valid_eval("df['123'].apply(lambda wind_name: wind_name.replace('元宝山','').replace('号风机',''))")
+    # print(aa)
+    #
+    # aa = valid_eval("'记录时间' if column == '时间' else column;import os; os.path")
+    # print(aa)
+
+    df = read_file_to_df(r"D:\data\11-12月.xls", trans_cols=['风机', '时间', '有功功率', '无功功率', '功率因数', '频率'], nrows=30)
+
+    print(df.columns)

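Note: valid_eval guards the configurable eval expressions used elsewhere in the ETL by parsing them with ast and whitelisting every ast.Name node it finds. A short sketch of how the whitelist behaves (the first two expressions mirror the commented examples above; the third is a hypothetical injection attempt):

    from utils.file.trans_methods import valid_eval

    valid_eval("column[column.find('_') + 1:]")               # True: only 'column' is referenced
    valid_eval("df['a'].apply(lambda wind_name: wind_name)")  # True: 'df' and 'wind_name' are whitelisted
    valid_eval("__import__('os').system('ls')")               # raises NameError: '__import__' is not allowed

A design caveat: only bare names are inspected, so attribute chains on whitelisted names (str.mro(), df.eval, ...) still pass the check; it narrows the eval attack surface rather than closing it.
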
+ 2 - 4
utils/log/trans_log.py

@@ -1,4 +1,3 @@
-
 # -*- coding: utf-8 -*-
 # @Time    : 2024/5/16
 # @Author  : 魏志亮
@@ -36,11 +35,11 @@ stout_handle.setLevel(logging.INFO)
 stout_handle.addFilter(ContextFilter())
 logger.addHandler(stout_handle)
 
-config = yaml_conf(os.environ['ETL_CONF'])
+config_path = os.path.abspath(__file__).split("utils")[0] + 'conf' + os.sep + 'etl_config_dev.yaml'
+config = yaml_conf(os.environ.get('ETL_CONF', config_path))
 log_path_dir = read_conf(config, 'log_path_dir', "/data/logs")
 
 log_path = log_path_dir + os.sep + r'etl_tools_' + (os.environ['env'] if 'env' in os.environ else 'dev')
-
 file_path = os.path.join(log_path)
 
 if not os.path.exists(file_path):
@@ -57,4 +56,3 @@ logger.addHandler(file_handler)
 
 def trans_print(*args):
     logger.info("  ".join([str(a) for a in args]))
-

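Note: trans_log previously failed on import with a KeyError whenever ETL_CONF was unset; it now falls back to the dev config resolved from the module's own location. The precedence, as a sketch of the same path arithmetic (layout inferred from the diff):

    import os

    # an exported ETL_CONF still wins; otherwise <project_root>/conf/etl_config_dev.yaml
    # is derived from where trans_log.py itself lives
    project_root = os.path.abspath(__file__).split("utils")[0]
    default_conf = project_root + 'conf' + os.sep + 'etl_config_dev.yaml'
    conf_file = os.environ.get('ETL_CONF', default_conf)
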
+ 4 - 0
utils/systeminfo/sysinfo.py

@@ -64,6 +64,10 @@ def use_files_get_max_cpu_count(file_paths: list[str], memory_percent: float = 1
     result = count if count <= max_cpu_count else max_cpu_count
     if result == 0:
         result = 1
+
+    if result > len(file_paths):
+        result = len(file_paths)
+
     trans_print("总文件数:", len(file_paths), ",获取最大文件大小:", str(round(max_file_size / 2 ** 20, 2)) + "M",
                 "可用内存:", str(get_available_memory_with_percent(1) / 2 ** 20) + "M",
                 "总CPU数:", get_cpu_count(), "CPU使用比例:", round(cpu_percent, 2), "CPU可用数量:", max_cpu_count,