Forráskód Böngészése

添加故障报警功能

wzl 8 hónapja
szülő
commit
f2c8855a75

+ 6 - 1
.gitignore

@@ -3,4 +3,9 @@ logs
 *.iml
 .idea
 test
-tmp
+tmp
+venv
+wheels
+build
+dist
+etl_tool.spec

+ 5 - 3
app_run.py

@@ -4,9 +4,6 @@
 import os
 import sys
 
-from etl.wind_power.fault_warn.FaultWarnTrans import FaultWarnTrans
-from etl.wind_power.min_sec.MinSecTrans import MinSecTrans
-
 
 def get_exec_data(batch_no=None, read_type=None, run_count=1):
     if batch_no and read_type:
@@ -65,5 +62,10 @@ if __name__ == '__main__':
 
     from utils.log.trans_log import trans_print
     from service.plt_service import get_batch_exec_data, get_data_by_batch_no_and_type
+    from etl.wind_power.fault_warn.FaultWarnTrans import FaultWarnTrans
+    from etl.wind_power.min_sec.MinSecTrans import MinSecTrans
+
+    trans_print("所有请求参数:", sys.argv)
+    trans_print("配置文件路径:", os.environ.get("ETL_CONF"))
 
     run(run_count=run_count)

+ 5 - 1
conf/etl_config.yaml

@@ -45,4 +45,8 @@ etl_origin_path_contain: 收资数据
 save_path:
 
 # 日志保存路径
-log_path_dir: /home/wzl/logs
+log_path_dir: /data/logs
+
+# 临时文件存放处,有些甲方公司给的tmp太小,只好自己配置
+tmp_base_path: /tmp
+

+ 4 - 5
etl/common/BaseDataTrans.py

@@ -1,14 +1,13 @@
 import datetime
-import sys
 import traceback
 
-from etl.common.PathsAndTable import PathsAndTable
 from etl.common.ClearData import ClearData
+from etl.common.PathsAndTable import PathsAndTable
 from etl.common.SaveToDb import SaveToDb
 from etl.common.UnzipAndRemove import UnzipAndRemove
-from service.plt_service import get_all_wind, get_batch_exec_data, get_data_by_batch_no_and_type, \
-    update_trans_status_success, update_trans_status_error, update_trans_status_running
-from tmp_file.power_derating_for_chunlin import read_excel_files
+from service.plt_service import get_all_wind, update_trans_status_success, update_trans_status_error, \
+    update_trans_status_running
+from utils.file.trans_methods import read_excel_files
 from utils.log.trans_log import trans_print, set_trance_id
 
 

+ 9 - 8
etl/common/PathsAndTable.py

@@ -1,12 +1,9 @@
 import os
 import shutil
-import tempfile
-
-import yaml
 
 from service.trans_service import drop_table, creat_min_sec_table, create_warn_fault_table
-from utils.log.trans_log import trans_print
 from utils.conf.read_conf import *
+from utils.log.trans_log import trans_print
 
 
 class PathsAndTable(object):
@@ -20,10 +17,12 @@ class PathsAndTable(object):
         self.save_db = save_db
         self.save_zip = save_zip
         self.multi_pool_count = 6
+        self.is_delete_db = False
 
-        yaml_config = yaml_conf(r"/data/config/etl_config.yaml")
+        yaml_config = yaml_conf(os.environ.get('ETL_CONF', r"/data/config/etl_config.yaml"))
 
         save_path_conf = read_conf(yaml_config, "save_path")
+        self.tmp_base_path = read_conf(yaml_config, "tmp_base_path", "/tmp")
         if save_path_conf:
             self.save_path = save_path_conf + os.sep + self.field_name
         else:
@@ -39,7 +38,7 @@ class PathsAndTable(object):
         return os.path.join(self.save_path, self.batch_no + "_" + self.batch_name, self.read_type)
 
     def get_tmp_path(self):
