Przeglądaj źródła

添加本地执行接口

anmox 1 rok temu
rodzic
commit
88f77828ec
6 zmienionych plików z 87 dodań i 53 usunięć
  1. 43 7
      app.py
  2. 23 26
      etl/base/WindFarms.py
  3. 15 14
      schedule_service.py
  4. 4 4
      service/plt_service.py
  5. 1 1
      utils/log/trans_log.py
  6. 1 1
      utils/zip/unzip.py

+ 43 - 7
app.py

@@ -2,9 +2,40 @@
 # @Time    : 2024/6/6
 # @Author  : 魏志亮
 from apscheduler.executors.pool import ThreadPoolExecutor
-from flask import Flask
-import datetime
+from flask import Flask, request
 from flask_apscheduler import APScheduler
+from flask_restx import Api, Resource, fields
+
+from schedule_service import run_local
+
+app = Flask(__name__)
+
+api = Api(app, version='1.0', title='Transfer API', description='Transfer API')
+
+localExec = api.model('LocalExecModal', {
+    'step': fields.Integer(default=0, description='开始步骤 0:验证删除临时文件 1:复制文件到临时文件夹 2:整理文件到临时文件 3:保存到正式文件 4:保存到数据库 '),
+    'end': fields.Integer(default=3, description='结束步骤  0:验证删除临时文件 1:复制文件到临时文件夹 2:整理文件到临时文件 3:保存到正式文件 4:保存到数据库 '),
+    'batch_no': fields.String(default='批次号', description='批次号'),
+    'transfer_type': fields.String(default='minute', description='传输类型'),
+    'transfer_file_addr': fields.String(default='/test', description='传输文件地址'),
+    'field_name': fields.String(default='风场名称', description='风场名称'),
+    'field_code': fields.String(default="风场编号", description="风场编号"),
+    'save_db': fields.Boolean(default=False, description='是否保存到数据库')
+})
+
+
+@api.route('/local_exce')
+class LocalExec(Resource):
+    @api.expect(localExec)
+    def post(self):
+        try:
+            localExec = request.get_json()
+            run_local(localExec['step'], localExec['end'], localExec['batch_no'], localExec['transfer_type'],
+                      localExec['transfer_file_addr'], localExec['field_name'], localExec['field_code'],
+                      localExec['save_db'])
+        except Exception as e:
+            print(e)
+        return {'status': 200, 'message': '正在执行'}
 
 
 class Config(object):
@@ -12,7 +43,7 @@ class Config(object):
         {
             'id': 'job1',
             'func': 'schedule_service:run_schedule',
-            'args': (0, 3),
+            'args': (0, 4),
             'trigger': 'interval',
             'seconds': 60
         }
@@ -24,16 +55,21 @@ class Config(object):
     # coalesce积攒得任务跑几次,在时间允许得范围内 True:默认最后一次,False:在时间允许范围内全部提交
     # max_instances 同时允许并发的最大并发量
     # misfire_grace_time 如果重启任务在这个时间范围内,就能继续重启
-    SCHEDULER_JOB_DEFAULTS = {'coalesce': False, 'max_instances': 2, 'misfire_grace_time': 60}
+    SCHEDULER_JOB_DEFAULTS = {'coalesce': True, 'max_instances': 2, 'misfire_grace_time': 60}
     # 配置时区
     SCHEDULER_TIMEZONE = 'Asia/Shanghai'
 
 
+@app.teardown_appcontext
+def shutdown_scheduler(exception):
+    if scheduler.running:
+        scheduler.shutdown()
+
+
 if __name__ == '__main__':
-    app = Flask(__name__)
     app.config.from_object(Config())
 
     scheduler = APScheduler()
     scheduler.init_app(app)
-    scheduler.start()
-    app.run()
+    # scheduler.start()
+    app.run(debug=True, port=8088)

+ 23 - 26
etl/base/WindFarms.py

@@ -16,9 +16,8 @@ from utils.zip.unzip import unzip, unrar, get_desc_path
 
 class WindFarms(object):
 
