소스 검색

修改合并表逻辑

wzl 10 달 전
부모
커밋
5c844f986c

+ 9 - 17
app_run.py

@@ -4,6 +4,8 @@
 import os
 import sys
 
+from utils.conf.read_conf import yaml_conf, read_conf
+
 
 def get_exec_data(batch_no=None, read_type=None, run_count=1):
     if batch_no and read_type:
@@ -39,33 +41,23 @@ def run(batch_no=None, read_type=None, save_db=True, run_count=1):
 
 
 if __name__ == '__main__':
-    env = None
-    if len(sys.argv) >= 2:
-        env = sys.argv[1]
-    else:
-        env = 'dev'
-    print(sys.argv)
-    if env is None:
-        raise Exception("请配置运行环境")
-
-    os.environ['env'] = env
-
-    run_count = 1
-    if len(sys.argv) >= 3:
-        run_count = int(sys.argv[2])
 
     conf_path = '/data/config/etl_config.yaml'
-    if len(sys.argv) >= 4:
-        conf_path = sys.argv[3]
+    if len(sys.argv) >= 2:
+        conf_path = sys.argv[1]
 
     os.environ['ETL_CONF'] = conf_path
+    yaml_config = yaml_conf(conf_path)
+    env = read_conf(yaml_config, "env", "dev")
+    os.environ['env'] = env
+    run_count = int(read_conf(yaml_config, "run_batch_count", 1))
 
     from utils.log.trans_log import trans_print
     from service.plt_service import get_batch_exec_data, get_data_by_batch_no_and_type
     from etl.wind_power.fault_warn.FaultWarnTrans import FaultWarnTrans
     from etl.wind_power.min_sec.MinSecTrans import MinSecTrans
 
-    trans_print("所有请求参数:", sys.argv)
+    trans_print("所有请求参数:", sys.argv, "env:", env, "最大可执行个数:", run_count)
     trans_print("配置文件路径:", os.environ.get("ETL_CONF"))
 
     run(run_count=run_count)

+ 3 - 0
conf/etl_config.yaml

@@ -50,3 +50,6 @@ log_path_dir: /data/logs
 # 临时文件存放处,有些甲方公司隔得tmp太小,只好自己配置
 tmp_base_path: /tmp
 
+run_batch_count: 2
+# 执行环境
+env: prod

+ 55 - 0
conf/etl_config_dev.yaml

@@ -0,0 +1,55 @@
+plt_connect_pool_config:
+  blocking: true
+  charset: utf8mb4
+  maxcached: 5
+  maxconnections: 20
+  maxshared: 0
+  mincached: 2
+  setsession: [ ]
+plt_dev:
+  database: energy
+  host: 192.168.50.233
+  password: admin123456
+  port: 3306
+  user: admin
+plt_prod:
+  database: energy_prod
+  host: 192.168.50.233
+  password: admin123456
+  port: 3306
+  user: admin
+trans_connect_pool_config:
+  blocking: true
+  charset: utf8
+  maxcached: 20
+  maxconnections: 20
+  maxshared: 0
+  mincached: 1
+  setsession: [ ]
+trans_dev:
+  database: energy_data
+  host: 192.168.50.235
+  password: admin123456
+  port: 30306
+  user: root
+trans_prod:
+  database: energy_data_prod
+  host: 192.168.50.235
+  password: admin123456
+  port: 30306
+  user: root
+
+# 如果要放在原始路径,则配置这个 以下面的名称作为切割点,新建清理数据文件夹
+etl_origin_path_contain: 收资数据
+# 如果单独保存,配置这个路径
+save_path:
+
+# 日志保存路径
+log_path_dir: /data/logs
+
+# 临时文件存放处,有些甲方公司给的tmp太小,只好自己配置
+tmp_base_path: /tmp
+
+run_batch_count: 1
+# 执行环境
+env: dev

+ 1 - 0
etl/common/BaseDataTrans.py

