Browse Source

去掉批次后第一次提交

wzl 4 tháng trước cách đây
mục cha
commit
8179859207

+ 1 - 2
etl/common/ArchiveFile.py

@@ -18,8 +18,7 @@ class ArchiveFile(object):
         """
         if os.path.exists(self.pathsAndTable.get_tmp_formal_path()):
             shutil.make_archive(self.pathsAndTable.get_archive_path(), 'zip', self.pathsAndTable.get_tmp_formal_path())
-            update_archive_success(self.exec_id, self.pathsAndTable.read_type,
-                                   f"{self.pathsAndTable.get_archive_path()}.zip")
+            update_archive_success(self.exec_id, f"{self.pathsAndTable.get_archive_path()}.zip")
             trans_print(f"文件夹已归档为 {self.pathsAndTable.get_archive_path()}.zip")
         else:
             trans_print(f"文件夹 {self.pathsAndTable.get_tmp_formal_path()} 不存在")

+ 4 - 4
etl/common/BaseDataTrans.py

@@ -38,7 +38,7 @@ class BaseDataTrans(object):
                                                self.yaml_config, self.wind_col_trans)
         except Exception as e:
             trans_print(traceback.format_exc())
-            update_trans_status_error(self.id, self.transfer_type, str(e), self.save_db)
+            update_trans_status_error(self.id, str(e), self.save_db)
             raise e
 
     def get_filed_conf(self):
@@ -79,7 +79,7 @@ class BaseDataTrans(object):
 
     # 最后更新执行程度
     def update_exec_progress(self):
-        update_trans_status_success(self.id, self.transfer_type,
+        update_trans_status_success(self.id,
                                     len(read_excel_files(self.pathsAndTable.get_save_path())),
                                     None, None, None, None, self.save_db)
 
@@ -88,7 +88,7 @@ class BaseDataTrans(object):
         try:
             trance_id = '-'.join([str(self.id), self.wind_farm_name, self.transfer_type])
             set_trance_id(trance_id)
-            update_trans_status_running(self.id, self.transfer_type, self.save_db)
+            update_trans_status_running(self.id, self.save_db)
 
             now_index = 0
             # 0
@@ -156,7 +156,7 @@ class BaseDataTrans(object):
             self.update_exec_progress()
         except Exception as e:
             trans_print(traceback.format_exc())
-            update_trans_status_error(self.id, self.transfer_type, str(e), self.save_db)
+            update_trans_status_error(self.id, str(e), self.save_db)
             raise e
         finally:
             self.pathsAndTable.delete_tmp_files()

+ 1 - 2
etl/common/ClearData.py

@@ -22,6 +22,5 @@ class ClearData(object):
         trans_print("开始清理数据,临时文件夹:", self.pathsAndTable.get_tmp_path())
         begin = datetime.datetime.now()
         self.clean_data()
-        update_trans_transfer_progress(self.pathsAndTable.id, self.pathsAndTable.read_type, 5,
-                                       self.pathsAndTable.save_db)
+        update_trans_transfer_progress(self.pathsAndTable.id, 5, self.pathsAndTable.save_db)
         trans_print("清理数据结束,耗时:", datetime.datetime.now() - begin)

+ 2 - 2
etl/common/SaveToDb.py

@@ -43,7 +43,7 @@ class SaveToDb(object):
                         pool.starmap(save_file_to_db,
                                      [(self.pathsAndTable.get_table_name(), file, self.batch_count) for file in arr])
 
-                update_trans_transfer_progress(self.pathsAndTable.id, self.pathsAndTable.read_type,
+                update_trans_transfer_progress(self.pathsAndTable.id,
                                                round(70 + 29 * (index + 1) / len(all_arrays), 2),
                                                self.pathsAndTable.save_db)
         except Exception as e:
@@ -54,5 +54,5 @@ class SaveToDb(object):
     def run(self):
         if self.pathsAndTable.save_db:
             self.mutiprocessing_to_save_db()
-            update_trans_transfer_progress(self.pathsAndTable.id, self.pathsAndTable.read_type, 99,
+            update_trans_transfer_progress(self.pathsAndTable.id,  99,
                                            self.pathsAndTable.save_db)

+ 2 - 2
etl/common/UnzipAndRemove.py

@@ -51,7 +51,7 @@ class UnzipAndRemove(object):
                 pool_count = split_count if split_count < len(arr) else len(arr)
                 with multiprocessing.Pool(pool_count) as pool:
                     pool.starmap(self.get_and_remove, [(i,) for i in arr])
-                update_trans_transfer_progress(self.pathsAndTable.id, self.pathsAndTable.read_type,
+                update_trans_transfer_progress(self.pathsAndTable.id,
                                                round(5 + 15 * (index + 1) / len(all_arrays), 2),
                                                self.pathsAndTable.save_db)
 
@@ -66,5 +66,5 @@ class UnzipAndRemove(object):
 
     def run(self):
         self.remove_file_to_tmp_path()
-        update_trans_transfer_progress(self.pathsAndTable.id, self.pathsAndTable.read_type, 20,
+        update_trans_transfer_progress(self.pathsAndTable.id,  20,
                                        self.pathsAndTable.save_db)

+ 3 - 3
etl/wind_power/fault_warn/FaultWarnTrans.py

@@ -6,7 +6,7 @@ import pandas as pd
 
 from etl.common.BaseDataTrans import BaseDataTrans
 from service.trans_conf_service import update_trans_status_error, update_trans_status_success
-from service.trans_service import get_fault_warn_conf, get_trans_exec_code, drop_table, create_warn_fault_table, \
+from service.trans_service import get_fault_warn_conf, drop_table, create_warn_fault_table, \
     save_file_to_db
 from utils.conf.read_conf import read_conf
 from utils.file.trans_methods import read_excel_files, read_file_to_df, create_file_path, valid_eval
@@ -35,7 +35,7 @@ class FaultWarnTrans(BaseDataTrans):
         if conf_map is None or type(conf_map) == tuple or len(conf_map.keys()) == 0:
             message = f"未找到{self.id}的{self.transfer_type}配置"
             trans_print(message)
-            update_trans_status_error(self.id, self.transfer_type, message, self.save_db)
+            update_trans_status_error(self.id, message, self.save_db)
         else:
 
             for key, v in conf_map.items():
@@ -174,6 +174,6 @@ class FaultWarnTrans(BaseDataTrans):
         save_file_to_db(table_name, self.update_files[0], self.batch_count)
 
     def update_exec_progress(self):
-        update_trans_status_success(self.id, self.transfer_type,
+        update_trans_status_success(self.id,
                                     self.engine_count, None, self.min_date, self.max_date, self.data_count,
                                     self.save_db)

+ 2 - 2
etl/wind_power/laser/LaserTrans.py

@@ -67,9 +67,9 @@ class LaserTrans():
         update_trans_transfer_progress(self.id, 90)
         df.sort_values(by=['acquisition_time'], inplace=True)
         save_df_to_db(self.wind_farm_code + "_laser", df)
-        update_trans_status_success(self.id, 'laser', len(df['wind_turbine_number'].unique()), None,
+        update_trans_status_success(self.id, len(df['wind_turbine_number'].unique()), None,
                                     df['acquisition_time'].min(), df['acquisition_time'].max(), df.shape[0])
-        #update_trans_status_success(self.id)
+        # update_trans_status_success(self.id)
         trans_print(self.wind_farm_code, '执行结束,总耗时:', (datetime.datetime.now() - self.begin))
 
 

+ 2 - 3
etl/wind_power/min_sec/MinSecTrans.py

@@ -31,7 +31,7 @@ class MinSecTrans(BaseDataTrans):
         if conf_map is None or type(conf_map) == tuple or len(conf_map.keys()) == 0:
             message = f"未找到{self.id}的{self.transfer_type}配置"
             trans_print(message)
-            update_trans_status_error(self.id, self.transfer_type, message, self.save_db)
+            update_trans_status_error(self.id, message, self.save_db)
         else:
             resolve_col_prefix = read_conf(conf_map, 'resolve_col_prefix')
             wind_name_exec = read_conf(conf_map, 'wind_name_exec', None)
@@ -100,8 +100,7 @@ class MinSecTrans(BaseDataTrans):
     # 最后更新执行程度
     def update_exec_progress(self):
         all_files = set([os.path.basename(i) for i in self.update_files])
-        update_trans_status_success(self.id, self.trans_param.read_type,
-                                    len(all_files),
+        update_trans_status_success(self.id, len(all_files),
                                     self.statistics_map['time_granularity'],
                                     self.statistics_map['min_date'], self.statistics_map['max_date'],
                                     self.statistics_map['total_count'], self.save_db)

+ 5 - 6
etl/wind_power/min_sec/ReadAndSaveTmp.py

@@ -1,4 +1,3 @@
-import base64
 import datetime
 import multiprocessing
 import traceback
@@ -12,7 +11,7 @@ from service.trans_conf_service import update_trans_transfer_progress
 from utils.file.trans_methods import read_excel_files, split_array, del_blank, \
     create_file_path, read_file_to_df, valid_eval
 from utils.log.trans_log import trans_print
-from utils.systeminfo.sysinfo import use_files_get_max_cpu_count, get_dir_size, max_file_size_get_max_cpu_count
+from utils.systeminfo.sysinfo import use_files_get_max_cpu_count, get_dir_size
 
 
 class ReadAndSaveTmp(object):
@@ -166,7 +165,7 @@ class ReadAndSaveTmp(object):
                     message = "整理临时文件,系统返回错误:" + str(e)
                     raise ValueError(message)
 
-                update_trans_transfer_progress(self.pathsAndTable.id, self.pathsAndTable.read_type,
+                update_trans_transfer_progress(self.pathsAndTable.id,
                                                round(20 + 20 * (index + 1) / len(all_arrays), 2),
                                                self.pathsAndTable.save_db)
 
@@ -186,7 +185,7 @@ class ReadAndSaveTmp(object):
                     message = "整理临时文件,系统返回错误:" + str(e)
                     raise ValueError(message)
 
-                update_trans_transfer_progress(self.pathsAndTable.id, self.pathsAndTable.read_type,
+                update_trans_transfer_progress(self.pathsAndTable.id,
                                                round(20 + 30 * (index + 1) / len(all_arrays), 2),
                                                self.pathsAndTable.save_db)
 
@@ -203,7 +202,7 @@ class ReadAndSaveTmp(object):
                     message = "整理临时文件,系统返回错误:" + str(e)
                     raise ValueError(message)
 
-                update_trans_transfer_progress(self.pathsAndTable.id, self.pathsAndTable.read_type,
+                update_trans_transfer_progress(self.pathsAndTable.id,
                                                round(20 + 30 * (index + 1) / len(all_arrays), 2),
                                                self.pathsAndTable.save_db)
 
@@ -361,6 +360,6 @@ class ReadAndSaveTmp(object):
         trans_print("开始保存数据到临时文件")
         begin = datetime.datetime.now()
         self.read_file_and_save_tmp()
-        update_trans_transfer_progress(self.pathsAndTable.id, self.pathsAndTable.read_type, 50,
+        update_trans_transfer_progress(self.pathsAndTable.id,  50,
                                        self.pathsAndTable.save_db)
         trans_print("保存数据到临时文件结束,耗时:", datetime.datetime.now() - begin)

+ 2 - 2
etl/wind_power/min_sec/StatisticsAndSaveTmpFormalFile.py

@@ -174,7 +174,7 @@ class StatisticsAndSaveTmpFormalFile(object):
             for index, arr in enumerate(all_arrays):
                 with multiprocessing.Pool(split_count) as pool:
                     pool.starmap(self.save_to_csv, [(i,) for i in arr])
-                update_trans_transfer_progress(self.paths_and_table.id, self.paths_and_table.read_type,
+                update_trans_transfer_progress(self.paths_and_table.id,
                                                round(50 + 15 * (index + 1) / len(all_arrays), 2),
                                                self.paths_and_table.save_db)
 
@@ -185,5 +185,5 @@ class StatisticsAndSaveTmpFormalFile(object):
 
     def run(self):
         self.mutiprocessing_to_save_file()
-        update_trans_transfer_progress(self.paths_and_table.id, self.paths_and_table.read_type, 65,
+        update_trans_transfer_progress(self.paths_and_table.id, 65,
                                        self.paths_and_table.save_db)

+ 1 - 1
etl/wind_power/wave/WaveTrans.py

@@ -119,7 +119,7 @@ class WaveTrans(object):
             trans_print(f"总共{total_index}组,当前{index + 1}", "本次写入耗时:", datetime.datetime.now() - index_begin,
                         "总耗时:", datetime.datetime.now() - self.begin)
 
-        update_trans_status_success(self.id, 'wave', len(wind_turbine_name_set), None,
+        update_trans_status_success(self.id, len(wind_turbine_name_set), None,
                                     self.min_date, self.max_date, self.data_count)
 
         # update_trans_status_success(self.id)

+ 0 - 1
service/plt_service.py

@@ -1,7 +1,6 @@
 # -*- coding: utf-8 -*-
 # @Time    : 2024/6/7
 # @Author  : 魏志亮
-import datetime
 
 from service.common_connect import plt
 

+ 19 - 18
service/trans_conf_service.py

@@ -17,38 +17,38 @@ def update_timeout_trans_data():
     trans.execute(sql)
 
 
-def update_trans_status_running(id, trans_type, save_db=True):
+def update_trans_status_running(id, save_db=True):
     if save_db:
         exec_sql = """
         update data_transfer set transfer_status = 0,trans_sys_status = 0 ,transfer_start_time = now(),err_info='',
         engine_count =0,time_granularity=0,transfer_finish_time=null,transfer_progress=0,
         data_min_time= null,data_max_time= null,transfer_data_count=null