-    def __init__(self, name, batch_no=None, field_code=None, params: TranseParam = None, wind_full_name=None,
-                 schedule_exec=True):
-        self.name = name
+    def __init__(self, batch_no=None, field_code=None, params: TranseParam = None, wind_full_name=None,
+                 save_db=True):
         self.batch_no = batch_no
         self.field_code = field_code
         self.wind_full_name = wind_full_name
@@ -28,7 +27,7 @@ class WindFarms(object):
         self.wind_col_trans = get_all_wind(self.field_code)
         self.batch_count = 50000
         self.save_path = None
-        self.schedule_exec = schedule_exec
+        self.save_db = save_db
         self.lock = multiprocessing.Manager().Lock()
         self.statistics_map = multiprocessing.Manager().dict()
 
@@ -38,7 +37,7 @@ class WindFarms(object):
 
         if read_path.find(self.wind_full_name) == -1:
             message = "读取路径与配置路径不匹配:" + self.trans_param.read_path + ",配置文件为:" + self.wind_full_name
-            update_trans_status_error(self.batch_no, self.trans_param.read_type, message, self.schedule_exec)
+            update_trans_status_error(self.batch_no, self.trans_param.read_type, message, self.save_db)
             raise ValueError(message)
 
         self.save_path = os.path.join(read_path[0:read_path.find(self.wind_full_name)], self.wind_full_name, "清理数据")
@@ -300,7 +299,7 @@ class WindFarms(object):
         except Exception as e:
             logger.exception(e)
             message = "读取文件列表错误:" + self.trans_param.read_path + ",系统返回错误:" + str(e)
-            update_trans_status_error(self.batch_no, self.trans_param.read_type, message, self.schedule_exec)
+            update_trans_status_error(self.batch_no, self.trans_param.read_type, message, self.save_db)
             raise e
         return all_files
 
@@ -338,7 +337,7 @@ class WindFarms(object):
             except Exception as e:
                 logger.exception(e)
                 message = "合并列出现错误:" + str(e)
-                update_trans_status_error(self.batch_no, self.trans_param.read_type, message, self.schedule_exec)
+                update_trans_status_error(self.batch_no, self.trans_param.read_type, message, self.save_db)
                 raise e
 
         else:
@@ -353,7 +352,7 @@ class WindFarms(object):
                     logger.exception(e)
                     message = "整理临时文件,系统返回错误:" + str(e)
                     update_trans_status_error(self.batch_no, self.trans_param.read_type, message,
-                                              self.schedule_exec)
+                                              self.save_db)
                     raise e
 
     def mutiprocessing_to_save_file(self):
@@ -367,7 +366,7 @@ class WindFarms(object):
         except Exception as e:
             logger.exception(e)
             message = "保存文件错误,系统返回错误:" + str(e)
-            update_trans_status_error(self.batch_no, self.trans_param.read_type, message, self.schedule_exec)
+            update_trans_status_error(self.batch_no, self.trans_param.read_type, message, self.save_db)
             raise e
 
         trans_print("结束保存到excel文件")
@@ -387,7 +386,7 @@ class WindFarms(object):
         except Exception as e:
             logger.exception(e)
             message = "保存到数据库错误,系统返回错误:" + str(e)
-            update_trans_status_error(self.batch_no, self.trans_param.read_type, message, self.schedule_exec)
+            update_trans_status_error(self.batch_no, self.trans_param.read_type, message, self.save_db)
             raise e
         trans_print("结束保存到数据库文件")
 
@@ -408,10 +407,7 @@ class WindFarms(object):
 
     def delete_tmp_files(self):
         trans_print("开始删除临时文件夹")
-        if os.path.exists(self.get_excel_tmp_path()):
-            shutil.rmtree(self.get_excel_tmp_path())
-        if os.path.exists(self.get_read_tmp_path()):
-            shutil.rmtree(self.get_read_tmp_path())
+
         if os.path.exists(self.get_save_tmp_path()):
             shutil.rmtree(self.get_save_tmp_path())
 
@@ -424,9 +420,9 @@ class WindFarms(object):
 
     def run(self, step=0, end=3):
         begin = datetime.datetime.now()