-        return os.path.join(tempfile.gettempdir(), self.field_name, self.batch_no + "_" + self.batch_name,
+        return os.path.join(self.tmp_base_path, self.field_name, self.batch_no + "_" + self.batch_name,
                             self.read_type)
 
     def get_excel_tmp_path(self):
@@ -72,8 +71,10 @@ class PathsAndTable(object):
     def delete_batch_db(self):
         if self.save_db:
             trans_print("开始删除表")
-            table_name = self.get_table_name()
-            drop_table(table_name, self.save_db)
+            if not self.is_delete_db:
+                table_name = self.get_table_name()
+                drop_table(table_name, self.save_db)
+                self.is_delete_db = True
             trans_print("删除表结束")
 
     def create_batch_db(self, wind_names: list = list()):

+ 2 - 3
etl/common/SaveToDb.py

@@ -1,7 +1,6 @@
 import multiprocessing
-import os
 import traceback
-
+from os import path
 from etl.common.PathsAndTable import PathsAndTable
 from service.plt_service import update_trans_transfer_progress
 from service.trans_service import save_file_to_db
@@ -21,7 +20,7 @@ class SaveToDb(object):
 
         self.pathsAndTable.delete_batch_db()
         all_saved_files = read_excel_files(self.pathsAndTable.get_save_path())
-        wind_names = [str(os.path.basename(i)).replace(".csv", "") for i in all_saved_files]
+        wind_names = [str(path.basename(i)).replace(".csv", "") for i in all_saved_files]
 
         self.pathsAndTable.create_batch_db(wind_names)
 

+ 19 - 8
etl/wind_power/fault_warn/FaultWarnTrans.py

@@ -7,7 +7,7 @@ from etl.common.BaseDataTrans import BaseDataTrans
 from service.plt_service import update_trans_status_error
 from service.trans_service import get_fault_warn_conf
 from utils.conf.read_conf import read_conf
-from utils.file.trans_methods import read_excel_files, read_file_to_df, create_file_path
+from utils.file.trans_methods import read_excel_files, read_file_to_df, create_file_path, valid_eval
 from utils.log.trans_log import trans_print
 
 
@@ -65,17 +65,25 @@ class FaultWarnTrans(BaseDataTrans):
 
             df.rename(columns=trans_map, inplace=True)
 
+            for col in df.columns:
+                if 'field_' + col not in read_fields_keys:
+                    del df[col]
+
             if time_format:
-                df['begin_time'] = pd.to_datetime(df['begin_time'])
-                df['end_time'] = pd.to_datetime(df['end_time'])
-            else:
-                df['begin_time'] = pd.to_datetime(df['begin_time'], format=time_format)
-                df['end_time'] = pd.to_datetime(df['end_time'], format=time_format)
+                if valid_eval(time_format):
+                    eval_str = f"df['begin_time'].apply(lambda error_time: {time_format} )"
+                    df['begin_time'] = eval(eval_str)
+                    eval_str = f"df['end_time'].apply(lambda error_time: {time_format} )"
+                    df['end_time'] = eval(eval_str)
+
+            df['begin_time'] = pd.to_datetime(df['begin_time'], errors='coerce')
+            df['end_time'] = pd.to_datetime(df['end_time'], errors='coerce')
 
             exec_wind_turbine_number = read_conf(conf_map, 'exec_wind_turbine_number')
             if exec_wind_turbine_number:
-                exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {exec_wind_turbine_number} )"
-                df['wind_turbine_number'] = eval(exec_str)
+                if valid_eval(exec_wind_turbine_number):
+                    exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {exec_wind_turbine_number} )"
+                    df['wind_turbine_number'] = eval(exec_str)
 
             for field, select_str in select_fields:
                 use_in = True
@@ -101,6 +109,9 @@ class FaultWarnTrans(BaseDataTrans):
             df['time_diff'] = (df['end_time'] - df['begin_time']).dt.total_seconds()
             df.loc[df['time_diff'] < 0, 'time_diff'] = np.nan
 
+            # 根据开始时间进行排序
+            df.sort_values(by=['wind_turbine_number', 'begin_time', 'end_time'], inplace=True)
+
             if self.save_zip:
                 save_path = os.path.join(self.pathsAndTable.get_save_path(), str(self.batch_name) + '.csv.gz')
             else:

+ 11 - 8
etl/wind_power/min_sec/ReadAndSaveTmp.py

@@ -9,7 +9,7 @@ from etl.wind_power.min_sec import TransParam
 from etl.common.PathsAndTable import PathsAndTable
 from service.plt_service import update_trans_transfer_progress
 from utils.file.trans_methods import read_excel_files, split_array, del_blank, \