@@ -104,6 +104,7 @@ class BaseDataTrans(object):
         except Exception as e:
             trans_print(traceback.format_exc())
             update_trans_status_error(self.batch_no, self.read_type, str(e), self.save_db)
+            raise e
         finally:
             self.pathsAndTable.delete_tmp_files()
             trans_print("执行结束,总耗时:", str(datetime.datetime.now() - total_begin))

+ 0 - 2
etl/wind_power/min_sec/ReadAndSaveTmp.py

@@ -111,8 +111,6 @@ class ReadAndSaveTmp(object):
                 exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
                 df['wind_turbine_number'] = eval(exec_str)
 
-        # 删除 有功功率 和 风速均为空的情况
-        df.dropna(subset=['active_power', 'wind_velocity'], how='all', inplace=True)
 
         self.save_to_tmp_csv(df)
 

+ 6 - 0
etl/wind_power/min_sec/StatisticsAndSaveFile.py

@@ -75,6 +75,9 @@ class StatisticsAndSaveFile(object):
         not_double_cols = ['wind_turbine_number', 'wind_turbine_name', 'time_stamp', 'param6', 'param7', 'param8',
                            'param9', 'param10']
 
+        # 删除 有功功率 和 风速均为空的情况
+        df.dropna(subset=['active_power', 'wind_velocity'], how='all', inplace=True)
+
         df.replace(np.nan, -999999999, inplace=True)
         number_cols = df.select_dtypes(include=['number']).columns.tolist()
         for col in df.columns:
@@ -96,6 +99,9 @@ class StatisticsAndSaveFile(object):
         df.sort_values(by='time_stamp', inplace=True)
         df = df[[i for i in self.trans_param.cols_tran.keys() if i in df.columns]]
 
+        ## 做数据检测前,先强行处理有功功率
+        df = df[df['active_power'] < 5000]
+
         rated_power_and_cutout_speed_tuple = read_conf(self.rated_power_and_cutout_speed_map, str(wind_col_name))
         if rated_power_and_cutout_speed_tuple is None:
             rated_power_and_cutout_speed_tuple = (None, None)

+ 1 - 2
service/trans_service.py

@@ -1,7 +1,6 @@
 # -*- coding: utf-8 -*-
 # @Time    : 2024/6/7
 # @Author  : 魏志亮
-import datetime
 import os
 import traceback
 
@@ -14,7 +13,7 @@ trans = ConnectMysql("trans")
 
 
 def get_min_sec_conf(field_code, trans_type) -> dict:
-    query_sql = "SELECT * FROM trans_conf where wind_code = %s and type = %s"
+    query_sql = "SELECT * FROM trans_conf where wind_code = %s and type = %s and status = 1"
     res = trans.execute(query_sql, (field_code, trans_type))
     print(res)
     if type(res) == tuple:

+ 45 - 26
test_run_local.py

@@ -1,8 +1,14 @@
 # -*- coding: utf-8 -*-
 # @Time    : 2024/6/11
 # @Author  : 魏志亮
+import datetime
 import os
 import sys
+import traceback
+
+import pandas as pd
+
+from utils.conf.read_conf import yaml_conf, read_conf
 
 
 def get_exec_data(batch_no=None, read_type=None, run_count=1):
@@ -37,39 +43,52 @@ def run(data: dict = dict(), save_db=False):
 
 
 if __name__ == '__main__':
-    env = None
+    conf_path = r'conf/etl_config.yaml'
     if len(sys.argv) >= 2:
-        env = sys.argv[1]
-    else:
-        env = 'prod'
-    print(sys.argv)
-    if env is None:
-        raise Exception("请配置运行环境")
-
-    os.environ['env'] = env
-
-    run_count = 1
-    if len(sys.argv) >= 3:
-        run_count = int(sys.argv[2])
-
-    conf_path = '/data/config/etl_config.yaml'
-    if len(sys.argv) >= 4:
-        conf_path = sys.argv[3]
+        conf_path = sys.argv[1]
 