-        where id = %s  and transfer_type = %s
+        where id = %s
         """
-        trans.execute(exec_sql, (id, trans_type))
+        trans.execute(exec_sql, id)
 
 
-def update_archive_success(id, trans_type, archive_path, save_db=True):
+def update_archive_success(id, archive_path, save_db=True):
     if save_db:
         exec_sql = """
         update data_transfer set transfer_progress=70,archive_path = %s
-        where id = %s  and transfer_type = %s
+        where id = %s  
         """
-        trans.execute(exec_sql, (archive_path, id, trans_type))
+        trans.execute(exec_sql, (archive_path, id))
 
 
-def update_trans_status_error(id, trans_type, message="", save_db=True):
+def update_trans_status_error(id, message="", save_db=True):
     if save_db:
         exec_sql = """
         update data_transfer set transfer_status = 2,trans_sys_status=2 ,err_info= %s,transfer_finish_time=now() 
-        where id = %s  and  transfer_type = %s
+        where id = %s  
         """
 
         message = message if len(message) <= 200 else message[0:200]
-        trans.execute(exec_sql, (message, id, trans_type))
+        trans.execute(exec_sql, (message, id))
 
 
-def update_trans_status_success(id, trans_type, wind_count=0, time_granularity=0,
+def update_trans_status_success(id, wind_count=0, time_granularity=0,
                                 min_date=datetime.now(),
                                 max_date=datetime.now(),
                                 total_count=0, save_db=True):
@@ -57,24 +57,24 @@ def update_trans_status_success(id, trans_type, wind_count=0, time_granularity=0
             exec_sql = """
             update data_transfer set transfer_status = 1,trans_sys_status = 1,transfer_progress=100,err_info = '',engine_count =%s,time_granularity=%s,transfer_finish_time=now(),
             data_min_time= %s,data_max_time= %s,transfer_data_count=%s