-    create_file_path, read_file_to_df
+    create_file_path, read_file_to_df, valid_eval
 from utils.log.trans_log import trans_print
 from utils.systeminfo.sysinfo import use_files_get_max_cpu_count, get_dir_size, max_file_size_get_max_cpu_count
 
@@ -45,8 +45,9 @@ class ReadAndSaveTmp(object):
         df = self.read_excel_to_df(file_path)
 
         if self.trans_param.wind_name_exec:
-            exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
-            df['wind_turbine_number'] = eval(exec_str)
+            if valid_eval(self.trans_param.wind_name_exec):
+                exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
+                df['wind_turbine_number'] = eval(exec_str)
 
         names = set(df['wind_turbine_number'].values)
         cols = list(df.columns)
@@ -106,8 +107,9 @@ class ReadAndSaveTmp(object):
         df = del_blank(df, ['wind_turbine_number'])
         df = df[df['time_stamp'].isna() == False]
         if self.trans_param.wind_name_exec and not self.trans_param.merge_columns:
-            exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
-            df['wind_turbine_number'] = eval(exec_str)
+            if valid_eval(self.trans_param.wind_name_exec):
+                exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
+                df['wind_turbine_number'] = eval(exec_str)
 
         # 删除 有功功率 和 风速均为空的情况
         df.dropna(subset=['active_power', 'wind_velocity'], how='all', inplace=True)
@@ -238,9 +240,10 @@ class ReadAndSaveTmp(object):
             # 处理列名前缀问题
             if self.trans_param.resolve_col_prefix:
                 columns_dict = dict()