-        trans_print("开始执行", self.name, self.trans_param.read_type)
+        trans_print("开始执行")
 
-        update_trans_status_running(self.batch_no, self.trans_param.read_type, self.schedule_exec)
+        update_trans_status_running(self.batch_no, self.trans_param.read_type, self.save_db)
 
         if step <= 0 and end >= 0:
             tmp_begin = datetime.datetime.now()
@@ -434,7 +430,7 @@ class WindFarms(object):
             self.delete_batch_files()
             self.delete_batch_db()
 
-            self.params_valid([self.name, self.batch_no, self.field_code, self.save_path, self.trans_param.read_type,
+            self.params_valid([self.batch_no, self.field_code, self.save_path, self.trans_param.read_type,
                                self.trans_param.read_path, self.wind_full_name])
 
             if self.trans_param.resolve_col_prefix:
@@ -477,17 +473,18 @@ class WindFarms(object):
                         str(datetime.datetime.now() - begin))
 
         if step <= 4 and end >= 4:
-            tmp_begin = datetime.datetime.now()
-            trans_print("开始保存到数据库")
-            self.mutiprocessing_to_save_db()
-            trans_print("保存到数据库结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
-                        str(datetime.datetime.now() - begin))
+            if self.save_db:
+                trans_print("开始保存到数据库")
+                tmp_begin = datetime.datetime.now()
+                self.mutiprocessing_to_save_db()
+                trans_print("保存到数据库结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
+                            str(datetime.datetime.now() - begin))
         # 如果end==0 则说明只是进行了验证
         if end != 0:
             update_trans_status_success(self.batch_no, self.trans_param.read_type,
-                                        len(read_excel_files(self.get_read_tmp_path())), self.schedule_exec)
-
-        trans_print("开始执行", self.name, self.trans_param.read_type, ",,总耗时:",
-                    str(datetime.datetime.now() - begin))
+                                        len(read_excel_files(self.get_read_tmp_path())), self.save_db)
 
         self.delete_tmp_files()
+
+        trans_print("结束执行", self.trans_param.read_type, ",总耗时:",
+                    str(datetime.datetime.now() - begin))

+ 15 - 14
schedule_service.py

@@ -22,11 +22,11 @@ def run_schedule(step=0, end=3):
         field_code = data['field_code']
         field_name = data['field_name']
 
-        __exec_trans(step, end, batch_no, transfer_type, transfer_file_addr, field_name, field_code, schedule_exec=True)
+        __exec_trans(step, end, batch_no, transfer_type, transfer_file_addr, field_name, field_code, save_db=True)
 
 
 def run_local(step=0, end=3, batch_no=None, transfer_type=None, transfer_file_addr=None, field_name=None,
-              field_code="测试"):
+              field_code="测试", save_db=False):
     if batch_no is None or str(batch_no).strip() == '':
         return "批次编号不能为空"
 
@@ -36,17 +36,18 @@ def run_local(step=0, end=3, batch_no=None, transfer_type=None, transfer_file_ad
     if transfer_file_addr is None or str(transfer_file_addr).strip() == '':
         return "文件路径不能为空"
 
-    __exec_trans(step, end, batch_no, transfer_type, transfer_file_addr, field_name, field_code, schedule_exec=False)
+    __exec_trans(step, end, batch_no, transfer_type, transfer_file_addr, field_name, field_code,
+                 save_db=save_db)
 
 
 def __exec_trans(step, end, batch_no, transfer_type, transfer_file_addr=None, field_name=None, field_code="测试",
-                 schedule_exec=False):
+                 save_db=False):
     init_log(batch_no, field_name, transfer_type)
     conf_map = get_trans_conf(field_name, transfer_type)
     if conf_map is None or type(conf_map) == tuple or len(conf_map.keys()) == 0:
         message = f"未找到{field_name}的{transfer_type}配置"
         trans_print(message)
-        update_trans_status_error(batch_no, transfer_type, message, schedule_exec)
+        update_trans_status_error(batch_no, transfer_type, message, save_db)
     else:
 
         resolve_col_prefix = read_conf(conf_map, 'resolve_col_prefix')