+    begin = datetime.datetime.now()
+    print("开始执行:", begin)
     os.environ['ETL_CONF'] = conf_path
+    yaml_config = yaml_conf(conf_path)
+    env = read_conf(yaml_config, "env", "dev")
+    os.environ['env'] = env
+    run_count = int(read_conf(yaml_config, "run_batch_count", 1))
 
     from utils.log.trans_log import trans_print
     from service.plt_service import get_batch_exec_data, get_data_by_batch_no_and_type
     from etl.wind_power.fault_warn.FaultWarnTrans import FaultWarnTrans
     from etl.wind_power.min_sec.MinSecTrans import MinSecTrans
+    from utils.file.trans_methods import read_file_to_df
 
+    df = read_file_to_df("tmp_file/rebuild_data.csv")
+    results = list()
     data = dict()
-    data['batch_code'] = "test"
-    data['batch_name'] = "test"
-    data['transfer_type'] = "fault"
-    # data['transfer_addr'] = r"D:\报警\唐珍风电2023年报警信息.xlsx"
-    data['transfer_addr'] = r"D:\故障\故障数据\故障记录_20230101_20240101.csv"
-    data['field_code'] = "test"
-    data['field_name'] = "唐珍风电故障"
-
-    run(data=data, save_db=False)
+    for batch_code, batch_name, transfer_type, transfer_addr, field_code, field_name \
+            in zip(df['batch_code'], df['batch_name'], df['transfer_type'], df['transfer_addr'], df['field_code'],
+                   df['field_name']):
+        batch_begin = datetime.datetime.now()
+        transfer_addr = transfer_addr.replace(r"/data/download/collection_data",
+                                              r"/data/download/datang_shangxian")
+        trans_print("开始执行批次:", batch_code, batch_name, transfer_type, field_code, field_name)
+        trans_print("批次路径:", transfer_addr)
+
+        data['batch_code'] = batch_code
+        data['batch_name'] = batch_name
+        data['transfer_type'] = transfer_type
+        data['transfer_addr'] = transfer_addr
+        data['field_code'] = field_code
+        data['field_name'] = field_name
+        try:
+            run(data=data, save_db=True)
+            results.append((batch_code, batch_name, transfer_type, field_code, field_name, 'success'))
+        except Exception as e:
+            results.append((batch_code, batch_name, transfer_type, field_code, field_name, 'error'))
+            trans_print(traceback.format_exc())
+        finally:
+            trans_print("执行结束,耗时:", datetime.datetime.now() - batch_begin, "总耗时:", datetime.datetime.now() - begin)
+
+    for data in results:
+        trans_print(data)
+
+    trans_print("执行结束,总耗时:", datetime.datetime.now() - begin)

+ 94 - 0
tmp_file/cp_online_data_to_other.py