-            where id = %s  and transfer_type = %s
+            where id = %s  
             """
-            trans.execute(exec_sql, (wind_count, time_granularity, min_date, max_date, total_count, id, trans_type))
+            trans.execute(exec_sql, (wind_count, time_granularity, min_date, max_date, total_count, id))
         else:
             exec_sql = """
             update data_transfer set transfer_status = 1,trans_sys_status = 1,transfer_progress = 100,err_info = '',engine_count =%s,time_granularity=%s,transfer_finish_time=now()
-            where id = %s  and transfer_type = %s
+            where id = %s 
             """
-            trans.execute(exec_sql, (wind_count, time_granularity, id, trans_type))
+            trans.execute(exec_sql, (wind_count, time_granularity, id))
 
 
-def update_trans_transfer_progress(id, trans_type, transfer_progress=0, save_db=True):
-    print(id, trans_type, transfer_progress)
+def update_trans_transfer_progress(id,  transfer_progress=0, save_db=True):
+    print(id,  transfer_progress)
     if save_db:
         exec_sql = """
-        update data_transfer set transfer_progress =%s where id = %s  and transfer_type = %s
+        update data_transfer set transfer_progress =%s where id = %s 
         """
-        trans.execute(exec_sql, (int(transfer_progress), id, trans_type))
+        trans.execute(exec_sql, (int(transfer_progress), id))
 
 
 def get_now_running_count():
@@ -123,6 +123,7 @@ def get_data_by_id(id):
         return None
     return data[0]
 
+
 def create_wave_table(table_name, save_db=True):
     if save_db:
         exec_sql = f"""