@@ -84,8 +85,8 @@ def __exec_trans(step, end, batch_no, transfer_type, transfer_file_addr=None, fi
         for col in trans_cols:
             cols_trans_all[col] = read_conf(conf_map, col, '')
 
-        trans_subject = WindFarms(field_name, batch_no=batch_no, field_code=field_code,
-                                  wind_full_name=wind_full_name, schedule_exec=schedule_exec)
+        trans_subject = WindFarms(batch_no=batch_no, field_code=field_code,
+                                  wind_full_name=wind_full_name, save_db=save_db)
 
         params = TranseParam(read_type=transfer_type, read_path=transfer_file_addr,
                              cols_tran=cols_trans_all,
@@ -100,15 +101,15 @@ def __exec_trans(step, end, batch_no, transfer_type, transfer_file_addr=None, fi
         except Exception as e:
             logger.exception(e)
             message = "系统返回错误:" + str(e)
-            update_trans_status_error(batch_no, transfer_type, message, schedule_exec)
+            update_trans_status_error(batch_no, transfer_type, message, save_db)
 
 
 if __name__ == '__main__':
-    step = 0
-    end = 3
-    batch_no = 'hongtiguan-test'
+    step = 4
+    end = 4
+    batch_no = 'WOF063100040-WOB00008'
     transfer_type = 'second'
-    transfer_file_addr = r'/data/download/collection_data/1进行中/虹梯官风电场-山西-大唐/收资数据/秒级数据/20240527秒级数据'
-    field_name = '虹梯官风电场'
+    transfer_file_addr = r'/data/download/collection_data/2完成/招远风电场-山东-大唐/收资数据/招远秒级数据'
+    field_name = '招远风电场'
     field_code = "测试"
-    run_local(step, end, batch_no, transfer_type, transfer_file_addr, field_name, field_code)
+    run_local(step, end, batch_no, transfer_type, transfer_file_addr, field_name, field_code, save_db=True)

+ 4 - 4
service/plt_service.py

@@ -15,8 +15,8 @@ def update_trans_status_running(batch_no, trans_type, schedule_exec=True):
         plt.execute(exec_sql, (batch_no, trans_type))
 
 
-def update_trans_status_error(batch_no, trans_type, message="", schedule_exec=True):
-    if schedule_exec:
+def update_trans_status_error(batch_no, trans_type, message="", save_db=True):
+    if save_db:
         exec_sql = """
         update data_transfer set transfer_state = 2,trans_sys_status=2 ,err_info= %s,transfer_finish_time=now() 
         where batch_code = %s  and  transfer_type = %s
@@ -24,8 +24,8 @@ def update_trans_status_error(batch_no, trans_type, message="", schedule_exec=Tr
         plt.execute(exec_sql, (message, batch_no, trans_type))
 
 
-def update_trans_status_success(batch_no, trans_type, wind_count=0, schedule_exec=True):
-    if schedule_exec:
+def update_trans_status_success(batch_no, trans_type, wind_count=0, save_db=True):
+    if save_db:
         exec_sql = """
         update data_transfer set transfer_state = 1,trans_sys_status = 1,err_info = '',engine_count =%s,transfer_finish_time=now()  
         where batch_code = %s  and transfer_type = %s

+ 1 - 1
utils/log/trans_log.py

@@ -16,7 +16,7 @@ logger.addHandler(stout_handle)
 
 
 def init_log(batch_no, name, type):
-    log_path = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) + '/logs'
+    log_path = r'/data/logs/trans_data'
     file_path = os.path.join(log_path, str(name), str(batch_no), str(type))
 
     if not os.path.exists(file_path):

+ 1 - 1
utils/zip/unzip.py

@@ -28,7 +28,7 @@ def unzip(zip_filepath, dest_path):
     trans_print("解压到:", dest_path)
 
     try:
-        with __support_gbk(zipfile.ZipFile(zip_filepath, 'r')) as zip_ref:
+        with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
             zip_ref.extractall(dest_path)
     except zipfile.BadZipFile as e:
         logger.exception(e)