-                for column in df.columns:
-                    columns_dict[column] = eval(self.trans_param.resolve_col_prefix)
-                df.rename(columns=columns_dict, inplace=True)
+                if valid_eval(self.trans_param.resolve_col_prefix):
+                    for column in df.columns:
+                        columns_dict[column] = eval(self.trans_param.resolve_col_prefix)
+                    df.rename(columns=columns_dict, inplace=True)
 
             if self.trans_param.merge_columns:
                 select_cols = [self.trans_param.cols_tran['wind_turbine_number'],

+ 2 - 1
package.sh

@@ -1,2 +1,3 @@
-pyinstaller -F -n etl_tool app_run.py
+pyinstaller --clean -F -n etl_tool app_run.py
+
 #python -m nuitka --onefile --remove-output app_run.py

+ 17 - 1
requirements.txt

@@ -1,10 +1,26 @@
 chardet==5.2.0
+contourpy==1.3.0
+cycler==0.12.1
 DBUtils==3.1.0
+fonttools==4.53.1
+greenlet==3.0.3
+importlib_resources==6.4.5
+kiwisolver==1.4.7
+matplotlib==3.9.2
 numpy==2.0.0
+packaging==24.1
 pandas==2.2.2
+pillow==10.4.0
 psutil==6.0.0
 PyMySQL==1.1.0
-PyYAML==6.0.1
+pyparsing==3.1.4
+python-calamine==0.2.3
+python-dateutil==2.9.0.post0
+pytz==2024.1
 PyYAML==6.0.1
 rarfile==4.2
+six==1.16.0
 SQLAlchemy==2.0.30
+typing_extensions==4.12.2
+tzdata==2024.1
+zipp==3.20.1

+ 1 - 0
service/trans_service.py

@@ -163,6 +163,7 @@ def create_warn_fault_table(table_name):
     sql = f"""
     CREATE TABLE `{table_name}` (
       `wind_turbine_number` varchar(20) DEFAULT NULL COMMENT '风机编号',
+      `wind_turbine_name` varchar(20) DEFAULT NULL COMMENT '原始风机编号',
       `begin_time` datetime DEFAULT NULL COMMENT '开始时间',
       `end_time` datetime DEFAULT NULL COMMENT '结束时间',
       `time_diff` int DEFAULT NULL COMMENT '处理耗时,单位秒',

+ 4 - 3
test_run_local.py

@@ -41,7 +41,7 @@ if __name__ == '__main__':
     if len(sys.argv) >= 2:
         env = sys.argv[1]
     else:
-        env = 'dev'
+        env = 'prod'
     print(sys.argv)
     if env is None:
         raise Exception("请配置运行环境")
@@ -67,8 +67,9 @@ if __name__ == '__main__':
     data['batch_code'] = "test"
     data['batch_name'] = "test"
     data['transfer_type'] = "fault"
-    data['transfer_addr'] = r"D:\报警\唐珍风电2023年报警信息.xlsx"
+    # data['transfer_addr'] = r"D:\报警\唐珍风电2023年报警信息.xlsx"
+    data['transfer_addr'] = r"D:\故障\故障数据\故障记录_20230101_20240101.csv"
     data['field_code'] = "test"
-    data['field_name'] = "test"
+    data['field_name'] = "唐珍风电故障"
 
     run(data=data, save_db=False)

+ 55 - 0
tmp_file/zibo_guzhang_select_time.py

@@ -0,0 +1,55 @@
+from datetime import datetime, timedelta
+
+from utils.file.trans_methods import *
+
+
def convert_and_calculate_time_range(time_str):
    """
    Parse a compact ``YYMMDD_HHMM`` timestamp and build a minute-aligned window.

    :param time_str: 11-char string like ``"240514_1230"`` — 2-digit year,
                     month, day, a separator char, then hour and minute.
    :return: tuple of three formatted strings:
             (base ``%Y-%m-%d %H:%M``,
              start = base - 2 minutes as ``%Y-%m-%d %H:%M:%S``,
              end   = base + 3 minutes as ``%Y-%m-%d %H:%M:%S``)
    :raises ValueError: if the string does not encode a valid date/time.
    """
    # Local import: the module does `from datetime import datetime` and then
    # `from utils.file.trans_methods import *`, which re-shadows the name
    # `datetime` with the *module*. The original `datetime.datetime.strptime`
    # accidentally depended on that shadowing; binding the class locally makes
    # the function correct regardless of import order.
    from datetime import datetime, timedelta

    # Split "YYMMDD" and "HHMM" around the separator at index 6.
    date_part = time_str[:6]
    time_part = time_str[7:]

    # Expand the 2-digit year to a full 20xx year.
    year = '20' + date_part[:2]
    month = date_part[2:4]
    day = date_part[4:]

    hour = time_part[:2]
    minute = time_part[2:]

    base_time = datetime.strptime(f"{year}-{month}-{day} {hour}:{minute}", "%Y-%m-%d %H:%M")

    # Window of [-2, +3) minutes around the (already second-aligned) base time.
    start_time = base_time.replace(second=0, microsecond=0) - timedelta(minutes=2)
    end_time = base_time.replace(second=0, microsecond=0) + timedelta(minutes=3)

    return base_time.strftime("%Y-%m-%d %H:%M"), start_time.strftime("%Y-%m-%d %H:%M:%S"), end_time.strftime(
        "%Y-%m-%d %H:%M:%S")
+
+
+all_df = read_file_to_df(r"D:\data\淄博\故障记录_filtered.csv")
+all_df['激活时间'] = pd.to_datetime(all_df['激活时间'])
+
+all_files = read_excel_files(r"D:\data\淄博\淄博风场buffer文件(1)")
+
+dfs = pd.DataFrame()
+
+for file in all_files:
+    base_name = os.path.basename(file)
+    if base_name.startswith("b"):
+        try:
+            turbnine_no = int(base_name.split("_")[0].replace("b", ""))
+            base_time, start_time, end_time = convert_and_calculate_time_range(
+                base_name.replace(base_name.split("_")[0] + "_", "")[0:11])
+        except Exception as e:
+            print("error:", file)
+            raise e
+
+        condation1 = (all_df['激活时间'] >= start_time) & (all_df['风机名'] == turbnine_no)
+        condation2 = (all_df['激活时间'] < end_time) & (all_df['风机名'] == turbnine_no)
+        condation = condation1 & condation2
+        dfs = pd.concat([dfs, all_df[condation]])
+
+dfs.drop_duplicates(inplace=True)
+
+dfs.to_csv(r"D:\data\淄博\result.csv", encoding='utf8', index=False)

+ 1 - 1
utils/df_utils/util.py

@@ -44,7 +44,7 @@ def calculate_time_difference(now: datetime.datetime, date: datetime.datetime):
 
 
 if __name__ == '__main__':
-    df = pd.read_csv(r"D:\trans_data\01.csv")
+    df = pd.read_csv(r"D:\data\清理数据\密马风电场\test_11_test\minute\WOG00469.csv")
     df['time_stamp'] = pd.to_datetime(df['time_stamp'])
     space = get_time_space(df, 'time_stamp')
     min = df['time_stamp'].min()

+ 26 - 7
utils/file/trans_methods.py

@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # @Time    : 2024/5/16
 # @Author  : 魏志亮
+import ast
 import datetime
 import os
 import shutil
@@ -163,6 +164,8 @@ def read_excel_files(read_path):
 
 # 读取路径下所有的文件
 def read_files(read_path):
+    if os.path.isfile(read_path):
+        return [read_path]
     directory_dict = {}
     __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz', 'zip', 'rar'])
 
@@ -181,6 +184,11 @@ def copy_to_new(from_path, to_path):
 
 # 创建路径
 def create_file_path(path, is_file_path=False):
+    """
+    创建路径
+    :param path:创建文件夹的路径
+    :param is_file_path: 传入的path是否包含具体的文件名
+    """
     if is_file_path:
         path = os.path.dirname(path)
 
@@ -188,13 +196,24 @@ def create_file_path(path, is_file_path=False):
         os.makedirs(path, exist_ok=True)
 
 
+def valid_eval(eval_str):
+    """
+    验证 eval 是否包含非法的参数
+    """
+    safe_param = ["column", "wind_name", "df", "error_time", "str"]
+    eval_str_names = [node.id for node in ast.walk(ast.parse(eval_str)) if isinstance(node, ast.Name)]
+    if not set(eval_str_names).issubset(safe_param):
+        raise NameError(
+            eval_str + " contains unsafe name :" + str(','.join(list(set(eval_str_names) - set(safe_param)))))
+    return True
+
+
 if __name__ == '__main__':
-    datas = read_excel_files(r"D:\data\清理数据\招远风电场\WOF053600062-WOB000009_ZYFDC000012\minute")
-    for data in datas:
-        print(data)
+    aa = valid_eval("column[column.find('_')+1:]")
+    print(aa)
 
-    print("*" * 20)
+    aa = valid_eval("df['123'].apply(lambda wind_name: wind_name.replace('元宝山','').replace('号风机',''))")
+    print(aa)
 
-    datas = read_excel_files(r"D:\data\清理数据\招远风电场\WOF053600062-WOB000009_ZYFDC000012\minute\WOG00066.csv.gz")
-    for data in datas:
-        print(data)
+    aa = valid_eval("'记录时间' if column == '时间' else column;import os; os.path")
+    print(aa)

+ 1 - 1
utils/log/trans_log.py

@@ -35,7 +35,7 @@ stout_handle.setLevel(logging.INFO)
 stout_handle.addFilter(ContextFilter())
 logger.addHandler(stout_handle)
 
-config = yaml_conf(os.environ['ETL_CONF'])
+config = yaml_conf(os.environ.get('ETL_CONF', r"/data/config/etl_config.yaml"))
 log_path_dir = read_conf(config, 'log_path_dir', "/data/logs")
 
 log_path = log_path_dir + os.sep + r'etl_tools_' + (os.environ['env'] if 'env' in os.environ else 'dev')

+ 4 - 0
utils/systeminfo/sysinfo.py

@@ -64,6 +64,10 @@ def use_files_get_max_cpu_count(file_paths: list[str], memory_percent: float = 1
     result = count if count <= max_cpu_count else max_cpu_count
     if result == 0:
         result = 1
+
+    if result > len(file_paths):
+        result = len(file_paths)
+
     trans_print("总文件数:", len(file_paths), ",获取最大文件大小:", str(round(max_file_size / 2 ** 20, 2)) + "M",
                 "可用内存:", str(get_available_memory_with_percent(1) / 2 ** 20) + "M",
                 "总CPU数:", get_cpu_count(), "CPU使用比例:", round(cpu_percent, 2), "CPU可用数量:", max_cpu_count,