+ 0 - 69
test_run_local.py

@@ -1,69 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time    : 2024/6/11
-# @Author  : 魏志亮
-import datetime
-import sys
-import traceback
-from os import *
-
-
-def get_exec_data(batch_no=None, read_type=None, run_count=1):
-    if batch_no and read_type:
-        data = get_data_by_batch_no_and_type(batch_no, read_type)
-        if data is None:
-            raise ValueError(f"未找到批次号:{batch_no},类型:{read_type}")
-
-    else:
-        data = get_batch_exec_data(run_count)
-        if data is None:
-            trans_print("当前有任务在执行")
-            sys.exit(0)
-        elif len(data.keys()) == 0:
-            trans_print("当前无任务")
-            sys.exit(0)
-
-    return data
-
-
-def run(data: dict = dict(), save_db=False, step=0, end=4):
-    exec_process = None
-    if data['transfer_type'] in ['second', 'minute']:
-        exec_process = MinSecTrans(data=data, save_db=save_db, step=step, end=end)
-
-    if data['transfer_type'] in ['fault', 'warn']:
-        exec_process = FaultWarnTrans(data=data, save_db=save_db, step=step, end=end)
-
-    if exec_process is None:
-        raise Exception("No exec process")
-    exec_process.run()
-
-
-if __name__ == '__main__':
-    from utils.conf.read_conf import yaml_conf, read_conf
-
-    env = 'prod'
-    if len(sys.argv) >= 2:
-        env = sys.argv[1]
-
-    conf_path = path.abspath(f"./conf/etl_config_{env}.yaml")
-    environ['ETL_CONF'] = conf_path
-    yaml_config = yaml_conf(conf_path)
-    environ['env'] = env
-    run_count = int(read_conf(yaml_config, "run_batch_count", 1))
-
-    from utils.log.trans_log import trans_print
-    from service.plt_service import get_batch_exec_data, get_data_by_batch_no_and_type
-    from etl.wind_power.fault_warn.FaultWarnTrans import FaultWarnTrans
-    from etl.wind_power.min_sec.MinSecTrans import MinSecTrans
-    from etl.wind_power.wave.WaveTrans import WaveTrans
-
-    begin = datetime.datetime.now()
-
-    try:
-        exec_process = WaveTrans(1, 'WOF091200030',
-                                 r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/收资数据/振动/CMSFTPServer/ZYXFDC2')
-        exec_process.run()
-    except Exception as e:
-        trans_print(traceback.format_exc())
-
-    trans_print("执行结束,总耗时:", datetime.datetime.now() - begin)