@@ -0,0 +1,94 @@
+import datetime
+import multiprocessing
+import os
+import shutil
+
+not_move_dir = ["乌梅山风电场-江西-大唐",
+                "诺木洪风电场-甘肃-华电",
+                "平陆风电场-山西-中广核",
+                "泗洪协合风电场-安徽-深能南控",
+                "诺木洪风电场-青海-华电",
+                "长清风电场-山东-国电"
+                ]
+
+read_dir = r"/data/download/collection_data"
+# read_dir = r'Z:\collection_data'
+save_base_dir = r"/data/download/datang_shangxian"
+
+
+def __build_directory_dict(directory_dict, path, filter_types=None):
+    # 遍历目录下的所有项
+    for item in os.listdir(path):
+        if item not in not_move_dir:
+            item_path = os.path.join(path, item)
+            if os.path.isdir(item_path):
+                __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
+            elif os.path.isfile(item_path):
+                if path not in directory_dict:
+                    directory_dict[path] = []
+
+                if filter_types is None or len(filter_types) == 0:
+                    directory_dict[path].append(item_path)
+                elif str(item_path).split(".")[-1] in filter_types:
+                    if str(item_path).count("~$") == 0:
+                        directory_dict[path].append(item_path)
+
+
+# 读取路径下所有的excel文件
+def read_excel_files(read_path):
+    if os.path.isfile(read_path):
+        return [read_path]
+
+    directory_dict = {}
+    __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])
+
+    return [path for paths in directory_dict.values() for path in paths if path]
+
+
+# 读取路径下所有的文件
+def read_files(read_path):
+    if os.path.isfile(read_path):
+        return [read_path]
+    directory_dict = {}
+    __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz', 'zip', 'rar'])
+
+    return [path for paths in directory_dict.values() for path in paths if path]
+
+
+# 创建路径
+def create_file_path(path, is_file_path=False):
+    """
+    创建路径
+    :param path:创建文件夹的路径
+    :param is_file_path: 传入的path是否包含具体的文件名
+    """
+    if is_file_path:
+        path = os.path.dirname(path)
+
+    if not os.path.exists(path):
+        os.makedirs(path, exist_ok=True)
+
+
+def copy_to_new(from_path):
+    to_path = from_path.replace(read_dir, save_base_dir)
+    is_file = False
+    if to_path.count('.') > 0:
+        is_file = True
+
+    create_file_path(to_path, is_file_path=is_file)
+
+    shutil.copy(from_path, to_path)
+
+
+print("开始:", datetime.datetime.now())
+begin = datetime.datetime.now()
+read_all_files = [i for i in read_files(read_dir) if i.find("收资数据") > -1]
+print(len(read_all_files))
+print("统计耗时:", datetime.datetime.now() - begin)
+cp_begin = datetime.datetime.now()
+
+with multiprocessing.Pool(40) as pool:
+    pool.starmap(copy_to_new, [(path,) for path in read_all_files])
+
+print(len(read_all_files), "耗时:", datetime.datetime.now() - cp_begin, "总耗时:", datetime.datetime.now() - begin)
+print("结束:", datetime.datetime.now())

+ 14 - 10
utils/file/trans_methods.py

@@ -113,7 +113,7 @@ def read_file_to_df(file_path, read_cols=list(), trans_cols=None, nrows=None):
                     df = pd.read_csv(file_path, encoding=encoding, header=header, on_bad_lines='warn', nrows=nrows)
 
         else:
-            xls = pd.ExcelFile(file_path, engine="calamine")
+            xls = pd.ExcelFile(file_path)
             # 获取所有的sheet名称
             sheet_names = xls.sheet_names
             for sheet_name in sheet_names:
@@ -200,7 +200,7 @@ def valid_eval(eval_str):
     """
     验证 eval 是否包含非法的参数
     """
-    safe_param = ["column", "wind_name", "df", "error_time", "str"]
+    safe_param = ["column", "wind_name", "df", "error_time", "str", "int"]
     eval_str_names = [node.id for node in ast.walk(ast.parse(eval_str)) if isinstance(node, ast.Name)]
     if not set(eval_str_names).issubset(safe_param):
         raise NameError(
@@ -209,11 +209,15 @@ def valid_eval(eval_str):
 
 
 if __name__ == '__main__':
-    aa = valid_eval("column[column.find('_')+1:]")
-    print(aa)
-
-    aa = valid_eval("df['123'].apply(lambda wind_name: wind_name.replace('元宝山','').replace('号风机',''))")
-    print(aa)
-
-    aa = valid_eval("'记录时间' if column == '时间' else column;import os; os.path")
-    print(aa)
+    # aa = valid_eval("column[column.find('_')+1:]")
+    # print(aa)
+    #
+    # aa = valid_eval("df['123'].apply(lambda wind_name: wind_name.replace('元宝山','').replace('号风机',''))")
+    # print(aa)
+    #
+    # aa = valid_eval("'记录时间' if column == '时间' else column;import os; os.path")
+    # print(aa)
+
+    df = read_file_to_df(r"D:\data\11-12月.xls", trans_cols=['风机', '时间', '有功功率', '无功功率', '功率因数', '频率'], nrows=30)
+
+    print(df.columns)