
Switch to a class-based program

wzl, 10 months ago
parent commit 8bd977a444

+ 0 - 81
app.py

@@ -1,81 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time    : 2024/6/6
-# @Author  : 魏志亮
-from apscheduler.executors.pool import ThreadPoolExecutor
-from flask import Flask, request
-from flask_apscheduler import APScheduler
-from flask_restx import Api, Resource, fields
-
-from schedule_service import run_local
-
-from flask_executor import Executor
-
-app = Flask(__name__)
-
-executor = Executor(app)
-api = Api(app, version='1.0', title='Transfer API', description='Transfer API')
-
-localExec = api.model('LocalExecModal', {
-    'step': fields.Integer(default=0, description='开始步骤 0:验证删除临时文件 1:复制文件到临时文件夹 2:整理文件到临时文件 3:保存到正式文件 4:保存到数据库 '),
-    'end': fields.Integer(default=3, description='结束步骤  0:验证删除临时文件 1:复制文件到临时文件夹 2:整理文件到临时文件 3:保存到正式文件 4:保存到数据库 '),
-    'batch_no': fields.String(default='批次号', description='批次号'),
-    'transfer_type': fields.String(default='minute', description='传输类型'),
-    'transfer_file_addr': fields.String(default='/test', description='传输文件地址'),
-    'field_name': fields.String(default='风场名称', description='风场名称'),
-    'field_code': fields.String(default="风场编号", description="风场编号"),
-    'save_db': fields.Boolean(default=False, description='是否保存到数据库')
-})
-
-
-@api.route('/local_exce')
-class LocalExec(Resource):
-    @api.expect(localExec)
-    def post(self):
-        def local_exec():
-            try:
-                localExec = request.get_json()
-                run_local(localExec['step'], localExec['end'], localExec['batch_no'], localExec['transfer_type'],
-                          localExec['transfer_file_addr'], localExec['field_name'], localExec['field_code'],
-                          localExec['save_db'])
-            except Exception as e:
-                print(e)
-
-        executor.submit(local_exec)
-        return {'status': 200, 'message': '正在执行'}
-
-
-class Config(object):
-    JOBS = [
-        {
-            'id': 'job1',
-            'func': 'schedule_service:run_schedule',
-            'args': (0, 4),
-            'trigger': 'interval',
-            'seconds': 60
-        }
-    ]
-    SCHEDULER_EXECUTORS = {'default': ThreadPoolExecutor(6)}
-    # 调度器开关开启
-    SCHEDULER_API_ENABLED = True
-    # 设置容错时间为 2min
-    # coalesce积攒得任务跑几次,在时间允许得范围内 True:默认最后一次,False:在时间允许范围内全部提交
-    # max_instances 同时允许并发的最大并发量
-    # misfire_grace_time 如果重启任务在这个时间范围内,就能继续重启
-    SCHEDULER_JOB_DEFAULTS = {'coalesce': True, 'max_instances': 2, 'misfire_grace_time': 60}
-    # 配置时区
-    SCHEDULER_TIMEZONE = 'Asia/Shanghai'
-
-
-@app.teardown_appcontext
-def shutdown_scheduler(exception):
-    if scheduler.running:
-        scheduler.shutdown()
-
-
-if __name__ == '__main__':
-    app.config.from_object(Config())
-
-    scheduler = APScheduler()
-    scheduler.init_app(app)
-    scheduler.start()
-    app.run(host='0.0.0.0', port=8088)

+ 29 - 22
schedule_service.py → app_run.py

@@ -6,8 +6,11 @@ import sys
 import traceback
 
 
-def run_schedule(step=0, end=4):
-    data = get_exec_data()
+def run_schedule(step=0, end=4, run_count=1):
+    # Update transfer tasks that have timed out
+    update_timeout_trans_data()
+
+    data = get_exec_data(run_count)
     if data is None:
         trans_print("当前有任务在执行")
     elif len(data.keys()) == 0:
@@ -44,7 +47,7 @@ def __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_a
                  save_db=False):
     trance_id = '-'.join([batch_no, field_name, transfer_type])
     set_trance_id(trance_id)
-    conf_map = get_trans_conf(field_name, transfer_type)
+    conf_map = get_trans_conf(field_code, field_name, transfer_type)
     if conf_map is None or type(conf_map) == tuple or len(conf_map.keys()) == 0:
         message = f"未找到{field_name}的{transfer_type}配置"
         trans_print(message)
@@ -53,7 +56,6 @@ def __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_a
 
         resolve_col_prefix = read_conf(conf_map, 'resolve_col_prefix')
         wind_name_exec = read_conf(conf_map, 'wind_name_exec', None)
-        wind_full_name = read_conf(conf_map, 'wind_full_name')
         is_vertical_table = read_conf(conf_map, 'is_vertical_table', False)
         merge_columns = read_conf(conf_map, 'merge_columns', False)
 
@@ -88,18 +90,18 @@ def __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_a
         for col in trans_cols:
             cols_trans_all[col] = read_conf(conf_map, col, '')
 
-        trans_subject = WindFarms(batch_no=batch_no, batch_name=batch_name, field_code=field_code,
-                                  wind_full_name=wind_full_name, save_db=save_db, header=begin_header)
-
-        params = TranseParam(read_type=transfer_type, read_path=transfer_file_addr,
-                             cols_tran=cols_trans_all,
-                             wind_name_exec=wind_name_exec, is_vertical_table=is_vertical_table,
-                             vertical_cols=vertical_cols, vertical_key=vertical_key,
-                             vertical_value=vertical_value, index_cols=index_cols, merge_columns=merge_columns,
-                             resolve_col_prefix=resolve_col_prefix, need_valid_cols=need_valid_cols)
+        params = TransParam(read_type=transfer_type, read_path=transfer_file_addr,
+                            cols_tran=cols_trans_all,
+                            wind_name_exec=wind_name_exec, is_vertical_table=is_vertical_table,
+                            vertical_cols=vertical_cols, vertical_key=vertical_key,
+                            vertical_value=vertical_value, index_cols=index_cols, merge_columns=merge_columns,
+                            resolve_col_prefix=resolve_col_prefix, need_valid_cols=need_valid_cols)
 
-        trans_subject.set_trans_param(params)
         try:
+            trans_subject = WindFarms(batch_no=batch_no, batch_name=batch_name, field_code=field_code,
+                                      field_name=field_name,
+                                      save_db=save_db,
+                                      header=begin_header, trans_param=params)
             trans_subject.run(step=step, end=end)
         except Exception as e:
             trans_print(traceback.format_exc())
@@ -107,29 +109,34 @@ def __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_a
             update_trans_status_error(batch_no, transfer_type, message, save_db)
         finally:
             set_trance_id("")
-            trans_subject.delete_tmp_files()
+            # trans_subject.pathsAndTable.delete_tmp_files()
 
 
 if __name__ == '__main__':
     env = None
     if len(sys.argv) >= 2:
         env = sys.argv[1]
-
+    else:
+        env = 'dev'
     print(sys.argv)
     if env is None:
         raise Exception("请配置运行环境")
 
     os.environ['env'] = env
 
+    run_count = 1
+    if len(sys.argv) >= 3:
+        run_count = int(sys.argv[2])
+
     from utils.log.trans_log import trans_print, set_trance_id
-    from etl.base.TranseParam import TranseParam
+    from etl.base.TransParam import TransParam
     from etl.base.WindFarms import WindFarms
-    from service.plt_service import get_exec_data, update_trans_status_error
+    from service.plt_service import get_exec_data, update_trans_status_error, update_timeout_trans_data
     from service.trans_service import get_trans_conf
     from utils.conf.read_conf import read_conf
 
-    run_schedule()
+    run_schedule(run_count=run_count)
 
-    # run_local(4, 4, batch_no='WOF053600062-WOB00022', batch_name='test-6-24', transfer_type='second',
-    #           transfer_file_addr=r'/data/download/collection_data/2完成/招远风电场-山东-大唐/收资数据/招远秒级数据', field_name='招远风电场',
-    #           field_code="测试", save_db=True)
+    # run_local(4, 4, batch_no='WOF035200003-WOB000004', batch_name='MMFDC_second0718', transfer_type='second',
+    #            transfer_file_addr=r'/data/download/collection_data/1进行中/密马风电场-山西-大唐/收资数据/scada/秒级数据', field_name='密马风电场',
+    #            field_code="WOF093400005", save_db=True)
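
For reference, the new __main__ block above reads the runtime environment from sys.argv[1] (falling back to 'dev') and an optional parallel run count from sys.argv[2]. A minimal sketch of that argument handling, separate from the module itself:

    import sys

    # Sketch of app_run.py's argument handling (same defaults as above, assumed):
    env = sys.argv[1] if len(sys.argv) >= 2 else 'dev'
    run_count = int(sys.argv[2]) if len(sys.argv) >= 3 else 1
    # e.g. "python app_run.py prod 3" -> env='prod', run_count=3
    print(env, run_count)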

+ 5 - 0
conf/db.py

@@ -45,3 +45,8 @@ mysql_config = \
           'mincached': 1,
           'setsession': []}
      }
+
+
+if __name__ == '__main__':
+    import yaml
+    print(yaml.dump(mysql_config, indent=4))

+ 45 - 0
conf/etl_config.yaml

@@ -0,0 +1,45 @@
+plt_connect_pool_config:
+  blocking: true
+  charset: utf8mb4
+  maxcached: 5
+  maxconnections: 20
+  maxshared: 0
+  mincached: 2
+  setsession: [ ]
+plt_dev:
+  database: energy
+  host: 192.168.50.233
+  password: admin123456
+  port: 3306
+  user: admin
+plt_prod:
+  database: energy_prod
+  host: 192.168.50.233
+  password: admin123456
+  port: 3306
+  user: admin
+trans_connect_pool_config:
+  blocking: true
+  charset: utf8
+  maxcached: 20
+  maxconnections: 20
+  maxshared: 0
+  mincached: 1
+  setsession: [ ]
+trans_dev:
+  database: energy_data
+  host: 192.168.50.235
+  password: admin123456
+  port: 30306
+  user: root
+trans_prod:
+  database: energy_data_prod
+  host: 192.168.50.235
+  password: admin123456
+  port: 30306
+  user: root
+
+# If the cleaned data should stay under the original path, configure this: the name below is used as the split point and a new "清理数据" (cleaned data) folder is created next to it
+etl_origin_path_contain: 收资数据
+# If the cleaned data should be saved to a separate location, configure this path instead
+save_path:

+ 0 - 3
etl/__init__.py

@@ -1,3 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time    : 2024/6/11
-# @Author  : 魏志亮

+ 77 - 0
etl/base/PathsAndTable.py

@@ -0,0 +1,77 @@
+import os
+import shutil
+import tempfile
+
+import yaml
+
+from service.trans_service import drop_table, creat_table_and_add_partition
+from utils.log.trans_log import trans_print
+from utils.conf.read_conf import *
+
+
+class PathsAndTable(object):
+    def __init__(self, batch_no=None, batch_name=None, read_path=None, field_name=None, read_type=None,
+                 save_db=True, save_zip=True):
+        self.batch_no = batch_no
+        self.batch_name = batch_name
+        self.read_path = read_path
+        self.field_name = field_name
+        self.read_type = read_type
+        self.save_db = save_db
+        self.save_zip = save_zip
+        self.multi_pool_count = 6
+
+        yaml_config = yaml_conf(r"/data/config/etl_config.yaml")
+
+        save_path_conf = read_conf(yaml_config, "save_path")
+        if save_path_conf:
+            self.save_path = save_path_conf + os.sep + self.field_name
+        else:
+            find_index = read_path.find(read_conf(yaml_config, 'etl_origin_path_contain', "etl_origin_path_contain"))
+            if find_index == -1:
+                raise Exception("路径未包含原始数据特定字符:" + read_path)
+            self.save_path = read_path[0:find_index] + os.sep + "清理数据"
+
+        if self.save_path is None:
+            raise Exception("未配置保存路径:" + read_path)
+
+    def get_save_path(self):
+        return os.path.join(self.save_path, self.batch_no + "_" + self.batch_name, self.read_type)
+
+    def get_save_tmp_path(self):
+        return os.path.join(tempfile.gettempdir(), self.field_name, self.batch_no + "_" + self.batch_name,
+                            self.read_type)
+
+    def get_excel_tmp_path(self):
+        return os.path.join(self.get_save_tmp_path(), 'excel_tmp' + os.sep)
+
+    def get_read_tmp_path(self):
+        return os.path.join(self.get_save_tmp_path(), 'read_tmp')
+
+    def get_table_name(self):
+        return "_".join([self.batch_no, self.read_type])
+
+    def delete_batch_files(self):
+        trans_print("开始删除已存在的批次文件夹")
+        if os.path.exists(self.get_save_path()):
+            shutil.rmtree(self.get_save_path())
+        trans_print("删除已存在的批次文件夹")
+
+    def delete_tmp_files(self):
+        trans_print("开始删除临时文件夹")
+        if os.path.exists(self.get_save_tmp_path()):
+            shutil.rmtree(self.get_save_tmp_path())
+        trans_print("删除临时文件夹删除成功")
+
+    def delete_batch_db(self):
+        if self.save_db:
+            trans_print("开始删除表")
+            table_name = self.get_table_name()
+            drop_table(table_name, self.save_db)
+            trans_print("删除表结束")
+
+    def create_batch_db(self, count=1):
+        if self.save_db:
+            trans_print("开始创建表")
+            creat_table_and_add_partition(self.get_table_name(), count, self.read_type)
+            trans_print("建表结束")

+ 5 - 3
etl/base/TranseParam.py → etl/base/TransParam.py

@@ -3,12 +3,12 @@
 # @Author  : 魏志亮
 
 
-class TranseParam(object):
+class TransParam(object):
 
-    def __init__(self, read_type=None, read_path=None, cols_tran={}, time_col=None, wind_col=None,
+    def __init__(self, read_type=None, read_path=None, cols_tran=dict(),
                  wind_name_exec=str(), is_vertical_table=False, vertical_cols=list(), vertical_key=None,
                  vertical_value=None, index_cols=list(), merge_columns=False, resolve_col_prefix=None,
-                 need_valid_cols=True):
+                 need_valid_cols=True, header=0, wind_col_trans: dict = None):
         self.read_type = read_type
         self.read_path = read_path
         self.cols_tran = cols_tran
@@ -21,3 +21,5 @@ class TranseParam(object):
         self.merge_columns = merge_columns
         self.resolve_col_prefix = resolve_col_prefix
         self.need_valid_cols = need_valid_cols
+        self.header = header
+        self.wind_col_trans = wind_col_trans

+ 58 - 475
etl/base/WindFarms.py

@@ -3,517 +3,100 @@
 # @Author  : 魏志亮
 import datetime
 import multiprocessing
-import tempfile
-import traceback
 
-from etl.base.TranseParam import TranseParam
-from service.plt_service import get_all_wind, update_trans_status_error, update_trans_status_running, \
+import pandas as pd
+
+from etl.base.PathsAndTable import PathsAndTable
+from etl.base.TransParam import TransParam
+from etl.step.ClearData import ClearData
+from etl.step.ReadAndSaveTmp import ReadAndSaveTmp
+from etl.step.SaveToDb import SaveToDb
+from etl.step.StatisticsAndSaveFile import StatisticsAndSaveFile
+from etl.step.UnzipAndRemove import UnzipAndRemove
+from service.plt_service import get_all_wind, update_trans_status_running, \
     update_trans_status_success, update_trans_transfer_progress
-from service.trans_service import creat_table_and_add_partition, save_file_to_db, drop_table
+from service.trans_service import batch_statistics
 from utils.df_utils.util import get_time_space
 from utils.file.trans_methods import *
-from utils.zip.unzip import unzip, unrar, get_desc_path
 
 
 class WindFarms(object):
 
-    def __init__(self, batch_no=None, batch_name=None, field_code=None, params: TranseParam = None, wind_full_name=None,
-                 save_db=True, header=0):
+    def __init__(self, batch_no=None, batch_name=None, field_code=None, field_name=None, params: TransParam = None,
+                 save_db=True, header=0, trans_param: TransParam = None):
         self.batch_no = batch_no
         self.batch_name = batch_name
         self.field_code = field_code
-        self.wind_full_name = wind_full_name
+        self.field_name = field_name
         self.save_zip = False
         self.trans_param = params
         self.exist_wind_names = multiprocessing.Manager().list()
         self.wind_col_trans = get_all_wind(self.field_code)
-        self.batch_count = 200000
+        self.batch_count = 50000
         self.save_path = None
         self.save_db = save_db
         self.lock = multiprocessing.Manager().Lock()
         self.statistics_map = multiprocessing.Manager().dict()
         self.header = header
+        self.trans_param = trans_param
+        self.trans_param.wind_col_trans = self.wind_col_trans
+        self.pathsAndTable = PathsAndTable(batch_no, batch_name, self.trans_param.read_path, self.field_name,
+                                           self.trans_param.read_type, save_db, save_zip=self.save_zip)
 
-    def set_trans_param(self, params: TranseParam):
-        self.trans_param = params
-        read_path = str(params.read_path)
-
-        if read_path.find(self.wind_full_name) == -1:
-            message = "读取路径与配置路径不匹配:" + self.trans_param.read_path + ",配置文件为:" + self.wind_full_name
-            update_trans_status_error(self.batch_no, self.trans_param.read_type, message, self.save_db)
-            raise ValueError(message)
-
-        self.save_path = os.path.join(read_path[0:read_path.find(self.wind_full_name)], self.wind_full_name, "清理数据")
-
-    def params_valid(self, not_null_list=list()):
-        for arg in not_null_list:
-            if arg is None or arg == '':
-                raise Exception("Invalid param set :" + arg)
-
-    def get_save_path(self):
-        return os.path.join(self.save_path, self.batch_no + "_" + self.batch_name, self.trans_param.read_type)
-
-    def get_save_tmp_path(self):
-        return os.path.join(tempfile.gettempdir(), self.wind_full_name, self.batch_no + "_" + self.batch_name,
-                            self.trans_param.read_type)
-
-    def get_excel_tmp_path(self):
-        return os.path.join(self.get_save_tmp_path(), 'excel_tmp' + os.sep)
-
-    def get_read_tmp_path(self):
-        return os.path.join(self.get_save_tmp_path(), 'read_tmp')
-
-    def df_save_to_tmp_file(self, df=pd.DataFrame(), file=None):
-
-        if self.trans_param.is_vertical_table:
-            pass
-        else:
-            # 转换字段
-            if self.trans_param.cols_tran:
-                cols_tran = self.trans_param.cols_tran
-                real_cols_trans = dict()
-                for k, v in cols_tran.items():
-                    if v and not v.startswith("$"):
-                        real_cols_trans[v] = k
-
-                trans_print("包含转换字段,开始处理转换字段")
-                df.rename(columns=real_cols_trans, inplace=True)
-
-                del_keys = set(df.columns) - set(cols_tran.keys())
-
-                for key in del_keys:
-                    df.drop(key, axis=1, inplace=True)
-
-        df = del_blank(df, ['wind_turbine_number'])
-        df = df[df['time_stamp'].isna() == False]
-        if self.trans_param.wind_name_exec:
-            exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
-            df['wind_turbine_number'] = eval(exec_str)
-
-        self.save_to_tmp_csv(df, file)
-
-    def get_and_remove(self, file, thead_local=None):
-
-        to_path = self.get_excel_tmp_path()
-        if str(file).endswith("zip"):
-            if str(file).endswith("csv.zip"):
-                copy_to_new(file, file.replace(self.trans_param.read_path, to_path).replace("csv.zip", 'csv.gz'))
-            else:
-                desc_path = file.replace(self.trans_param.read_path, to_path)
-                is_success, e = unzip(file, get_desc_path(desc_path))
-                self.trans_param.has_zip = True
-                if not is_success:
-                    # raise e
-                    pass
-        elif str(file).endswith("rar"):
-            desc_path = file.replace(self.trans_param.read_path, to_path)
-            is_success, e = unrar(file, get_desc_path(desc_path))
-            self.trans_param.has_zip = True
-            if not is_success:
-                # raise e
-                pass
-        else:
-            copy_to_new(file, file.replace(self.trans_param.read_path, to_path))
-
-    def read_excel_to_df(self, file_path):
-
-        read_cols = [v.split(",")[0] for k, v in self.trans_param.cols_tran.items() if v and not v.startswith("$")]
-
-        trans_dict = {}
-        for k, v in self.trans_param.cols_tran.items():
-            if v and not str(v).startswith("$"):
-                trans_dict[v] = k
-
-        if self.trans_param.is_vertical_table:
-            vertical_cols = self.trans_param.vertical_cols
-            df = read_file_to_df(file_path, vertical_cols, header=self.header)
-            df = df[df[self.trans_param.vertical_key].isin(read_cols)]
-            df.rename(columns={self.trans_param.cols_tran['wind_turbine_number']: 'wind_turbine_number',
-                               self.trans_param.cols_tran['time_stamp']: 'time_stamp'}, inplace=True)
-            df[self.trans_param.vertical_key] = df[self.trans_param.vertical_key].map(trans_dict).fillna(
-                df[self.trans_param.vertical_key])
-
-            return df
-
-        else:
-            trans_dict = dict()
-            for k, v in self.trans_param.cols_tran.items():
-                if v and v.startswith("$") or v.find(",") > 0:
-                    trans_dict[v] = k
-
-            if self.trans_param.merge_columns:
-                df = read_file_to_df(file_path, header=self.header)
-            else:
-                if self.trans_param.need_valid_cols:
-                    df = read_file_to_df(file_path, read_cols, header=self.header)
-                else:
-                    df = read_file_to_df(file_path, header=self.header)
-
-            # 处理列名前缀问题
-            if self.trans_param.resolve_col_prefix:
-                columns_dict = dict()
-                for column in df.columns:
-                    columns_dict[column] = eval(self.trans_param.resolve_col_prefix)
-                df.rename(columns=columns_dict, inplace=True)
-
-            for k, v in trans_dict.items():
-                if k.startswith("$file"):
-                    file = ".".join(os.path.basename(file_path).split(".")[0:-1])
-                    if k == "$file":
-                        df[v] = str(file)
-                    elif k.startswith("$file["):
-                        datas = str(k.replace("$file", "").replace("[", "").replace("]", "")).split(":")
-                        if len(datas) != 2:
-                            raise Exception("字段映射出现错误 :" + str(trans_dict))
-                        df[v] = str(file[int(datas[0]):int(datas[1])]).strip()
-                elif k.find("$file_date") > 0:
-                    datas = str(k.split(",")[1].replace("$file_date", "").replace("[", "").replace("]", "")).split(":")
-                    if len(datas) != 2:
-                        raise Exception("字段映射出现错误 :" + str(trans_dict))
-                    date_str = str(file[int(datas[0]):int(datas[1])]).strip()
-                    df[v] = df[k.split(",")[0]].apply(lambda x: date_str + " " + str(x))
-
-                elif k.startswith("$folder"):
-                    folder = file_path
-                    cengshu = int(str(k.replace("$folder", "").replace("[", "").replace("]", "")))
-                    for i in range(cengshu):
-                        folder = os.path.dirname(folder)
-                    df[v] = str(str(folder).split(os.sep)[-1]).strip()
-
-            return df
-
-    def _save_to_tmp_csv_by_name(self, df, name):
-        save_name = str(name) + '.csv'
-        save_path = os.path.join(self.get_read_tmp_path(), save_name)
-        create_file_path(save_path, is_file_path=True)
-
-        with self.lock:
-            if name in self.exist_wind_names:
-                contains_name = True
-            else:
-                contains_name = False
-                self.exist_wind_names.append(name)
-
-        if contains_name:
-            df.to_csv(save_path, index=False, encoding='utf8', mode='a',
-                      header=False)
-        else:
-            df.to_csv(save_path, index=False, encoding='utf8')
-
-    def save_to_tmp_csv(self, df, file):
-        trans_print("开始保存", str(file), "到临时文件")
-        names = set(df['wind_turbine_number'].values)
-
-        with multiprocessing.Pool(6) as pool:
-            pool.starmap(self._save_to_tmp_csv_by_name,
-                         [(df[df['wind_turbine_number'] == name], name) for name in names])
-        del df
-        trans_print("保存", str(names), "到临时文件成功, 风机数量", len(names))
-
-    def set_statistics_data(self, df):
-
-        if not df.empty:
-            df['time_stamp'] = pd.to_datetime(df['time_stamp'])
-            min_date = df['time_stamp'].min()
-            max_date = df['time_stamp'].max()
-            with self.lock:
-
-                if 'min_date' in self.statistics_map.keys():
-                    if self.statistics_map['min_date'] > min_date:
-                        self.statistics_map['min_date'] = min_date
-                else:
-                    self.statistics_map['min_date'] = min_date
-
-                if 'max_date' in self.statistics_map.keys():
-                    if self.statistics_map['max_date'] < max_date:
-                        self.statistics_map['max_date'] = max_date
-                else:
-                    self.statistics_map['max_date'] = max_date
-
-                if 'total_count' in self.statistics_map.keys():
-                    self.statistics_map['total_count'] = self.statistics_map['total_count'] + df.shape[0]
-                else:
-                    self.statistics_map['total_count'] = df.shape[0]
-
-                if 'time_granularity' not in self.statistics_map.keys():
-                    self.statistics_map['time_granularity'] = get_time_space(df, 'time_stamp')
-
-    def save_statistics_file(self):
-        save_path = os.path.join(os.path.dirname(self.get_save_path()),
-                                 self.trans_param.read_type + '_statistics.txt')
-        create_file_path(save_path, is_file_path=True)
-        with open(save_path, 'w', encoding='utf8') as f:
-            f.write("总数据量:" + str(self.statistics_map['total_count']) + "\n")
-            f.write("最小时间:" + str(self.statistics_map['min_date']) + "\n")
-            f.write("最大时间:" + str(self.statistics_map['max_date']) + "\n")
-            f.write("风机数量:" + str(len(read_excel_files(self.get_read_tmp_path()))) + "\n")
-
-    def save_to_csv(self, filename):
-        df = read_file_to_df(filename)
-
-        if self.trans_param.is_vertical_table:
-            df = df.pivot_table(index=['time_stamp', 'wind_turbine_number'], columns=self.trans_param.vertical_key,
-                                values=self.trans_param.vertical_value,
-                                aggfunc='max')
-            # 重置索引以得到普通的列
-            df.reset_index(inplace=True)
-
-        for k in self.trans_param.cols_tran.keys():
-            if k not in df.columns:
-                df[k] = None
-
-        df = df[self.trans_param.cols_tran.keys()]
-
-        # 转化风机名称
-        trans_print("开始转化风机名称")
-        # if self.trans_param.wind_name_exec:
-        #     exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
-        # df['wind_turbine_number'] = eval(exec_str)
-        df['wind_turbine_number'] = df['wind_turbine_number'].astype('str')
-        df['wind_turbine_name'] = df['wind_turbine_number']
-        df['wind_turbine_number'] = df['wind_turbine_number'].map(
-            self.wind_col_trans).fillna(
-            df['wind_turbine_number'])
-
-        wind_col_name = str(df['wind_turbine_number'].values[0])
-        # 添加年月日
-        trans_print(wind_col_name, "包含时间字段,开始处理时间字段,添加年月日", filename)
-        trans_print(wind_col_name, "时间原始大小:", df.shape[0])
-        df = df[(df['time_stamp'].str.find('-') > 0) & (df['time_stamp'].str.find(':') > 0)]
-        trans_print(wind_col_name, "去掉非法时间后大小:", df.shape[0])
-        df['time_stamp'] = pd.to_datetime(df['time_stamp'])
-        df['year'] = df['time_stamp'].dt.year
-        df['month'] = df['time_stamp'].dt.month
-        df['day'] = df['time_stamp'].dt.day
-        df.sort_values(by='time_stamp', inplace=True)
-        df['time_stamp'] = df['time_stamp'].apply(
-            lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
-        trans_print("处理时间字段结束")
-
-        # 如果包含*号,祛除
-        trans_print(wind_col_name, "过滤星号前大小:", df.shape[0])
-        mask = ~df.applymap(lambda x: isinstance(x, str) and '*' in x).any(axis=1)
-        df = df[mask]
-        trans_print(wind_col_name, "过滤星号后大小:", df.shape[0])
-
-        trans_print(wind_col_name, "转化风机名称结束")
-
-        if self.save_zip:
-            save_path = os.path.join(self.get_save_path(), str(wind_col_name) + '.csv.gz')
-        else:
-            save_path = os.path.join(self.get_save_path(), str(wind_col_name) + '.csv')
-        create_file_path(save_path, is_file_path=True)
-        if self.save_zip:
-            df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8')
-        else:
-            df.to_csv(save_path, index=False, encoding='utf-8')
-
-        self.set_statistics_data(df)
-
-        del df
-        trans_print("保存" + str(filename) + ".csv成功")
-
-    def remove_file_to_tmp_path(self):
-        # 读取文件
-        try:
-            if os.path.isfile(self.trans_param.read_path):
-                all_files = [self.trans_param.read_path]
-            else:
-                all_files = read_files(self.trans_param.read_path)
-
-            with multiprocessing.Pool(6) as pool:
-                pool.starmap(self.get_and_remove, [(i,) for i in all_files])
-
-            all_files = read_excel_files(self.get_excel_tmp_path())
-
-            trans_print('读取文件数量:', len(all_files))
-        except Exception as e:
-            trans_print(traceback.format_exc())
-            message = "读取文件列表错误:" + self.trans_param.read_path + ",系统返回错误:" + str(e)
-            raise ValueError(message)
-        return all_files
-
-    def read_file_and_save_tmp(self):
-        all_files = read_excel_files(self.get_excel_tmp_path())
-        if self.trans_param.merge_columns:
-            dfs_list = list()
-            index_keys = [self.trans_param.cols_tran['time_stamp']]
-            wind_col = self.trans_param.cols_tran['wind_turbine_number']
-            if str(wind_col).startswith("$"):
-                wind_col = 'wind_turbine_number'
-            index_keys.append(wind_col)
-            df_map = dict()
-            with multiprocessing.Pool(6) as pool:
-                dfs = pool.starmap(self.read_excel_to_df, [(file,) for file in all_files])
-
-            for df in dfs:
-                key = '-'.join(df.columns)
-                if key in df_map.keys():
-                    df_map[key] = pd.concat([df_map[key], df])
-                else:
-                    df_map[key] = df
-
-            for k, df in df_map.items():
-                df.drop_duplicates(inplace=True)
-                df.set_index(keys=index_keys, inplace=True)
-                df = df[~df.index.duplicated(keep='first')]
-                dfs_list.append(df)
-
-            df = pd.concat(dfs_list, axis=1)
-            df.reset_index(inplace=True)
-            try:
-                self.df_save_to_tmp_file(df, "")
-            except Exception as e:
-                trans_print(traceback.format_exc())
-                message = "合并列出现错误:" + str(e)
-                raise ValueError(message)
-
-        else:
-            split_count = 6
-            all_arrays = split_array(all_files, split_count)
-            for arr in all_arrays:
-                with multiprocessing.Pool(split_count) as pool:
-                    dfs = pool.starmap(self.read_excel_to_df, [(ar,) for ar in arr])
-                try:
-                    for df in dfs:
-                        self.df_save_to_tmp_file(df)
-                except Exception as e:
-                    trans_print(traceback.format_exc())
-                    message = "整理临时文件,系统返回错误:" + str(e)
-                    raise ValueError(message)
-
-    def mutiprocessing_to_save_file(self):
-        # 开始保存到正式文件
-        trans_print("开始保存到excel文件")
-        all_tmp_files = read_excel_files(self.get_read_tmp_path())
-        try:
-            with multiprocessing.Pool(6) as pool:
-                pool.starmap(self.save_to_csv, [(file,) for file in all_tmp_files])
-        except Exception as e:
-            trans_print(traceback.format_exc())
-            message = "保存文件错误,系统返回错误:" + str(e)
-            raise ValueError(message)
-
-        trans_print("结束保存到excel文件")
-
-    def mutiprocessing_to_save_db(self):
-        # 开始保存到SQL文件
-        trans_print("开始保存到数据库文件")
-        all_saved_files = read_excel_files(self.get_save_path())
-        table_name = self.batch_no + "_" + self.trans_param.read_type
-        creat_table_and_add_partition(table_name, len(all_saved_files), self.trans_param.read_type)
-        try:
-
-            with multiprocessing.Pool(6) as pool:
-                pool.starmap(save_file_to_db,
-                             [(table_name, file, self.batch_count) for file in all_saved_files])
-
-        except Exception as e:
-            trans_print(traceback.format_exc())
-            message = "保存到数据库错误,系统返回错误:" + str(e)
-            raise ValueError(message)
-        trans_print("结束保存到数据库文件")
-
-    def _rename_file(self):
-        save_path = self.get_save_path()
-        files = os.listdir(save_path)
-
-        files.sort(key=lambda x: int(str(x).split(os.sep)[-1].split(".")[0][1:]))
-        for index, file in enumerate(files):
-            file_path = os.path.join(save_path, 'F' + str(index + 1).zfill(3) + ".csv.gz")
-            os.rename(os.path.join(save_path, file), file_path)
-
-    def delete_batch_files(self):
-        trans_print("开始删除已存在的批次文件夹")
-        if os.path.exists(self.get_save_path()):
-            shutil.rmtree(self.get_save_path())
-        trans_print("删除已存在的批次文件夹")
-
-    def delete_tmp_files(self):
-        trans_print("开始删除临时文件夹")
-
-        if os.path.exists(self.get_save_tmp_path()):
-            shutil.rmtree(self.get_save_tmp_path())
-
-        trans_print("删除临时文件夹删除成功")
-
-    def delete_batch_db(self):
-        if self.save_db:
-            table_name = "_".join([self.batch_no, self.trans_param.read_type])
-            renamed_table_name = "del_" + table_name + "_" + datetime.datetime.now().strftime('%Y%m%d%H%M%S')
-            # rename_table(table_name, renamed_table_name, self.save_db)
-            drop_table(table_name, self.save_db)
-
-    def run(self, step=0, end=3):
+    def run(self, step=0, end=4):
         begin = datetime.datetime.now()
         trans_print("开始执行")
-
         update_trans_status_running(self.batch_no, self.trans_param.read_type, self.save_db)
-
         if step <= 0 and end >= 0:
-            tmp_begin = datetime.datetime.now()
-            trans_print("开始初始化字段")
-            self.delete_batch_files()
-            self.delete_batch_db()
-
-            self.params_valid([self.batch_no, self.field_code, self.save_path, self.trans_param.read_type,
-                               self.trans_param.read_path, self.wind_full_name])
-
-            # if self.trans_param.resolve_col_prefix:
-            #     column = "测试"
-            #     eval(self.trans_param.resolve_col_prefix)
-            #
-            # if self.trans_param.wind_name_exec:
-            #     wind_name = "测试"
-            #     eval(self.trans_param.wind_name_exec)
-            update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 5, self.save_db)
-            trans_print("初始化字段结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
-                        str(datetime.datetime.now() - begin))
+            cleanData = ClearData(self.pathsAndTable)
+            cleanData.run()
 
         if step <= 1 and end >= 1:
             # 更新运行状态到运行中
-            tmp_begin = datetime.datetime.now()
-            self.delete_tmp_files()
-            trans_print("开始保存到临时路径")
-            # 开始读取数据并分类保存临时文件
-            self.remove_file_to_tmp_path()
-            update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 20, self.save_db)
-            trans_print("保存到临时路径结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
-                        str(datetime.datetime.now() - begin))
+            unzipAndRemove = UnzipAndRemove(self.pathsAndTable)
+            unzipAndRemove.run()
 
         if step <= 2 and end >= 2:
             # 更新运行状态到运行中
-            tmp_begin = datetime.datetime.now()
-            trans_print("开始保存到临时文件")
-
-            # 开始读取数据并分类保存临时文件
-            self.read_file_and_save_tmp()
-            update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 50, self.save_db)
-            trans_print("保存到临时文件结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
-                        str(datetime.datetime.now() - begin))
+            readAndSaveTmp = ReadAndSaveTmp(self.pathsAndTable, self.trans_param)
+            readAndSaveTmp.run()
 
         if step <= 3 and end >= 3:
-            tmp_begin = datetime.datetime.now()
-            trans_print("开始保存到文件")
-            self.mutiprocessing_to_save_file()
-            self.save_statistics_file()
-            update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 70, self.save_db)
-            trans_print("保存到文件结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
-                        str(datetime.datetime.now() - begin))
+            # Save to the final output files
+            statisticsAndSaveFile = StatisticsAndSaveFile(self.pathsAndTable, self.trans_param, self.statistics_map)
+            statisticsAndSaveFile.run()
 
         if step <= 4 and end >= 4:
             if self.save_db:
-                trans_print("开始保存到数据库")
-                tmp_begin = datetime.datetime.now()
-                self.mutiprocessing_to_save_db()
-                trans_print("保存到数据库结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
-                            str(datetime.datetime.now() - begin))
-        update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 100, self.save_db)
-        # 如果end==0 则说明只是进行了验证
-        if end != 0:
-            update_trans_status_success(self.batch_no, self.trans_param.read_type,
-                                        len(read_excel_files(self.get_read_tmp_path())),
-                                        self.statistics_map['time_granularity'], self.save_db)
+                saveToDb = SaveToDb(self.pathsAndTable)
+                saveToDb.run()
 
+        update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 99, self.save_db)
+        # Only report success when the run reached the final step (end >= 4); smaller end values are partial/validation runs
+        if end >= 4:
+            all_files = read_excel_files(self.pathsAndTable.get_save_path())
+            if step <= 3:
+                update_trans_status_success(self.batch_no, self.trans_param.read_type,
+                                            len(all_files),
+                                            self.statistics_map['time_granularity'],
+                                            self.statistics_map['min_date'], self.statistics_map['max_date'],
+                                            self.statistics_map['total_count'], self.save_db)
+            else:
+                df = read_file_to_df(all_files[0], read_cols=['time_stamp'])
+                df['time_stamp'] = pd.to_datetime(df['time_stamp'])
+                time_granularity = get_time_space(df, 'time_stamp')
+                batch_data = batch_statistics("_".join([self.batch_no, self.trans_param.read_type]))
+                if batch_data is not None:
+                    update_trans_status_success(self.batch_no, self.trans_param.read_type,
+                                                len(read_excel_files(self.pathsAndTable.get_save_path())),
+                                                time_granularity,
+                                                batch_data['min_date'], batch_data['max_date'],
+                                                batch_data['total_count'], self.save_db)
+                else:
+                    update_trans_status_success(self.batch_no, self.trans_param.read_type,
+                                                len(read_excel_files(self.pathsAndTable.get_save_path())),
+                                                time_granularity,
+                                                None, None,
+                                                None, self.save_db)
         trans_print("结束执行", self.trans_param.read_type, ",总耗时:",
                     str(datetime.datetime.now() - begin))
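
After this refactor, run() is essentially a dispatcher over the new step classes in etl/step. For quick orientation, the step/end numbering maps as follows (a summary of the code above, not part of the project itself):

    # Step numbers accepted by WindFarms.run(step, end) after this refactor:
    STEPS = {
        0: 'ClearData',              # remove existing tmp files, batch files and the batch table
        1: 'UnzipAndRemove',         # copy/unzip the raw files into the tmp folder
        2: 'ReadAndSaveTmp',         # read raw files and split them into per-turbine tmp CSVs
        3: 'StatisticsAndSaveFile',  # clean data, collect statistics, write the final CSVs
        4: 'SaveToDb',               # create the table/partitions and load the CSVs (if save_db)
    }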

+ 0 - 3
etl/base/__init__.py

@@ -1,3 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time    : 2024/5/15
-# @Author  : 魏志亮

+ 24 - 0
etl/step/ClearData.py

@@ -0,0 +1,24 @@
+import datetime
+
+from etl.base.PathsAndTable import PathsAndTable
+from service.plt_service import update_trans_transfer_progress
+from utils.log.trans_log import trans_print
+
+
+class ClearData(object):
+
+    def __init__(self, pathsAndTable: PathsAndTable):
+        self.pathsAndTable = pathsAndTable
+
+    def clean_data(self):
+        self.pathsAndTable.delete_tmp_files()
+        self.pathsAndTable.delete_batch_db()
+        self.pathsAndTable.delete_batch_files()
+
+    def run(self):
+        trans_print("开始清理数据")
+        begin = datetime.datetime.now()
+        self.clean_data()
+        update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 5,
+                                       self.pathsAndTable.save_db)
+        trans_print("清理数据结束,耗时:", datetime.datetime.now() - begin)

+ 225 - 0
etl/step/ReadAndSaveTmp.py

@@ -0,0 +1,225 @@
+import datetime
+import multiprocessing
+import os
+import traceback
+
+import pandas as pd
+
+from etl.base import TransParam
+from etl.base.PathsAndTable import PathsAndTable
+from service.plt_service import update_trans_transfer_progress
+from utils.file.trans_methods import read_excel_files, split_array, del_blank, \
+    create_file_path, read_file_to_df
+from utils.log.trans_log import trans_print
+from utils.systeminfo.sysinfo import use_files_get_max_cpu_count
+
+
+class ReadAndSaveTmp(object):
+
+    def __init__(self, pathsAndTable: PathsAndTable, trans_param: TransParam):
+        self.pathsAndTable = pathsAndTable
+        self.trans_param = trans_param
+        self.exist_wind_names = multiprocessing.Manager().list()
+        self.lock = multiprocessing.Manager().Lock()
+
+    def _save_to_tmp_csv_by_name(self, df, name):
+        save_name = str(name) + '.csv'
+        save_path = os.path.join(self.pathsAndTable.get_read_tmp_path(), save_name)
+        create_file_path(save_path, is_file_path=True)
+
+        with self.lock:
+            if name in self.exist_wind_names:
+                contains_name = True
+            else:
+                contains_name = False
+                self.exist_wind_names.append(name)
+
+        if contains_name:
+            df.to_csv(save_path, index=False, encoding='utf8', mode='a',
+                      header=False)
+        else:
+            df.to_csv(save_path, index=False, encoding='utf8')
+
+    def df_save_to_tmp_file(self, df=pd.DataFrame()):
+
+        if self.trans_param.is_vertical_table:
+            pass
+        else:
+            # 转换字段
+            same_col = {}
+            if self.trans_param.cols_tran:
+                cols_tran = self.trans_param.cols_tran
+                real_cols_trans = dict()
+                for k, v in cols_tran.items():
+                    if v and not v.startswith("$"):
+                        if v not in real_cols_trans.keys():
+                            real_cols_trans[v] = k
+                        else:
+                            value = real_cols_trans[v]
+                            if value in same_col.keys():
+                                same_col[value].append(k)
+                            else:
+                                same_col[value] = [k]
+
+                trans_print("包含转换字段,开始处理转换字段")
+                df.rename(columns=real_cols_trans, inplace=True)
+
+                # Populate additional target columns that map to the same source (Excel) column
+                for key in same_col.keys():
+                    for col in same_col[key]:
+                        df[col] = df[key]
+
+                del_keys = set(df.columns) - set(cols_tran.keys())
+
+                for key in del_keys:
+                    df.drop(key, axis=1, inplace=True)
+
+        df = del_blank(df, ['wind_turbine_number'])
+        df = df[df['time_stamp'].isna() == False]
+        if self.trans_param.wind_name_exec:
+            exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
+            df['wind_turbine_number'] = eval(exec_str)
+
+        self.save_to_tmp_csv(df)
+
+    def save_to_tmp_csv(self, df):
+        names = set(df['wind_turbine_number'].values)
+        if names:
+            trans_print("开始保存", str(names), "到临时文件")
+
+            with multiprocessing.Pool(self.pathsAndTable.multi_pool_count) as pool:
+                pool.starmap(self._save_to_tmp_csv_by_name,
+                             [(df[df['wind_turbine_number'] == name], name) for name in names])
+            del df
+            trans_print("保存", str(names), "到临时文件成功, 风机数量", len(names))
+
+    def read_file_and_save_tmp(self):
+        all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())
+        if self.trans_param.merge_columns:
+            dfs_list = list()
+            index_keys = [self.trans_param.cols_tran['time_stamp']]
+            wind_col = self.trans_param.cols_tran['wind_turbine_number']
+            if str(wind_col).startswith("$"):
+                wind_col = 'wind_turbine_number'
+            index_keys.append(wind_col)
+            df_map = dict()
+            # TODO: this merge path needs optimization
+            with multiprocessing.Pool(self.pathsAndTable.multi_pool_count) as pool:
+                dfs = pool.starmap(self.read_excel_to_df, [(file,) for file in all_files])
+
+            for df in dfs:
+                key = '-'.join(df.columns)
+                if key in df_map.keys():
+                    df_map[key] = pd.concat([df_map[key], df])
+                else:
+                    df_map[key] = df
+
+            for k, df in df_map.items():
+                df.drop_duplicates(inplace=True)
+                df.set_index(keys=index_keys, inplace=True)
+                df = df[~df.index.duplicated(keep='first')]
+                dfs_list.append(df)
+
+            df = pd.concat(dfs_list, axis=1)
+            df.reset_index(inplace=True)
+            try:
+                self.df_save_to_tmp_file(df)
+            except Exception as e:
+                trans_print(traceback.format_exc())
+                message = "合并列出现错误:" + str(e)
+                raise ValueError(message)
+
+        else:
+            split_count = use_files_get_max_cpu_count(all_files)
+            all_arrays = split_array(all_files, split_count)
+
+            for index, arr in enumerate(all_arrays):
+                try:
+                    with multiprocessing.Pool(split_count) as pool:
+                        dfs = pool.starmap(self.read_excel_to_df, [(ar,) for ar in arr])
+                    for df in dfs:
+                        self.df_save_to_tmp_file(df)
+                except Exception as e:
+                    trans_print(traceback.format_exc())
+                    message = "整理临时文件,系统返回错误:" + str(e)
+                    raise ValueError(message)
+
+                update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
+                                               round(20 + 30 * (index + 1) / len(all_arrays), 2),
+                                               self.pathsAndTable.save_db)
+
+    def read_excel_to_df(self, file_path):
+
+        read_cols = [v.split(",")[0] for k, v in self.trans_param.cols_tran.items() if v and not v.startswith("$")]
+
+        trans_dict = {}
+        for k, v in self.trans_param.cols_tran.items():
+            if v and not str(v).startswith("$"):
+                trans_dict[v] = k
+
+        if self.trans_param.is_vertical_table:
+            vertical_cols = self.trans_param.vertical_cols
+            df = read_file_to_df(file_path, vertical_cols, header=self.trans_param.header)
+            df = df[df[self.trans_param.vertical_key].isin(read_cols)]
+            df.rename(columns={self.trans_param.cols_tran['wind_turbine_number']: 'wind_turbine_number',
+                               self.trans_param.cols_tran['time_stamp']: 'time_stamp'}, inplace=True)
+            df[self.trans_param.vertical_key] = df[self.trans_param.vertical_key].map(trans_dict).fillna(
+                df[self.trans_param.vertical_key])
+
+            return df
+
+        else:
+            trans_dict = dict()
+            for k, v in self.trans_param.cols_tran.items():
+                if v and v.startswith("$") or v.find(",") > 0:
+                    trans_dict[v] = k
+
+            if self.trans_param.merge_columns:
+                df = read_file_to_df(file_path, header=self.trans_param.header)
+            else:
+                if self.trans_param.need_valid_cols:
+                    df = read_file_to_df(file_path, read_cols, header=self.trans_param.header)
+                else:
+                    df = read_file_to_df(file_path, header=self.trans_param.header)
+
+            # 处理列名前缀问题
+            if self.trans_param.resolve_col_prefix:
+                columns_dict = dict()
+                for column in df.columns:
+                    columns_dict[column] = eval(self.trans_param.resolve_col_prefix)
+                df.rename(columns=columns_dict, inplace=True)
+
+            for k, v in trans_dict.items():
+                if k.startswith("$file"):
+                    file = ".".join(os.path.basename(file_path).split(".")[0:-1])
+                    if k == "$file":
+                        df[v] = str(file)
+                    elif k.startswith("$file["):
+                        datas = str(k.replace("$file", "").replace("[", "").replace("]", "")).split(":")
+                        if len(datas) != 2:
+                            raise Exception("字段映射出现错误 :" + str(trans_dict))
+                        df[v] = str(file[int(datas[0]):int(datas[1])]).strip()
+                elif k.find("$file_date") > 0:
+                    datas = str(k.split(",")[1].replace("$file_date", "").replace("[", "").replace("]", "")).split(":")
+                    if len(datas) != 2:
+                        raise Exception("字段映射出现错误 :" + str(trans_dict))
+                    file = ".".join(os.path.basename(file_path).split(".")[0:-1])
+                    date_str = str(file[int(datas[0]):int(datas[1])]).strip()
+                    df[v] = df[k.split(",")[0]].apply(lambda x: date_str + " " + str(x))
+
+                elif k.startswith("$folder"):
+                    folder = file_path
+                    cengshu = int(str(k.replace("$folder", "").replace("[", "").replace("]", "")))
+                    for i in range(cengshu):
+                        folder = os.path.dirname(folder)
+                    df[v] = str(str(folder).split(os.sep)[-1]).strip()
+
+            return df
+
+    def run(self):
+        trans_print("开始保存数据到临时文件")
+        begin = datetime.datetime.now()
+        self.read_file_and_save_tmp()
+        update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 50,
+                                       self.pathsAndTable.save_db)
+        trans_print("保存数据到临时文件结束,耗时:", datetime.datetime.now() - begin)

+ 50 - 0
etl/step/SaveToDb.py

@@ -0,0 +1,50 @@
+import datetime
+import multiprocessing
+import traceback
+
+from etl.base.PathsAndTable import PathsAndTable
+from service.plt_service import update_trans_transfer_progress
+from service.trans_service import creat_table_and_add_partition, save_file_to_db
+from utils.file.trans_methods import read_excel_files, split_array
+from utils.log.trans_log import trans_print
+from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
+
+
+class SaveToDb(object):
+
+    def __init__(self, pathsAndTable: PathsAndTable):
+        self.pathsAndTable = pathsAndTable
+
+    def mutiprocessing_to_save_db(self):
+        # 开始保存到SQL文件
+
+        self.pathsAndTable.delete_batch_db()
+        trans_print("开始保存到数据库文件")
+        all_saved_files = read_excel_files(self.pathsAndTable.get_save_path())
+        creat_table_and_add_partition(self.pathsAndTable.get_table_name(), len(all_saved_files),
+                                      self.pathsAndTable.read_type)
+
+        split_count = get_available_cpu_count_with_percent(percent=1 / 2)
+        split_count = split_count if split_count <= len(all_saved_files) else len(all_saved_files)
+        all_arrays = split_array(all_saved_files, split_count)
+        try:
+            for index, arr in enumerate(all_arrays):
+                with multiprocessing.Pool(split_count) as pool:
+                    pool.starmap(save_file_to_db,  # save only the current chunk (arr), not all files again
+                                 [(self.pathsAndTable.get_table_name(), file,) for file in arr])
+                update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
+                                               round(70 + 29 * (index + 1) / len(all_arrays), 2),
+                                               self.pathsAndTable.save_db)
+        except Exception as e:
+            trans_print(traceback.format_exc())
+            message = "保存到数据库错误,系统返回错误:" + str(e)
+            raise ValueError(message)
+        trans_print("结束保存到数据库文件")
+
+    def run(self):
+        trans_print("开始保存到数据库")
+        begin = datetime.datetime.now()
+        self.mutiprocessing_to_save_db()
+        update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 99,
+                                       self.pathsAndTable.save_db)
+        trans_print("保存到数据结束,耗时:", datetime.datetime.now() - begin)

+ 163 - 0
etl/step/StatisticsAndSaveFile.py

@@ -0,0 +1,163 @@
+import datetime
+import multiprocessing
+import os
+import traceback
+
+import pandas as pd
+import numpy as np
+
+from etl.base import TransParam
+from etl.base.PathsAndTable import PathsAndTable
+from service.plt_service import update_trans_transfer_progress
+from utils.df_utils.util import get_time_space
+from utils.file.trans_methods import create_file_path, read_excel_files, read_file_to_df, split_array
+from utils.log.trans_log import trans_print
+from utils.systeminfo.sysinfo import use_files_get_max_cpu_count
+
+
+class StatisticsAndSaveFile(object):
+
+    def __init__(self, pathsAndTable: PathsAndTable, trans_param: TransParam, statistics_map):
+        self.pathsAndTable = pathsAndTable
+        self.trans_param = trans_param
+        self.statistics_map = statistics_map
+        self.lock = multiprocessing.Manager().Lock()
+
+    def set_statistics_data(self, df):
+
+        if not df.empty:
+            df['time_stamp'] = pd.to_datetime(df['time_stamp'])
+            min_date = df['time_stamp'].min()
+            max_date = df['time_stamp'].max()
+            with self.lock:
+
+                if 'min_date' in self.statistics_map.keys():
+                    if self.statistics_map['min_date'] > min_date:
+                        self.statistics_map['min_date'] = min_date
+                else:
+                    self.statistics_map['min_date'] = min_date
+
+                if 'max_date' in self.statistics_map.keys():
+                    if self.statistics_map['max_date'] < max_date:
+                        self.statistics_map['max_date'] = max_date
+                else:
+                    self.statistics_map['max_date'] = max_date
+
+                if 'total_count' in self.statistics_map.keys():
+                    self.statistics_map['total_count'] = self.statistics_map['total_count'] + df.shape[0]
+                else:
+                    self.statistics_map['total_count'] = df.shape[0]
+
+                if 'time_granularity' not in self.statistics_map.keys():
+                    self.statistics_map['time_granularity'] = get_time_space(df, 'time_stamp')
+
+    def save_statistics_file(self):
+        save_path = os.path.join(os.path.dirname(self.pathsAndTable.get_save_path()),
+                                 self.pathsAndTable.read_type + '_statistics.txt')
+        create_file_path(save_path, is_file_path=True)
+        with open(save_path, 'w', encoding='utf8') as f:
+            f.write("总数据量:" + str(self.statistics_map['total_count']) + "\n")
+            f.write("最小时间:" + str(self.statistics_map['min_date']) + "\n")
+            f.write("最大时间:" + str(self.statistics_map['max_date']) + "\n")
+            f.write("风机数量:" + str(len(read_excel_files(self.pathsAndTable.get_read_tmp_path()))) + "\n")
+
+    def check_data_validity(self, df):
+        pass
+
+    def save_to_csv(self, filename):
+        df = read_file_to_df(filename)
+
+        if self.trans_param.is_vertical_table:
+            df = df.pivot_table(index=['time_stamp', 'wind_turbine_number'], columns=self.trans_param.vertical_key,
+                                values=self.trans_param.vertical_value,
+                                aggfunc='max')
+            # 重置索引以得到普通的列
+            df.reset_index(inplace=True)
+
+        for k in self.trans_param.cols_tran.keys():
+            if k not in df.columns:
+                df[k] = None
+
+        df = df[self.trans_param.cols_tran.keys()]
+
+        # 转化风机名称
+        trans_print("开始转化风机名称")
+        df['wind_turbine_number'] = df['wind_turbine_number'].astype('str')
+        df['wind_turbine_name'] = df['wind_turbine_number']
+        df['wind_turbine_number'] = df['wind_turbine_number'].map(
+            self.trans_param.wind_col_trans).fillna(df['wind_turbine_number'])
+
+        wind_col_name = str(df['wind_turbine_number'].values[0])
+        # 添加年月日
+        trans_print(wind_col_name, "包含时间字段,开始处理时间字段,添加年月日", filename)
+        trans_print(wind_col_name, "时间原始大小:", df.shape[0])
+        df = df[(df['time_stamp'].str.find('-') > 0) & (df['time_stamp'].str.find(':') > 0)]
+        trans_print(wind_col_name, "去掉非法时间后大小:", df.shape[0])
+        df['time_stamp'] = pd.to_datetime(df['time_stamp'])
+        df['year'] = df['time_stamp'].dt.year
+        df['month'] = df['time_stamp'].dt.month
+        df['day'] = df['time_stamp'].dt.day
+        df.sort_values(by='time_stamp', inplace=True)
+        df['time_stamp'] = df['time_stamp'].apply(
+            lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
+        trans_print("处理时间字段结束")
+
+        not_double_cols = ['wind_turbine_number', 'wind_turbine_name', 'time_stamp', 'param6', 'param7', 'param8',
+                           'param9', 'param10']
+        trans_print(wind_col_name, "去掉非法数据前大小:", df.shape[0])
+        df.replace(np.nan, -999999999, inplace=True)
+        for col in df.columns:
+            if col not in not_double_cols:
+                if not df[col].isnull().all():
+                    df[col] = pd.to_numeric(df[col], errors='coerce')
+                    # 删除该列转换失败(变为NaN)的行
+                    df = df.dropna(subset=[col])
+        trans_print(wind_col_name, "去掉非法数据后大小:", df.shape[0])
+        df.replace(-999999999, np.nan, inplace=True)
+        trans_print(wind_col_name, "去掉重复数据前大小:", df.shape[0])
+        df.drop_duplicates(['wind_turbine_number', 'time_stamp'], keep='first', inplace=True)
+        trans_print(wind_col_name, "去掉重复数据后大小:", df.shape[0])
+        if self.pathsAndTable.save_zip:
+            save_path = os.path.join(self.pathsAndTable.get_save_path(), str(wind_col_name) + '.csv.gz')
+        else:
+            save_path = os.path.join(self.pathsAndTable.get_save_path(), str(wind_col_name) + '.csv')
+        create_file_path(save_path, is_file_path=True)
+        if self.pathsAndTable.save_zip:
+            df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8')
+        else:
+            df.to_csv(save_path, index=False, encoding='utf-8')
+
+        self.set_statistics_data(df)
+
+        del df
+        trans_print("保存" + str(wind_col_name) + "成功")
+
+    def multiprocessing_to_save_file(self):
+        # 开始保存到正式文件
+        trans_print("开始保存到excel文件")
+        all_tmp_files = read_excel_files(self.pathsAndTable.get_read_tmp_path())
+        # split_count = self.pathsAndTable.multi_pool_count
+        split_count = use_files_get_max_cpu_count(all_tmp_files)
+        all_arrays = split_array(all_tmp_files, split_count)
+        try:
+            for index, arr in enumerate(all_arrays):
+                with multiprocessing.Pool(split_count) as pool:
+                    pool.starmap(self.save_to_csv, [(i,) for i in arr])
+                update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
+                                               round(50 + 20 * (index + 1) / len(all_arrays), 2),
+                                               self.pathsAndTable.save_db)
+
+        except Exception as e:
+            trans_print(traceback.format_exc())
+            message = "保存文件错误,系统返回错误:" + str(e)
+            raise ValueError(message)
+
+        trans_print("结束保存到excel文件")
+
+    def run(self):
+        trans_print("开始保存数据到正式文件")
+        begin = datetime.datetime.now()
+        self.multiprocessing_to_save_file()
+        update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 70,
+                                       self.pathsAndTable.save_db)
+        trans_print("保存数据到正式文件结束,耗时:", datetime.datetime.now() - begin)

+ 76 - 0
etl/step/UnzipAndRemove.py

@@ -0,0 +1,76 @@
+import multiprocessing
+import os
+import traceback
+
+import datetime
+
+from etl.base.PathsAndTable import PathsAndTable
+from service.plt_service import update_trans_transfer_progress
+from utils.file.trans_methods import read_files, read_excel_files, copy_to_new, split_array
+from utils.log.trans_log import trans_print
+from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
+from utils.zip.unzip import unzip, unrar, get_desc_path
+
+
+class UnzipAndRemove(object):
+    def __init__(self, pathsAndTable: PathsAndTable):
+        self.pathsAndTable = pathsAndTable
+
+    def get_and_remove(self, file):
+
+        to_path = self.pathsAndTable.get_excel_tmp_path()
+        if str(file).endswith("zip"):
+            if str(file).endswith("csv.zip"):
+                copy_to_new(file, file.replace(self.pathsAndTable.read_path, to_path).replace("csv.zip", 'csv.gz'))
+            else:
+                desc_path = file.replace(self.pathsAndTable.read_path, to_path)
+                is_success, e = unzip(file, get_desc_path(desc_path))
+                self.pathsAndTable.has_zip = True
+                if not is_success:
+                    trans_print('解压zip文件失败:', file, str(e))
+        elif str(file).endswith("rar"):
+            desc_path = file.replace(self.pathsAndTable.read_path, to_path)
+            is_success, e = unrar(file, get_desc_path(desc_path))
+            self.pathsAndTable.has_zip = True
+            if not is_success:
+                trans_print('解压rar文件失败:', file, str(e))
+        else:
+            copy_to_new(file, file.replace(self.pathsAndTable.read_path, to_path))
+
+    def remove_file_to_tmp_path(self):
+        # 读取文件
+        try:
+            if os.path.isfile(self.pathsAndTable.read_path):
+                all_files = [self.pathsAndTable.read_path]
+            else:
+                all_files = read_files(self.pathsAndTable.read_path)
+
+            # 最大取系统cpu的 二分之一
+            split_count = get_available_cpu_count_with_percent(1/2)
+            all_arrays = split_array(all_files, split_count)
+
+            for index, arr in enumerate(all_arrays):
+                with multiprocessing.Pool(self.pathsAndTable.multi_pool_count) as pool:
+                    pool.starmap(self.get_and_remove, [(i,) for i in arr])
+                update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
+                                               round(5 + 15 * (index + 1) / len(all_arrays), 2),
+                                               self.pathsAndTable.save_db)
+
+            all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())
+
+            trans_print('读取文件数量:', len(all_files))
+        except Exception as e:
+            trans_print(traceback.format_exc())
+            message = "读取文件列表错误:" + self.pathsAndTable.read_path + ",系统返回错误:" + str(e)
+            raise ValueError(message)
+        return all_files
+
+    def run(self):
+        trans_print("开始解压移动文件")
+        begin = datetime.datetime.now()
+        self.remove_file_to_tmp_path()
+        update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 20,
+                                       self.pathsAndTable.save_db)
+        trans_print("解压移动文件结束:耗时:", datetime.datetime.now() - begin)

+ 0 - 0
etl/step/__init__.py


+ 0 - 162
qinghai-nuomuhong.py

@@ -1,162 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-Spyder 编辑器
-
-这是一个临时脚本文件。
-"""
-import copy
-import datetime
-import multiprocessing
-import os
-
-import pandas as pd
-import numpy as np
-
-dianjian_str = """
-wind_turbine_number		
-time_stamp		时间
-active_power		有功功率 kW
-rotor_speed		风轮转速 rpm
-generator_speed		发电机转速 rpm
-wind_velocity		风速 m/s
-pitch_angle_blade_1		叶片1角度 °
-pitch_angle_blade_2		叶片2角度 °
-pitch_angle_blade_3		叶片3角度 °
-cabin_position		机舱位置 °
-true_wind_direction		
-yaw_error1		风向 °
-twisted_cable_angle		
-main_bearing_temperature		主轴温度 ℃
-gearbox_oil_temperature		齿轮箱温度 ℃
-gearbox_low_speed_shaft_bearing_temperature		齿轮箱轴承温度 ℃
-gearboxmedium_speed_shaftbearing_temperature		
-gearbox_high_speed_shaft_bearing_temperature		齿轮箱轴承温度2 ℃
-generatordrive_end_bearing_temperature		发电机驱动侧轴承温度 ℃
-generatornon_drive_end_bearing_temperature		发电机非驱动侧轴承温度 ℃
-cabin_temperature		机舱温度 ℃
-outside_cabin_temperature		舱外温度 ℃
-generator_winding1_temperature		
-generator_winding2_temperature		
-generator_winding3_temperature		
-front_back_vibration_of_the_cabin		
-side_to_side_vibration_of_the_cabin		
-required_gearbox_speed		
-inverter_speed_master_control		
-actual_torque		
-given_torque		
-clockwise_yaw_count		
-counterclockwise_yaw_count		
-unusable		
-power_curve_available		
-set_value_of_active_power		有功功率设定 kW
-wind_turbine_status		
-wind_turbine_status2		
-turbulence_intensity		
-"""
-
-datas = [i for i in dianjian_str.split("\n") if i]
-
-dianjian_dict = dict()
-
-for data in datas:
-    ds = data.split("\t")
-
-    if len(ds) == 3:
-        dianjian_dict[ds[0]] = ds[2]
-    else:
-        dianjian_dict[ds[0]] = ''
-
-
-def read_df(file_path):
-    df = pd.read_csv(file_path, header=[0, 1])
-
-    col_nams_map = dict()
-    pre_col = ""
-    for tuple_col in df.columns:
-        col1 = tuple_col[0]
-        col2 = tuple_col[1]
-        if str(col1).startswith("Unnamed"):
-            if pre_col:
-                col1 = pre_col
-                pre_col = ''
-            else:
-                col1 = ''
-        else:
-            pre_col = col1
-
-        if str(col2).startswith("Unnamed"):
-            col2 = ''
-
-        col_nams_map[str(tuple_col)] = ''.join([col1, col2])
-
-    for k, v in col_nams_map.items():
-        if str(v).endswith('均值'):
-            col_nams_map[k] = str(v)[:-2]
-
-    df.columns = [str(col) for col in df.columns]
-    df.rename(columns=col_nams_map, inplace=True)
-
-    for col, name in dianjian_dict.items():
-        if name in df.columns:
-            df.rename(columns={name: col}, inplace=True)
-
-    for col in df.columns:
-        if col not in dianjian_dict.keys():
-            del df[col]
-
-    return df
-
-
-def get_wind_name_files(path):
-    files = os.listdir(path)
-
-    wind_files_map = dict()
-    for file in files:
-        full_file = os.path.join(path, file)
-        file_datas = str(file).split("@")
-        key = file_datas[0].replace("HD", "HD2")
-        if key in wind_files_map.keys():
-            wind_files_map[key].append(full_file)
-        else:
-            wind_files_map[key] = [full_file]
-
-    return wind_files_map
-
-
-def combine_df(save_path, wind_name, files):
-    begin = datetime.datetime.now()
-    df = pd.DataFrame()
-    for file in files:
-        query_df = read_df(file)
-        print("读取", file, query_df.shape)
-        query_df['time_stamp'] = pd.to_datetime(query_df['time_stamp'])
-        query_df.set_index(keys='time_stamp', inplace=True)
-        query_df = query_df[~query_df.index.duplicated(keep='first')]
-        if df.empty:
-            df = copy.deepcopy(query_df)
-        else:
-            df = pd.concat([df, query_df], axis=1, join='inner')
-    df.reset_index(inplace=True)
-    df['wind_turbine_number'] = wind_name
-    for col, name in dianjian_dict.items():
-        if col not in df.columns:
-            df[col] = np.nan
-
-    df = df[dianjian_dict.keys()]
-    df.to_csv(os.path.join(save_path, wind_name + ".csv"), encoding='utf-8', index=False)
-
-    print(wind_name, '整理完成', '耗时:', (datetime.datetime.now() - begin).seconds)
-
-
-if __name__ == '__main__':
-    read_path = r'/data/download/collection_data/1进行中/诺木洪风电场-甘肃-华电/收资数据/sec'
-    save_path = r'/data/download/collection_data/1进行中/诺木洪风电场-甘肃-华电/清理数据/sec'
-
-    # read_path = r'D:\trans_data\诺木洪\收资数据\min'
-    # save_path = r'D:\trans_data\诺木洪\清理数据\min'
-    if not os.path.exists(save_path):
-        os.makedirs(save_path, exist_ok=True)
-    wind_files_map = get_wind_name_files(read_path)
-
-    with multiprocessing.Pool(6) as pool:
-        pool.starmap(combine_df, [(save_path, wind_name, files) for wind_name, files in wind_files_map.items()])

+ 9 - 0
requirements.txt

@@ -0,0 +1,9 @@
+chardet==5.2.0
+DBUtils==3.1.0
+numpy==2.0.0
+pandas==2.2.2
+psutil==6.0.0
+PyMySQL==1.1.0
+PyYAML==6.0.1
+rarfile==4.2
+SQLAlchemy==2.0.30

+ 2 - 3
service/__init__.py

@@ -1,3 +1,2 @@
-# -*- coding: utf-8 -*-
-# @Time    : 2024/6/7
-# @Author  : 魏志亮
+
+

+ 48 - 19
service/plt_service.py

@@ -1,15 +1,32 @@
 # -*- coding: utf-8 -*-
 # @Time    : 2024/6/7
 # @Author  : 魏志亮
-from utils.db.ConnectMysqlPool import ConnectMysqlPool
+import datetime
 
-plt = ConnectMysqlPool("plt")
+from utils.db.ConnectMysql import ConnectMysql
+
+plt = ConnectMysql("plt")
+
+
+def update_timeout_trans_data():
+    sql = """
+    UPDATE data_transfer  
+    SET trans_sys_status = 2,err_info='运行超时失败',transfer_state=2
+    WHERE   
+        (  
+            (transfer_type = 'second' AND TIMESTAMPDIFF(HOUR, transfer_start_time, NOW()) > 24)  
+            OR  
+            (transfer_type = 'minute' AND TIMESTAMPDIFF(HOUR, transfer_start_time, NOW()) > 6)  
+        )  
+        AND trans_sys_status = 0
+    """
+    plt.execute(sql)
 
 
 def update_trans_status_running(batch_no, trans_type, schedule_exec=True):
     if schedule_exec:
         exec_sql = """
-        update data_transfer set trans_sys_status = 0 
+        update data_transfer set trans_sys_status = 0 ,transfer_start_time = now()
         where batch_code = %s  and transfer_type = %s
         """
         plt.execute(exec_sql, (batch_no, trans_type))
@@ -21,16 +38,29 @@ def update_trans_status_error(batch_no, trans_type, message="", save_db=True):
         update data_transfer set transfer_state = 2,trans_sys_status=2 ,err_info= %s,transfer_finish_time=now() 
         where batch_code = %s  and  transfer_type = %s
         """
+
+        message = message if len(message) <= 200 else message[0:200]
         plt.execute(exec_sql, (message, batch_no, trans_type))
 
 
-def update_trans_status_success(batch_no, trans_type, wind_count=0, time_granularity=0, save_db=True):
+def update_trans_status_success(batch_no, trans_type, wind_count=0, time_granularity=0,
+                                min_date=datetime.datetime.now(),
+                                max_date=datetime.datetime.now(),
+                                total_count=0, save_db=True):
     if save_db:
-        exec_sql = """
-        update data_transfer set transfer_state = 1,trans_sys_status = 1,err_info = '',engine_count =%s,time_granularity=%s,transfer_finish_time=now()  
-        where batch_code = %s  and transfer_type = %s
-        """
-        plt.execute(exec_sql, (wind_count, time_granularity, batch_no, trans_type))
+        if min_date is not None:
+            exec_sql = """
+            update data_transfer set transfer_state = 1,trans_sys_status = 1,transfer_progress=100,err_info = '',engine_count =%s,time_granularity=%s,transfer_finish_time=now(),
+            data_min_time= %s,data_max_time= %s,transfer_data_count=%s
+            where batch_code = %s  and transfer_type = %s
+            """
+            plt.execute(exec_sql, (wind_count, time_granularity, min_date, max_date, total_count, batch_no, trans_type))
+        else:
+            exec_sql = """
+            update data_transfer set transfer_state = 1,trans_sys_status = 1,transfer_progress = 100,err_info = '',engine_count =%s,time_granularity=%s,transfer_finish_time=now()
+            where batch_code = %s  and transfer_type = %s
+            """
+            plt.execute(exec_sql, (wind_count, time_granularity, batch_no, trans_type))
 
 
 def update_trans_transfer_progress(batch_no, trans_type, transfer_progress=0, save_db=True):
@@ -42,8 +72,8 @@ def update_trans_transfer_progress(batch_no, trans_type, transfer_progress=0, sa
 
 
 # 获取执行的数据
-def get_exec_data() -> dict:
-    query_running_sql = "select 1 from data_transfer where trans_sys_status = 0"
+def get_exec_data(run_count: int = 1) -> dict:
+    query_running_sql = "select count(1) as count from data_transfer where trans_sys_status = 0"
     query_next_exdc_sql = """
     SELECT
         t.*,a.field_name,b.batch_name
@@ -51,14 +81,15 @@ def get_exec_data() -> dict:
         data_transfer t INNER JOIN wind_field a on t.field_code = a.field_code
         inner join wind_field_batch b on t.batch_code = b.batch_code
     WHERE
-        (t.trans_sys_status = -1 or ( t.trans_sys_status = 2 and t.transfer_state = 0))
+        ((t.trans_sys_status = -1 and t.transfer_state = 0) or ( t.trans_sys_status in (1,2) and t.transfer_state = 0))
     AND t.transfer_addr != ''
     ORDER BY
         t.update_time
     LIMIT 1
     """
     data = plt.execute(query_running_sql)
-    if data:
+    now_count = int(data[0]['count'])
+    if now_count >= run_count:
         return None
     else:
         data = plt.execute(query_next_exdc_sql)
@@ -79,7 +110,6 @@ def get_all_wind(field_code):
 def get_all_wind_company():
     query_sql = "SELECT t.field_name FROM wind_field t where t.del_state = 0"
     datas = plt.execute(query_sql)
-    datas = []
     if datas:
         return [v for data in datas for k, v in data.items()]
     else:
@@ -87,10 +117,9 @@ def get_all_wind_company():
 
 
 if __name__ == '__main__':
-    print(get_all_wind('WOF01000002'))
-
-    print(get_all_wind_company())
+    print(get_exec_data(run_count=1))
 
-    print(get_exec_data())
+    print("**********************")
+    print(get_exec_data(run_count=2))
 
-    # print(update_trans_status_success("test_唐龙-定时任务测试", "second", 10))
+# print(update_trans_status_success("test_唐龙-定时任务测试", "second", 10))
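
The new run_count argument acts as a simple concurrency gate: get_exec_data hands out no further batch once run_count or more transfers already sit in trans_sys_status = 0. A hedged sketch of that decision with the SQL count replaced by a plain integer:

    def should_pick_next_batch(running_count: int, run_count: int = 1) -> bool:
        # Mirrors get_exec_data: once running_count reaches run_count, None is returned upstream.
        return running_count < run_count

    print(should_pick_next_batch(0, run_count=1))  # True  -> fetch the next pending transfer
    print(should_pick_next_batch(1, run_count=1))  # False -> skip this scheduler tick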

+ 40 - 36
service/trans_service.py

@@ -2,22 +2,19 @@
 # @Time    : 2024/6/7
 # @Author  : 魏志亮
 import os
+import traceback
 
 import pandas as pd
-from pandas import DataFrame
-from sqlalchemy import create_engine
 
-from conf.db import mysql_config
-from utils.db.ConnectMysqlPool import ConnectMysqlPool
-from utils.file.trans_methods import read_file_to_df
+from utils.db.ConnectMysql import ConnectMysql
 from utils.log.trans_log import trans_print
 
-trans = ConnectMysqlPool("trans")
+trans = ConnectMysql("trans")
 
 
-def get_trans_conf(wind_name, trans_type) -> dict:
-    query_sql = "SELECT * FROM trans_conf where wind_name = %s and type = %s"
-    res = trans.execute(query_sql, (wind_name, trans_type))
+def get_trans_conf(field_code, wind_name, trans_type) -> dict:
+    query_sql = "SELECT * FROM trans_conf where wind_code = %s and type = %s"
+    res = trans.execute(query_sql, (field_code, trans_type))
     print(res)
     if type(res) == tuple:
         return None
@@ -80,11 +77,11 @@ def creat_table_and_add_partition(table_name, count, read_type):
         `param3` DOUBLE DEFAULT NULL COMMENT '预留3',
         `param4` DOUBLE DEFAULT NULL COMMENT '预留4',
         `param5` DOUBLE DEFAULT NULL COMMENT '预留5',
-        `param6` DOUBLE DEFAULT NULL COMMENT '预留6',
-        `param7` DOUBLE DEFAULT NULL COMMENT '预留7',
-        `param8` DOUBLE DEFAULT NULL COMMENT '预留8',
-        `param9` DOUBLE DEFAULT NULL COMMENT '预留9',
-        `param10` DOUBLE DEFAULT NULL COMMENT '预留10',
+        `param6` VARCHAR (20) DEFAULT NULL COMMENT '预留6',
+        `param7` VARCHAR (20) DEFAULT NULL COMMENT '预留7',
+        `param8` VARCHAR (20) DEFAULT NULL COMMENT '预留8',
+        `param9` VARCHAR (20) DEFAULT NULL COMMENT '预留9',
+        `param10` VARCHAR (20) DEFAULT NULL COMMENT '预留10',
          KEY `time_stamp` (`time_stamp`),
          KEY `wind_turbine_number` (`wind_turbine_number`)
     ) ENGINE = INNODB DEFAULT CHARSET = utf8mb4
@@ -101,34 +98,41 @@ def rename_table(table_name, renamed_table_name, save_db=True):
         rename_sql = f"RENAME TABLE {table_name} TO {renamed_table_name}"
         try:
             trans.execute(rename_sql)
-        except Exception as e:
-            trans_print(e)
+        except:
+            trans_print(traceback.format_exc())
 
 
 def drop_table(table_name, save_db=True):
     if save_db:
-        rename_sql = f"drop TABLE `{table_name}` "
+        rename_sql = f"drop TABLE `{table_name}`"
         try:
             trans.execute(rename_sql)
-        except Exception as e:
-            trans_print(e)
-
-
-def save_file_to_db(table_name: str, file: str, batch_count=200000):
-    # trans.df_batch_save(table_name, df, batch_count)
-    env = os.environ['env']
-
-    config = mysql_config['trans_' + env]
-    username = config['user']
-    password = config['password']
-    host = config['host']
-    port = config['port']
-    dbname = config['database']
-    engine = create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}')
-    for i, df in enumerate(pd.read_csv(file, chunksize=batch_count)):
-        df.to_sql(table_name, engine, if_exists='append', index=False)
-        count = (i + 1) * batch_count
-        trans_print(f"Chunk {count} written to MySQL.")
+        except:
+            trans_print(traceback.format_exc())
+
+
+def save_file_to_db(table_name: str, file: str, batch_count=50000):
+    base_name = os.path.basename(file)
+    try:
+        for i, df in enumerate(pd.read_csv(file, chunksize=batch_count)):
+            # df.to_sql(table_name, engine, if_exists='append', index=False)
+            trans.execute_df_save(df, table_name)
+            count = (i + 1) * batch_count
+            trans_print(base_name, f"Chunk {count} written to MySQL.")
+    except Exception as e:
+        trans_print(traceback.format_exc())
+        message = base_name + str(e)
+        raise Exception(message)
+
+
+def batch_statistics(table_name):
+    query_sql = f"select count(1) as total_count ,min(t.time_stamp) as min_date ,max(t.time_stamp) as max_date from `{table_name}` t "
+    try:
+        res = trans.execute(query_sql)
+        return res[0]
+    except:
+        trans_print(traceback.format_exc())
+        return None
 
 
 if __name__ == '__main__':
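
A self-contained sketch of the chunked CSV-to-database pattern used by save_file_to_db, assuming pandas and SQLAlchemy; an in-memory SQLite engine and inline CSV data stand in for the project's MySQL connection and real files:

    import io

    import pandas as pd
    from sqlalchemy import create_engine

    engine = create_engine('sqlite://')  # placeholder for the MySQL engine built in ConnectMysql
    csv_file = io.StringIO("wind_turbine_number,time_stamp\nWT01,2024-06-01 00:00:00\nWT02,2024-06-01 00:10:00\n")

    batch_count = 1
    for i, chunk in enumerate(pd.read_csv(csv_file, chunksize=batch_count)):
        chunk.to_sql('demo_table', engine, if_exists='append', index=False)
        print(f"Chunk {(i + 1) * batch_count} written.")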

+ 0 - 3
test/__init__.py

@@ -1,3 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time    : 2024/6/18
-# @Author  : 魏志亮

+ 0 - 4
utils/conf/read_conf.py

@@ -20,7 +20,3 @@ def read_conf(dict_conf, col, default_value=None):
     else:
         return default_value
 
-
-if __name__ == '__main__':
-    from pprint import pprint
-    pprint(yaml_conf("../../conf/db.yaml"))

+ 51 - 0
utils/db/ConnectMysql.py

@@ -0,0 +1,51 @@
+import os
+import traceback
+
+import pymysql
+from pymysql.cursors import DictCursor
+from sqlalchemy import create_engine
+
+from utils.conf.read_conf import yaml_conf
+from utils.log.trans_log import trans_print
+
+
+class ConnectMysql:
+
+    def __init__(self, connet_name):
+        self.yaml_data = yaml_conf("/data/config/etl_config.yaml")
+        self.connet_name = connet_name
+        if 'env' in os.environ:
+            self.env = os.environ['env']
+        else:
+            self.env = 'dev'
+
+    # 从连接池中获取一个连接
+    def get_conn(self):
+        return pymysql.connect(**self.yaml_data[self.connet_name + "_" + self.env])
+
+    # 使用连接执行sql
+    def execute(self, sql, params=tuple()):
+
+        with self.get_conn() as conn:
+            with conn.cursor(cursor=DictCursor) as cursor:
+                try:
+                    cursor.execute(sql, params)
+                    trans_print("开始执行SQL:", cursor._executed)
+                    conn.commit()
+                    result = cursor.fetchall()
+                    return result
+                except Exception as e:
+                    trans_print(f"执行sql:{sql},报错:{e}")
+                    trans_print(traceback.format_exc())
+                    conn.rollback()
+                    raise e
+
+    def execute_df_save(self, df, table_name):
+        config = self.yaml_data[self.connet_name + "_" + self.env]
+        username = config['user']
+        password = config['password']
+        host = config['host']
+        port = config['port']
+        dbname = config['database']
+        engine = create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}')
+        df.to_sql(table_name, engine, index=False, if_exists='append')
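
ConnectMysql resolves its connection parameters from /data/config/etl_config.yaml under the key <connet_name>_<env>; a hypothetical sketch of that lookup, where every value below is a placeholder rather than the real configuration:

    # Shape the parsed YAML is expected to have (placeholder values only).
    yaml_data = {
        'plt_dev': {'host': '127.0.0.1', 'port': 3306, 'user': 'demo',
                    'password': 'demo', 'database': 'demo_db'},
    }

    connet_name, env = 'plt', 'dev'
    conn_kwargs = yaml_data[connet_name + '_' + env]  # later expanded into pymysql.connect(**conn_kwargs)
    print(conn_kwargs['database'])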

+ 0 - 153
utils/db/ConnectMysqlPool.py

@@ -1,153 +0,0 @@
-import numpy as np
-import pandas as pd
-import pymysql
-from dbutils.pooled_db import PooledDB
-import os
-
-from pandas import DataFrame
-from pymysql.cursors import DictCursor
-
-from conf.db import mysql_config
-from utils.log.trans_log import trans_print
-
-
-class ConnectMysqlPool:
-    """
-    连接MySQL数据库的连接池类。
-
-    属性:
-    db_account (dict): 数据库账号信息,包括用户名和密码等。
-    db (str): 数据库名称。
-    pool (PooledDB): MySQL连接池对象。
-
-    方法:
-    __init__: 初始化连接池类实例。
-    _obtaining_data: 从配置文件中获取测试数据。
-    create_mysql_pool: 创建MySQL连接池。
-    get_conn: 从连接池中获取一个连接。
-    close: 关闭数据库连接和游标。
-    execute: 使用连接执行SQL语句。
-    """
-
-    def __init__(self, connet_name):
-        """
-        初始化连接池类实例。
-
-        参数:
-        db (str): 测试库名称。
-        db_account (dict): 包含数据库账号信息的字典。
-        """
-        self.yaml_data = mysql_config
-        self.connet_name = connet_name
-        self.env = os.environ['env']
-        print("self.env", self.env)
-        # 创建连接池
-        self.pool = self.create_mysql_pool()
-
-    # 创建MySQL连接池
-    def create_mysql_pool(self):
-        """
-        根据配置信息创建MySQL连接池。
-
-        返回:
-        PooledDB: MySQL连接池对象。
-        """
-        pool = PooledDB(
-            **self.yaml_data[self.connet_name + '_connect_pool_config'],
-            **self.yaml_data[self.connet_name + "_" + self.env],
-            ping=2,
-            creator=pymysql
-        )
-        return pool
-
-    # 从连接池中获取一个连接
-    def get_conn(self):
-        """
-        从连接池中获取一个数据库连接。
-
-        返回:
-        connection: 数据库连接对象。
-        """
-        return self.pool.connection()
-
-    # 使用连接执行sql
-    def execute(self, sql, params=tuple()):
-        """
-        使用获取的连接执行SQL语句。
-
-        参数:
-        sql (str): SQL语句。
-        params (tuple): SQL参数。
-
-        返回:
-        list: 执行SQL语句后的结果集,若执行出错则返回None。
-        """
-
-        with self.get_conn() as conn:
-            with conn.cursor(cursor=DictCursor) as cursor:
-                try:
-                    cursor.execute(sql, params)
-                    trans_print("开始执行SQL:", cursor._executed)
-                    conn.commit()
-                    result = cursor.fetchall()
-                    return result
-                except Exception as e:
-                    trans_print(f"执行sql:{sql},报错:{e}")
-                    conn.rollback()
-                    raise e
-
-    def save_dict(self, table_name: str, params: dict):
-        keys = params.keys()
-        col_str = ",".join(keys)
-        data_s_str = ",".join(["%s"] * len(keys))
-
-        insert_sql = f"replace into {table_name} ({col_str}) values ({data_s_str})"
-        with self.get_conn() as conn:
-            with conn.cursor() as cursor:
-                try:
-                    cursor.execute(insert_sql, tuple(params.values()))
-                    conn.commit()
-                except Exception as e:
-                    trans_print(f"执行sql:{insert_sql},报错:{e}")
-                    conn.rollback()
-                    raise e
-
-    # 使用连接执行sql
-    def df_batch_save(self, table_name: str, df: DataFrame, batch_count=20000):
-        col_str = ",".join(df.columns)
-        data_s_str = ",".join(["%s"] * len(df.columns))
-
-        insert_sql = f"INSERT INTO `{table_name}` ({col_str}) values ({data_s_str})"
-
-        # 转化nan到null
-        df.replace(np.nan, None, inplace=True)
-
-        total_count = df.shape[0]
-        for i in range(0, total_count + 1, batch_count):
-            with self.get_conn() as conn:
-                with conn.cursor() as cursor:
-                    try:
-                        query_df = df.iloc[i:i + batch_count]
-                        if not query_df.empty:
-                            values = [tuple(data) for data in query_df.values]
-                            cursor.executemany(insert_sql, values)
-                            conn.commit()
-                            result = cursor.fetchall()
-                            trans_print(
-                                "总条数" + str(df.shape[0]) + ",已保存:" + str(i + batch_count))
-                    except Exception as e:
-                        conn.rollback()
-                        raise e
-
-
-if __name__ == '__main__':
-    plt = ConnectMysqlPool("plt")
-
-    print(plt.execute("select * from data_transfer limit 2"))
-
-    trans = ConnectMysqlPool("trans")
-
-    df = pd.DataFrame()
-    df['name'] = ['name' + str(i) for i in range(1000)]
-
-    print(trans.df_batch_save('test', df, 33))

+ 30 - 24
utils/file/trans_methods.py

@@ -47,33 +47,39 @@ def split_array(array, num):
 # 读取数据到df
 def read_file_to_df(file_path, read_cols=list(), header=0):
     trans_print('开始读取文件', file_path)
-    df = pd.DataFrame()
-    if str(file_path).lower().endswith("csv") or str(file_path).lower().endswith("gz"):
-        encoding = detect_file_encoding(file_path)
-        end_with_gz = str(file_path).lower().endswith("gz")
-        if read_cols:
-            if end_with_gz:
-                df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip', header=header)
-            else:
-                df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, header=header)
-        else:
-
-            if end_with_gz:
-                df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header)
-            else:
-                df = pd.read_csv(file_path, encoding=encoding, header=header)
-
-    else:
-        xls = pd.ExcelFile(file_path)
-        # 获取所有的sheet名称
-        sheet_names = xls.sheet_names
-        for sheet in sheet_names:
+    try:
+        df = pd.DataFrame()
+        if str(file_path).lower().endswith("csv") or str(file_path).lower().endswith("gz"):
+            encoding = detect_file_encoding(file_path)
+            end_with_gz = str(file_path).lower().endswith("gz")
             if read_cols:
-                df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet, header=header, usecols=read_cols)])
+                if end_with_gz:
+                    df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip', header=header)
+                else:
+                    df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, header=header,
+                                     on_bad_lines='warn')
             else:
-                df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet, header=header)])
 
-    trans_print('文件读取成功', file_path, '文件数量', df.shape)
+                if end_with_gz:
+                    df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header)
+                else:
+                    df = pd.read_csv(file_path, encoding=encoding, header=header, on_bad_lines='warn')
+
+        else:
+            xls = pd.ExcelFile(file_path)
+            # 获取所有的sheet名称
+            sheet_names = xls.sheet_names
+            for sheet in sheet_names:
+                if read_cols:
+                    df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet, header=header, usecols=read_cols)])
+                else:
+                    df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet, header=header)])
+
+        trans_print('文件读取成功', file_path, '数据大小', df.shape)
+    except Exception as e:
+        trans_print('读取文件出错', file_path, str(e))
+        message = '文件:' + os.path.basename(file_path) + ',' + str(e)
+        raise ValueError(message)
 
     return df
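
A quick illustration of the on_bad_lines='warn' behaviour the new code relies on (available since pandas 1.3): malformed rows are skipped with a ParserWarning instead of aborting the whole read. The sample data is made up:

    import io

    import pandas as pd

    bad_csv = io.StringIO("a,b\n1,2\n3,4,5\n6,7\n")  # second data row has one field too many
    df = pd.read_csv(bad_csv, on_bad_lines='warn')
    print(df.shape)  # (2, 2): the malformed row is dropped and a warning is emitted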
 

+ 2 - 2
utils/log/trans_log.py

@@ -24,7 +24,7 @@ class ContextFilter(logging.Filter):
         return True
 
 
-logger = logging.getLogger("trans_data")
+logger = logging.getLogger("etl_tools")
 logger.setLevel(logging.INFO)
 stout_handle = logging.StreamHandler(sys.stdout)
 stout_handle.setFormatter(
@@ -33,7 +33,7 @@ stout_handle.setLevel(logging.INFO)
 stout_handle.addFilter(ContextFilter())
 logger.addHandler(stout_handle)
 
-log_path = r'/data/logs/trans_data_' + os.environ['env']
+log_path = r'/data/logs/etl_tools_' + (os.environ['env'] if 'env' in os.environ else 'dev')
 file_path = os.path.join(log_path)
 
 if not os.path.exists(file_path):

+ 0 - 0
utils/systeminfo/__init__.py


+ 65 - 0
utils/systeminfo/sysinfo.py

@@ -0,0 +1,65 @@
+import os
+import psutil
+
+from utils.log.trans_log import trans_print
+
+
+def get_cpu_count():
+    return psutil.cpu_count()
+
+
+def get_available_cpu_count_with_percent(percent: float = 1):
+    cpu_count = get_cpu_count()
+    return int(cpu_count * percent)
+
+
+def get_file_size(file_path):
+    return os.path.getsize(file_path)
+
+
+def get_available_memory_with_percent(percent: float = 1):
+    memory_info = psutil.virtual_memory()
+    return int(memory_info.available * percent)
+
+
+def get_max_file_size(file_paths: list[str]):
+    max_size = 0
+    for file_path in file_paths:
+        file_size = get_file_size(file_path)
+        if file_size > max_size:
+            max_size = file_size
+    return max_size
+
+
+def use_files_get_max_cpu_count(file_paths: list[str], memory_percent: float = 1 / 9, cpu_percent: float = 1 / 3):
+    max_file_size = get_max_file_size(file_paths)
+    free_memory = get_available_memory_with_percent(memory_percent)
+    count = int(free_memory / max_file_size)
+    max_cpu_count = get_available_cpu_count_with_percent(cpu_percent)
+    result = count if count <= max_cpu_count else max_cpu_count
+    trans_print("总文件数:", len(file_paths), ",获取最大文件大小:", str(round(max_file_size / 2 ** 20, 2)) + "M",
+                "可用内存:", str(get_available_memory_with_percent(1) / 2 ** 20) + "M",
+                "总CPU数:", get_cpu_count(), "CPU可用数量:", max_cpu_count,
+                ",最终确定使用进程数:", result)
+    return result
+
+
+if __name__ == '__main__':
+    from utils.file.trans_methods import read_files
+    import datetime
+
+    read_path = r"Z:\collection_data\1进行中\密马风电场-山西-大唐\收资数据\scada\秒级数据"
+    begin = datetime.datetime.now()
+    all_files = read_files(read_path)
+    print(datetime.datetime.now() - begin)
+    count = use_files_get_max_cpu_count(all_files)
+
+    print(count)
+
+    print(get_available_memory_with_percent(1) / 2 ** 20)
+    print(get_available_memory_with_percent(2 / 3) / 2 ** 20)
+
+    begin = datetime.datetime.now()
+    print(len(all_files))
+    print(get_max_file_size(all_files) / 2 ** 20)
+    print(datetime.datetime.now() - begin)
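
The worker count chosen by use_files_get_max_cpu_count boils down to min(memory-based limit, CPU-based limit); a worked sketch with made-up numbers (32 GiB available, 512 MiB largest file, 16 CPUs):

    free_memory = int(32 * 2 ** 30 * (1 / 9))      # share of available memory granted to workers
    max_file_size = 512 * 2 ** 20                  # largest input file
    by_memory = int(free_memory / max_file_size)   # 7 workers fit in the memory budget
    by_cpu = int(16 * (1 / 3))                     # 5 workers allowed by the CPU share
    print(min(by_memory, by_cpu))                  # -> 5 processes are used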

+ 0 - 11
utils/tmp_use/combine_two_datas.py

@@ -1,11 +0,0 @@
-import os
-import pandas as pd
-
-from utils.file.trans_methods import read_file_to_df
-
-
-def combine_two_datas(path):
-    for file in os.listdir(path):
-        read_file = os.path.join(path, file)
-        df = read_file_to_df(read_file)
-        other_file = read_file.replace("second", "second_1").replace("新艾里-2024021", "新艾里-2024021_1")

+ 1 - 0
utils/zip/unzip.py

@@ -91,6 +91,7 @@ def unrar(rar_file_path, dest_dir):
                 # 解压文件到目标目录
                 rf.extract(member, dest_path)
     except Exception as e:
+        trans_print(traceback.format_exc())
         logger.exception(e)
         is_success = False
         trans_print('不是rar文件:', rar_file_path)