Browse Source

First version; development not yet finished

wzl 5 months ago
commit
f93ed33b67

+ 6 - 0
.idea/.gitignore

@@ -0,0 +1,6 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml

+ 9 - 0
.idea/misc.xml

@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="Black">
+    <option name="sdkName" value="Python 3.9" />
+  </component>
+  <component name="ProjectRootManager" version="2" languageLevel="JDK_22" project-jdk-name="Python 3.9" project-jdk-type="Python SDK">
+    <output url="file://$PROJECT_DIR$/out" />
+  </component>
+</project>

+ 8 - 0
.idea/modules.xml

@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ProjectModuleManager">
+    <modules>
+      <module fileurl="file://$PROJECT_DIR$/ImportData_Python.iml" filepath="$PROJECT_DIR$/ImportData_Python.iml" />
+    </modules>
+  </component>
+</project>

+ 6 - 0
.idea/vcs.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="VcsDirectoryMappings">
+    <mapping directory="$PROJECT_DIR$" vcs="Git" />
+  </component>
+</project>

+ 9 - 0
ImportData_Python.iml

@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="PYTHON_MODULE" version="4">
+  <component name="NewModuleRootManager" inherit-compiler-output="true">
+    <exclude-output />
+    <content url="file://$MODULE_DIR$" />
+    <orderEntry type="inheritedJdk" />
+    <orderEntry type="sourceFolder" forTests="false" />
+  </component>
+</module>

+ 15 - 0
conf/conf_dev.yaml

@@ -0,0 +1,15 @@
+import_data:
+  database: import_data
+  host: 192.168.50.235
+  password: admin123456
+  port: 30306
+  user: root
+
+
+# Directory where log files are written
+log_path_dir: /data/logs
+
+# Temporary working directory; some client servers have a very small /tmp, so it is configurable here
+tmp_base_path: /data/tmp-dev
+
+run_batch_count: 2

+ 14 - 0
conf/conf_prod.yaml

@@ -0,0 +1,14 @@
+import_data:
+  database: import_data
+  host: 192.168.50.235
+  password: admin123456
+  port: 30306
+  user: root
+
+# Directory where log files are written
+log_path_dir: /data/logs
+
+# Temporary working directory; some client servers have a very small /tmp, so it is configurable here
+tmp_base_path: /data/tmp-prod
+
+run_batch_count: 2
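
For reference, a minimal sketch (assuming the helpers defined later in this commit) of how these keys are consumed: yaml_conf and read_conf come from utils/conf/read_conf.py, and the IMPORT_CONF environment variable can override the config path, as in utils/db/ConnectMysql.py:

    from os import environ
    from utils.conf.read_conf import yaml_conf, read_conf

    conf = yaml_conf(environ.get('IMPORT_CONF', 'conf/conf_dev.yaml'))
    log_path_dir = read_conf(conf, 'log_path_dir', '/data/logs')    # where log files are written
    tmp_base_path = read_conf(conf, 'tmp_base_path', '/tmp')        # working dir for extracted/split files
    run_batch_count = read_conf(conf, 'run_batch_count', 1)         # max tasks running at once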

+ 2 - 0
db_server/__init__.py

@@ -0,0 +1,2 @@
+import pandas as pd
+pd.__version__  # no-op reference; only confirms that pandas is importable

+ 3 - 0
db_server/base.py

@@ -0,0 +1,3 @@
+from utils.db.ConnectMysql import ConnectMysql
+
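+# Shared connection wrapper for the import_data database (the "import_data" key in conf/conf_*.yaml)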
+import_db = ConnectMysql('import_data')

+ 40 - 0
import_exec.py

@@ -0,0 +1,40 @@
+import os
+import shutil
+import sys
+import traceback
+
+from service.import_data_service import get_exec_data, run_fail
+from trans.ImportParam import ImportParam
+from utils.conf.read_conf import read_conf, yaml_conf
+from utils.log.import_data_log import log_print
+
+
+def run_data(env_conf):
+    # maximum number of tasks allowed to run at once (run_batch_count in conf/conf_*.yaml)
+    run_count = read_conf(env_conf, 'run_batch_count', 1)
+    exec_data = get_exec_data(run_count)
+    if exec_data:
+        import_param = None
+        try:
+            import_param = ImportParam(exec_data, env_conf)
+            import_param.run()
+
+        except Exception as e:
+            log_print(traceback.format_exc())
+            run_fail(exec_data['id'], e)
+            log_print("Start removing the temp directory")
+            if import_param and os.path.exists(import_param.tmp_root):
+                shutil.rmtree(import_param.tmp_root)
+            log_print("Temp directory removed")
+
+
+if __name__ == '__main__':
+    from utils.common_util import get_project_conf_path_file
+
+    env = 'dev'
+
+    if len(sys.argv) >= 2:
+        env = sys.argv[1]
+
+    conf_path = get_project_conf_path_file(env)
+    env_conf = yaml_conf(conf_path)
+    run_data(env_conf)
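
Usage note, based on the __main__ block above: the optional first argument selects which conf/conf_<env>.yaml is loaded (default dev), so a production run would presumably look like:

    python import_exec.py prod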

+ 0 - 0
service/__init__.py


+ 69 - 0
service/import_data_service.py

@@ -0,0 +1,69 @@
+from db_server.base import import_db
+from utils.log.import_data_log import log_print
+
+db = import_db
+
+
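+# Fetch one pending task (exec_status = -1) from executor_history, but only while the number of
+# currently running tasks (exec_status = 0) is below run_count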
+def get_exec_data(run_count=1):
+    run_count_datas = db.execute('select count(1) as count from executor_history where exec_status = 0')
+    log_print(run_count_datas)
+    running_count = int(run_count_datas[0]['count'])
+    if running_count < run_count:
+        run_datas = db.execute('select * from executor_history where exec_status = -1 limit 1')
+        log_print(run_datas)
+        if run_datas:
+            return run_datas[0]
+        else:
+            log_print("没有可执行的任务")
+    else:
+        log_print("当前已达执行上限")
+
+    return None
+
+
+def begin_run(id, save_db=True):
+    if save_db:
+        db.execute(
+            'update executor_history set exec_status = 0,begin_time=now() where id = %s', (id,))
+
+
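+# Shift the current executor's progress by the share already contributed by earlier executors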
+def update_transfer_progress(id, number, process_count, now_count, save_db=True):
+    number = round((now_count - 1) * 100 / process_count + number)
+    log_print(f"{process_count}个任务,当前进度{now_count}, 当前比例:{number}")
+    if save_db:
+        db.execute('update executor_history set transfer_progress = %s where id = %s', (number, id))
+
+
+def run_success(id, save_db=True):
+    if save_db:
+        db.execute('update executor_history set exec_status = 1,end_time=now()  where id = %s', (id,))
+
+
+def run_fail(id, error, save_db=True):
+    if save_db:
+        # str() first, then truncate to 200 chars for the err_info column (len() would fail on an Exception object)
+        error = str(error)[:200]
+        db.execute('update executor_history set exec_status = 2,err_info = %s,end_time=now()  where id = %s',
+                   (error, id))
+
+
+def get_exec_group(id):
+    return db.execute("select * from process_group where id = %s", (id,))
+
+
+def get_mapping_field(process_executor_id):
+    return db.execute("""
+        SELECT
+            t.standardized_name,
+            t.data_name,
+            a.is_cut_col,
+            a.is_index 
+        FROM
+            mapping_field t
+            INNER JOIN base_template_field a ON t.template_filed_id = a.id 
+        WHERE
+            t.process_executor_id = %s
+    """, (process_executor_id,))
+
+
+if __name__ == '__main__':
+    get_exec_data(2)

+ 56 - 0
trans/ExecParam.py

@@ -0,0 +1,56 @@
+from service.import_data_service import get_mapping_field
+from trans.common.PathParam import PathParam
+
+
+class ExecParam(object):
+    def __init__(self, data, tmp_root, read_dir=None):
+
+        self.process_executor_id = data['id']
+        self.process_executor = data['name']
+        self.read_path = data['readPath']
+        # join type: 0 = vertical (axis 0), 1 = horizontal (axis 1)
+        self.join_type = data['joinType']
+        # ImportParam only passes tmp_root, so fall back to the executor's own readPath as the read directory
+        self.path_param = PathParam(tmp_root, f'{self.process_executor_id}_{self.process_executor}',
+                                    read_dir if read_dir else self.read_path)
+
+        self.mapping_list = list()
+        results = get_mapping_field(self.process_executor_id)
+        if results:
+            for result in results:
+                self.mapping_list.append(MappingFiled(result))
+        else:
+            raise Exception(f"根据 映射执行器ID:{self.process_executor_id} 没有找到映射字段")
+
+        self.index_cols = list()
+        self.split_cols = list()
+        self.has_header = True
+        self.mapping_cols = dict()
+        self.use_cols = list()
+        self.read_cols(self.mapping_list)
+
+    def read_cols(self, mapping_list):
+        for mapping_filed in mapping_list:
+            data_name = mapping_filed.data_name
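+            # '无列名' ("no column name") marks a positional column in header-less files;
+            # '缺失列名' ("missing column name") maps onto pandas' default 'Unnamed: ' column labels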
+            if str(data_name).startswith('无列名'):
+                self.has_header = False
+                name = int(str(data_name).replace('无列名', ''))
+                self.use_cols.append(name)
+                self.mapping_cols[name] = mapping_filed.standardized_name
+            else:
+                data_name_arrays = [str(i).replace('缺失列名', 'Unnamed: ') for i in str(data_name).split(',') if i]
+                self.use_cols.extend(data_name_arrays)
+                for data_name_str in data_name_arrays:
+                    self.mapping_cols[data_name_str] = mapping_filed.standardized_name
+
+            if mapping_filed.is_index:
+                self.index_cols.append(mapping_filed.standardized_name)
+
+            if mapping_filed.is_cut_col:
+                self.split_cols.append(mapping_filed.standardized_name)
+
+
+class MappingFiled(object):
+    def __init__(self, data):
+        self.standardized_name = data['standardized_name']
+        self.data_name = data['data_name']
+        self.is_cut_col = data['is_cut_col']
+        self.is_index = data['is_index']

+ 62 - 0
trans/ImportParam.py

@@ -0,0 +1,62 @@
+import datetime
+import json
+import os.path
+
+from trans.ExecParam import ExecParam
+from service.import_data_service import get_exec_group, get_exec_data
+from trans.common.ReadAndSaveTmp import ReadAndSaveTmp
+from trans.common.UnzipAndRemove import UnzipAndRemove
+from utils.conf.read_conf import read_conf
+from utils.log.import_data_log import log_print
+
+
+class ImportParam(object):
+
+    def __init__(self, data, env_conf, save_db=True):
+        results = get_exec_group(data['id'])
+        self.executor_history_id = data['id']
+        self.executor_history_name = data['name']
+        self.tmp_root = read_conf(env_conf, "tmp_base_path", "/tmp")
+        self.save_db = save_db
+
+        if results:
+            result = results[0]
+            self.process_group_name = result['name']
+            self.join_type = result['join_type']
+            self.process_group_id = data['process_group_id']
+            self.tmp_root = os.path.join(self.tmp_root, f'{self.executor_history_id}_{self.executor_history_name}')
+            self.out_put_dir = data['out_put_dir']
+            self.execotor_list = list()
+            for exec_data in json.loads(data['input_dirs']):
+                self.execotor_list.append(ExecParam(exec_data, self.tmp_root))
+        else:
+            log_print(f"通过ID: {data['id']}没有获取到执行器组")
+            raise Exception(f"通过ID: {data['id']}没有获取到执行器组")
+
+    def run(self):
+        total_begin = datetime.datetime.now()
+        log_print(f'Start executing {self.process_group_name}')
+
+        process_count = len(self.execotor_list)
+        for index, exec_data in enumerate(self.execotor_list):
+            log_print(f'Start executing {exec_data.process_executor}')
+
+            begin = datetime.datetime.now()
+            # move / extract the source files
+            unzip_and_remove = UnzipAndRemove(self.executor_history_id, process_count, index,
+                                              exec_data.path_param, self.save_db)
+            unzip_and_remove.run()
+            log_print("解压移动文件结束:耗时:", datetime.datetime.now() - begin, "总耗时:",
+                      datetime.datetime.now() - total_begin)
+            # 读取文件
+
+            begin = datetime.datetime.now()
+
+            read_and_save = ReadAndSaveTmp(self.executor_history_id, process_count, index,
+                                           exec_data,
+                                           self.out_put_dir if process_count == 1 else exec_data.path_param.get_process_tmp_path(),
+                                           self.save_db)
+
+            read_and_save.run()
+            log_print("保存数据到临时文件结束,耗时:", datetime.datetime.now() - begin, "总耗时:",
+                      datetime.datetime.now() - total_begin)

+ 0 - 0
trans/__init__.py


+ 18 - 0
trans/common/Axis0DataImpl.py

@@ -0,0 +1,18 @@
+import multiprocessing
+
+from trans.ExecParam import ExecParam
+
+
+class Axis0DataImpl(object):
+
+    def __init__(self, id, process_count, now_count, exec_param: ExecParam, save_db=True):
+        self.id = id
+        self.process_count = process_count
+        self.now_count = now_count
+        self.exec_param = exec_param
+        self.save_db = save_db
+        # share one Manager: every multiprocessing.Manager() call starts a separate server process
+        manager = multiprocessing.Manager()
+        self.lock_map = dict()
+        for i in range(1000):
+            self.lock_map[i] = manager.Lock()
+        self.lock = manager.Lock()
+        self.field_dict = manager.dict()
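+
+    def run(self):
+        # the vertical (axis 0) merge path is not implemented yet in this first version
+        raise NotImplementedError("Axis0DataImpl.run is not implemented yet")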

+ 139 - 0
trans/common/Axis1DataImpl.py

@@ -0,0 +1,139 @@
+import multiprocessing
+import os
+import traceback
+
+import pandas as pd
+
+from service.import_data_service import update_transfer_progress
+from trans.ExecParam import ExecParam
+from utils.file.trans_methods import split_array, read_excel_files, read_file_to_df, find_header
+from utils.log.import_data_log import log_print
+from utils.systeminfo.sysinfo import use_files_get_max_cpu_count, get_dir_size, max_file_size_get_max_cpu_count
+
+
+class Axis1DataImpl(object):
+
+    def __init__(self, id, process_count, now_count, exec_param: ExecParam, save_db=True):
+        self.id = id
+        self.process_count = process_count
+        self.now_count = now_count
+        self.exec_param = exec_param
+        self.save_db = save_db
+        # share one Manager: every multiprocessing.Manager() call starts a separate server process
+        manager = multiprocessing.Manager()
+        self.lock_map = dict()
+        for i in range(1000):
+            self.lock_map[i] = manager.Lock()
+        self.lock = manager.Lock()
+        self.field_dict = manager.dict()
+
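+    # Return (first_time, lock): one of 1000 per-field locks keyed by split value and column name,
+    # so parallel workers never interleave writes to the same per-field CSV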
+    def get_lock(self, split_col_value, col_name):
+        boolean_first_time = False
+        if split_col_value:
+            filed_name = f'{split_col_value}_{col_name}'
+        else:
+            filed_name = col_name
+
+        if filed_name not in self.field_dict:
+            boolean_first_time = True
+            # 0-based index so the value always stays inside lock_map (keys 0..999)
+            self.field_dict[filed_name] = len(self.field_dict.keys())
+
+        lock_index = self.field_dict[filed_name]
+        if lock_index >= 1000:
+            # dedicated lock pool exhausted; fall back to the shared lock (always return a tuple)
+            return boolean_first_time, self.lock
+
+        return boolean_first_time, self.lock_map[lock_index]
+
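+    # Read one source file, rename its columns to the standardized names, then append each
+    # (split value, column) slice to its own temp CSV under merge_tmp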
+    def read_and_save_file(self, file_path):
+        if self.exec_param.has_header:
+            header = find_header(file_path, self.exec_param.use_cols)
+            if header is None:
+                raise Exception(f"文件{os.path.basename(file_path)}没有找到列名")
+        else:
+            header = None
+
+        df = read_file_to_df(file_path, header=header)
+        col_map = {'file_name': 'file_name'}
+        if 'sheet_name' in df.columns:
+            col_map['sheet_name'] = 'sheet_name'
+
+        for col in df.columns:
+            if col in self.exec_param.use_cols and col not in ['file_name', 'sheet_name']:
+                col_map[col] = self.exec_param.mapping_cols[col]
+
+        df = df.rename(columns=col_map)
+
+        # fail fast if any index column is missing from the data
+        for col in self.exec_param.index_cols:
+            if col not in df.columns:
+                log_print(f"{file_path}没有索引列{col}")
+                raise Exception(f"{file_path}没有索引列{col}")
+
+        if self.exec_param.split_cols:
+            df['split_col'] = df[self.exec_param.split_cols].apply(
+                lambda x: '_'.join([str(i).replace(' ', '_').replace(':', '_') for i in x.values]),
+                axis=1)
+        else:
+            df['split_col'] = 'All'
+
+        split_col = df['split_col'].unique()
+
+        if len(split_col) >= 1000:
+            log_print(f"{file_path}切割文件太多,大于等于1000个")
+            raise Exception(f"{file_path}切割文件太多,大于等于1000个")
+
+        general_fields = list(df.columns)
+        general_fields.remove('split_col')
+        for col in self.exec_param.index_cols:
+            general_fields.remove(col)
+        for split_col_value in split_col:
+            for col in general_fields:
+                # copy the index columns; appending to the shared list would leak columns into later iterations
+                now_cols = list(self.exec_param.index_cols)
+                now_cols.append(col)
+                now_df = df[df['split_col'] == split_col_value][now_cols]
+
+                boolean_first_time, lock = self.get_lock(split_col_value, col)
+                with lock:
+                    path = os.path.join(self.exec_param.path_param.get_merge_tmp_path(), split_col_value)
+                    os.makedirs(path, exist_ok=True)
+                    if boolean_first_time:
+                        now_df.to_csv(os.path.join(path, f'{col}.csv'), index=False, encoding='utf-8')
+                    else:
+                        now_df.to_csv(os.path.join(path, f'{col}.csv'), index=False, mode='a', header=False,
+                                      encoding='utf-8')
+
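+    # Join the per-field CSVs of one split value on the index columns and write one wide CSV to process_tmp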
+    def read_merge_df_to_process(self, base_name):
+        merge_dir = os.path.join(self.exec_param.path_param.get_merge_tmp_path(), base_name)
+        # os.listdir only returns file names, so join them back onto the directory before reading
+        all_files = [os.path.join(merge_dir, i) for i in os.listdir(merge_dir)]
+        dfs = [pd.read_csv(i, encoding='utf-8', index_col=self.exec_param.index_cols) for i in all_files]
+        # keep the column labels: ignore_index=True on axis=1 would replace them with integers
+        df = pd.concat(dfs, axis=1)
+        df.reset_index(inplace=True)
+        df.to_csv(os.path.join(self.exec_param.path_param.get_process_tmp_path(), base_name + '.csv'), index=False,
+                  encoding='utf-8')
+
+    def run(self):
+
+        if len(self.exec_param.index_cols) == 0:
+            log_print("Merging tables requires at least one index column")
+            raise Exception("Merging tables requires at least one index column")
+
+        all_files = read_excel_files(self.exec_param.path_param.get_unzip_tmp_path())
+        split_count = use_files_get_max_cpu_count(all_files)
+        all_arrays = split_array(all_files, split_count)
+
+        log_print("开始读取文件,文件总数:", len(all_files), ",文件分片数:", split_count)
+
+        for index, now_array in enumerate(all_arrays):
+            with multiprocessing.Pool(split_count) as pool:
+                # map, not starmap: read_and_save_file takes a single file path argument
+                pool.map(self.read_and_save_file, now_array)
+
+            update_transfer_progress(self.id, round(20 + 50 * (index + 1) / len(all_arrays)), self.process_count,
+                                     self.now_count, self.save_db)
+
+        all_dirs = os.listdir(self.exec_param.path_param.get_merge_tmp_path())
+
+        dir_size = get_dir_size(os.path.join(self.exec_param.path_param.get_merge_tmp_path(), all_dirs[0]))
+        pool_count = max_file_size_get_max_cpu_count(dir_size)
+        # cap the pool by the number of split directories that actually need merging
+        pool_count = pool_count if pool_count <= len(all_dirs) else len(all_dirs)
+        with multiprocessing.Pool(pool_count) as pool:
+            pool.map(self.read_merge_df_to_process, all_dirs)
+
+        update_transfer_progress(self.id, 80, self.process_count, self.now_count, self.save_db)

+ 27 - 0
trans/common/PathParam.py

@@ -0,0 +1,27 @@
+import os
+
+
+class PathParam(object):
+    def __init__(self, tmp_root, exector_process_dir_name, read_dir):
+        self.tmp_base_path = str(os.path.join(tmp_root, exector_process_dir_name))
+        self.read_dir = read_dir
+
+    def get_unzip_tmp_path(self):
+        path = os.path.join(self.tmp_base_path, 'unzip_tmp')
+        if not os.path.exists(path):
+            os.makedirs(path, exist_ok=True)
+
+        return path
+
+    def get_process_tmp_path(self):
+        path = os.path.join(self.tmp_base_path, 'process_tmp')
+        if not os.path.exists(path):
+            os.makedirs(path, exist_ok=True)
+
+        return path
+
+    def get_merge_tmp_path(self):
+        path = os.path.join(self.tmp_base_path, 'merge_tmp')
+        if not os.path.exists(path):
+            os.makedirs(path, exist_ok=True)
+        return path

+ 23 - 0
trans/common/ReadAndSaveTmp.py

@@ -0,0 +1,23 @@
+from trans.ExecParam import ExecParam
+from trans.common.Axis0DataImpl import Axis0DataImpl
+from trans.common.Axis1DataImpl import Axis1DataImpl
+
+
+class ReadAndSaveTmp(object):
+
+    def __init__(self, id, process_count, now_count, exec_param: ExecParam, save_dir, save_db=True):
+        self.id = id
+        self.process_count = process_count
+        self.now_count = now_count
+        self.exec_param = exec_param
+        self.save_dir = save_dir
+        self.save_db = save_db
+
+    def run(self):
+
+        if self.exec_param.join_type == 0:
+            run_process = Axis0DataImpl(self.id, self.process_count, self.now_count, self.exec_param, self.save_db)
+        else:
+            run_process = Axis1DataImpl(self.id, self.process_count, self.now_count, self.exec_param, self.save_db)
+
+        run_process.run()

+ 78 - 0
trans/common/UnzipAndRemove.py

@@ -0,0 +1,78 @@
+import multiprocessing
+import os
+import traceback
+
+from service.import_data_service import update_transfer_progress
+from trans.common.PathParam import PathParam
+from utils.file.trans_methods import copy_to_new, read_files, split_array, read_excel_files
+from utils.log.import_data_log import log_print
+from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
+from utils.zip.unzip import unzip, unrar, get_desc_path
+
+
+class UnzipAndRemove(object):
+    def __init__(self, id, process_count, now_count, path_param: PathParam, save_db=True):
+        self.id = id
+        self.process_count = process_count
+        self.now_count = now_count
+        self.path_param = path_param
+        self.save_db = save_db
+
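+    # Copy one source file into the unzip temp directory, extracting zip/rar archives along the way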
+    def get_and_remove(self, file):
+
+        to_path = self.path_param.get_unzip_tmp_path()
+        if str(file).endswith("zip"):
+            if str(file).endswith("csv.zip"):
+                copy_to_new(file, file.replace(self.path_param.read_dir, to_path).replace("csv.zip", 'csv.gz'))
+            else:
+                desc_path = file.replace(self.path_param.read_dir, to_path)
+                is_success, e = unzip(file, get_desc_path(desc_path))
+                if not is_success:
+                    # tolerate a bad archive: log it and keep processing the remaining files
+                    log_print("unzip failed, skipping:", file, e)
+        elif str(file).endswith("rar"):
+            desc_path = file.replace(self.path_param.read_dir, to_path)
+            is_success, e = unrar(file, get_desc_path(desc_path))
+            if not is_success:
+                log_print("unrar failed, skipping:", file, e)
+        else:
+            copy_to_new(file, file.replace(self.path_param.read_dir, to_path))
+
+    def remove_file_to_tmp_path(self):
+        # collect the source files
+        try:
+            if os.path.isfile(self.path_param.read_dir):
+                all_files = [self.path_param.read_dir]
+            else:
+                all_files = read_files(self.path_param.read_dir)
+
+            # use at most half of the system CPUs
+            split_count = get_available_cpu_count_with_percent(1 / 2)
+            all_arrays = split_array(all_files, split_count)
+
+            pool_count = split_count if split_count < len(all_files) else len(all_files)
+            for index, arr in enumerate(all_arrays):
+                with multiprocessing.Pool(pool_count) as pool:
+                    pool.starmap(self.get_and_remove, [(i,) for i in arr])
+                update_transfer_progress(self.id, round(5 + 15 * (index + 1) / len(all_arrays)), self.process_count,
+                                         self.now_count, self.save_db)
+
+                all_files = read_excel_files(self.path_param.get_unzip_tmp_path())
+
+                log_print('Files found after extraction:', len(all_files))
+        except Exception as e:
+            log_print(traceback.format_exc())
+            message = "读取文件列表错误:" + self.path_param.read_dir + ",系统返回错误:" + str(e)
+            raise ValueError(message)
+        return all_files
+
+    def run(self):
+        self.remove_file_to_tmp_path()
+        update_transfer_progress(self.id, round(20), self.process_count, self.now_count, self.save_db)
+
+
+if __name__ == '__main__':
+    path_param = PathParam(r'd://data//test_import', 'ceshi001', r'd://data//data.zip')
+    tt = UnzipAndRemove(1, 2, 1, path_param, save_db=False)
+    tt.run()

+ 0 - 0
trans/common/__init__.py


+ 0 - 0
utils/__init__.py


+ 22 - 0
utils/common_util.py

@@ -0,0 +1,22 @@
+import os.path
+
+
+def get_project_root():
+    path = os.path.dirname(os.path.dirname(__file__)) + os.sep
+    return path
+
+
+def get_project_conf_path():
+    path = get_project_root() + 'conf' + os.sep
+    return path
+
+
+def get_project_conf_path_file(env='dev'):
+    path = get_project_root() + 'conf' + os.sep + f'conf_{env}.yaml'
+    return path
+
+
+
+if __name__ == '__main__':
+    print(get_project_root())
+    print(get_project_conf_path())

+ 3 - 0
utils/conf/__init__.py

@@ -0,0 +1,3 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/6/7
+# @Author  : 魏志亮

+ 22 - 0
utils/conf/read_conf.py

@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/6/7
+# @Author  : 魏志亮
+
+import yaml
+
+
+def yaml_conf(path, encoding='utf-8'):
+    with open(path, 'r', encoding=encoding) as f:
+        data = yaml.safe_load(f)
+    return data
+
+
+def read_conf(dict_conf, col, default_value=None):
+    if col in dict_conf:
+        res = dict_conf[col]
+        if res is None and default_value is not None:
+            return default_value
+        return res
+    else:
+        return default_value
+

+ 60 - 0
utils/db/ConnectMysql.py

@@ -0,0 +1,60 @@
+import traceback
+from os import environ
+
+import pandas as pd
+import pymysql
+from pymysql.cursors import DictCursor
+from sqlalchemy import create_engine
+
+from utils.common_util import get_project_conf_path_file
+from utils.conf.read_conf import yaml_conf
+from utils.log.import_data_log import log_print
+
+
+class ConnectMysql:
+
+    def __init__(self, connet_name):
+        conf_path = get_project_conf_path_file('dev')
+        self.yaml_data = yaml_conf(environ.get('IMPORT_CONF', conf_path))
+        self.connet_name = connet_name
+        self.config = self.yaml_data[self.connet_name]
+
+    # Open a new connection for every call (no connection pooling)
+    def get_conn(self):
+        return pymysql.connect(**self.config)
+
+    # Execute a statement and return all rows
+    def execute(self, sql, params=tuple()):
+
+        with self.get_conn() as conn:
+            with conn.cursor(cursor=DictCursor) as cursor:
+                try:
+                    cursor.execute(sql, params)
+                    log_print("开始执行SQL:", cursor._executed)
+                    conn.commit()
+                    result = cursor.fetchall()
+                    return result
+                except Exception as e:
+                    log_print(f"执行sql:{sql},报错:{e}")
+                    log_print(traceback.format_exc())
+                    conn.rollback()
+                    raise e
+
+    def get_engine(self):
+        config = self.config
+        username = config['user']
+        password = config['password']
+        host = config['host']
+        port = config['port']
+        dbname = config['database']
+        return create_engine(f'mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}')
+
+    def execute_df_save(self, df, table_name):
+        df.to_sql(table_name, self.get_engine(), index=False, if_exists='append')
+
+    def read_sql_to_df(self, sql):
+        df = pd.read_sql_query(sql, self.get_engine())
+        return df

+ 3 - 0
utils/db/__init__.py

@@ -0,0 +1,3 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/6/7
+# @Author  : 魏志亮

+ 3 - 0
utils/file/__init__.py

@@ -0,0 +1,3 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/6/11
+# @Author  : 魏志亮

+ 196 - 0
utils/file/trans_methods.py

@@ -0,0 +1,196 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/5/16
+# @Author  : 魏志亮
+import ast
+import datetime
+import os
+import shutil
+import warnings
+
+import chardet
+import pandas as pd
+
+from utils.log.import_data_log import log_print
+
+warnings.filterwarnings("ignore")
+
+
+# Detect the file encoding
+def detect_file_encoding(filename):
+    # read only the first 1000 bytes (enough for most encoding detection)
+    with open(filename, 'rb') as f:
+        rawdata = f.read(1000)
+    result = chardet.detect(rawdata)
+    encoding = result['encoding']
+
+    log_print("文件类型:", filename, encoding)
+
+    if encoding is None:
+        encoding = 'gb18030'
+
+    if encoding.lower() in ['utf-8', 'ascii', 'utf8', 'utf-8-sig']:
+        return 'utf-8'
+
+    return 'gb18030'
+
+
+def del_blank(df=pd.DataFrame(), cols=list()):
+    for col in cols:
+        if df[col].dtype == object:
+            df[col] = df[col].str.strip()
+    return df
+
+
+# Split a list into chunks of size num
+def split_array(array, num):
+    return [array[i:i + num] for i in range(0, len(array), num)]
+
+
+def find_header(file_path, use_cols=list()):
+    # Scan the first 50 rows and return the 0-based index of the row that contains at least
+    # three of the expected column names; None means no header row was found
+    df = read_file_to_df(file_path, None, None, 50)
+    header = None
+    for index, row in df.iterrows():
+        values = row.values
+        count = sum(1 for col in use_cols if col in values)
+        if count > 2:
+            header = index
+            break
+
+    return header
+
+
+# Read a file (csv/gz/xls/xlsx) into a DataFrame
+def read_file_to_df(file_path, use_cols=list(), header=None, nrows=None):
+    begin = datetime.datetime.now()
+    log_print('开始读取文件', file_path)
+    base_name = os.path.basename(file_path)
+    df = pd.DataFrame()
+    try:
+        if str(file_path).lower().endswith("csv") or str(file_path).lower().endswith("gz"):
+            encoding = detect_file_encoding(file_path)
+            end_with_gz = str(file_path).lower().endswith("gz")
+            if end_with_gz:
+                df = pd.read_csv(file_path, encoding=encoding, usecols=use_cols, compression='gzip',
+                                 header=header, nrows=nrows)
+            else:
+                df = pd.read_csv(file_path, encoding=encoding, usecols=use_cols, header=header,
+                                 on_bad_lines='warn', nrows=nrows)
+
+        else:
+            xls = pd.ExcelFile(file_path)
+            # get every sheet name
+            sheet_names = xls.sheet_names
+            for sheet_name in sheet_names:
+                if use_cols:
+                    now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, usecols=use_cols, nrows=nrows)
+                else:
+                    now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, nrows=nrows)
+
+                now_df['sheet_name'] = sheet_name
+                df = pd.concat([df, now_df])
+            xls.close()
+        df['file_name'] = base_name[:str(base_name).rfind(".")]
+        log_print('File read successfully:', file_path, 'shape:', df.shape, 'elapsed:', datetime.datetime.now() - begin)
+    except Exception as e:
+        log_print('Failed to read file', file_path, str(e))
+        message = 'File: ' + os.path.basename(file_path) + ', ' + str(e)
+        raise ValueError(message)
+
+    return df
+
+
+def __build_directory_dict(directory_dict, path, filter_types=None):
+    # walk every entry under the given path
+    for item in os.listdir(path):
+        item_path = os.path.join(path, item)
+        if os.path.isdir(item_path):
+            __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
+        elif os.path.isfile(item_path):
+            if path not in directory_dict:
+                directory_dict[path] = []
+
+            if filter_types is None or len(filter_types) == 0:
+                directory_dict[path].append(item_path)
+            elif str(item_path).split(".")[-1] in filter_types:
+                if str(item_path).count("~$") == 0:
+                    directory_dict[path].append(item_path)
+
+
+# Collect all spreadsheet-style files (xls/xlsx/csv/gz) under a path
+def read_excel_files(read_path, filter_types=None):
+    if filter_types is None:
+        filter_types = ['xls', 'xlsx', 'csv', 'gz']
+    if os.path.isfile(read_path):
+        return [read_path]
+
+    directory_dict = {}
+    __build_directory_dict(directory_dict, read_path, filter_types=filter_types)
+
+    return [path for paths in directory_dict.values() for path in paths if path]
+
+
+# Collect all supported files (spreadsheets and archives) under a path
+def read_files(read_path, filter_types=None):
+    if filter_types is None:
+        filter_types = ['xls', 'xlsx', 'csv', 'gz', 'zip', 'rar']
+    if os.path.isfile(read_path):
+        return [read_path]
+    directory_dict = {}
+    __build_directory_dict(directory_dict, read_path, filter_types=filter_types)
+
+    return [path1 for paths in directory_dict.values() for path1 in paths if path1]
+
+
+def copy_to_new(from_path, to_path):
+    is_file = False
+    if to_path.count('.') > 0:
+        is_file = True
+
+    create_file_path(to_path, is_file_path=is_file)
+
+    shutil.copy(from_path, to_path)
+
+
+# Create a directory
+def create_file_path(read_path, is_file_path=False):
+    """
+    Create the directory for read_path.
+    :param read_path: path whose directory should be created
+    :param is_file_path: whether read_path ends with a file name
+    """
+    if is_file_path:
+        read_path = os.path.dirname(read_path)
+
+    if not os.path.exists(read_path):
+        os.makedirs(read_path, exist_ok=True)
+
+
+def valid_eval(eval_str):
+    """
+    验证 eval 是否包含非法的参数
+    """
+    safe_param = ["column", "wind_name", "df", "error_time", "str", "int"]
+    eval_str_names = [node.id for node in ast.walk(ast.parse(eval_str)) if isinstance(node, ast.Name)]
+    if not set(eval_str_names).issubset(safe_param):
+        raise NameError(
+            eval_str + " contains unsafe name :" + str(','.join(list(set(eval_str_names) - set(safe_param)))))
+    return True
+
+
+if __name__ == '__main__':
+    # aa = valid_eval("column[column.find('_')+1:]")
+    # print(aa)
+    #
+    # aa = valid_eval("df['123'].apply(lambda wind_name: wind_name.replace('元宝山','').replace('号风机',''))")
+    # print(aa)
+    #
+    # aa = valid_eval("'记录时间' if column == '时间' else column;from os import *; path")
+    # print(aa)
+
+    df = read_file_to_df(r"D:\data\11-12月.xls",
+                         use_cols=['风机', '时间', '有功功率', '无功功率', '功率因数', '频率'], nrows=30)
+
+    print(df.columns)

+ 3 - 0
utils/log/__init__.py

@@ -0,0 +1,3 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/5/16
+# @Author  : 魏志亮

+ 40 - 0
utils/log/import_data_log.py

@@ -0,0 +1,40 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/5/16
+# @Author  : 魏志亮
+
+import datetime
+import logging
+import sys
+from os import environ, makedirs, path, sep
+
+from utils.common_util import get_project_conf_path_file
+from utils.conf.read_conf import read_conf, yaml_conf
+
+logger = logging.getLogger("ImportData_Python")
+logger.setLevel(logging.INFO)
+stout_handle = logging.StreamHandler(sys.stdout)
+stout_handle.setFormatter(
+    logging.Formatter("%(asctime)s: %(message)s"))
+stout_handle.setLevel(logging.INFO)
+logger.addHandler(stout_handle)
+
+conf_path = get_project_conf_path_file('dev')
+config = yaml_conf(environ.get('IMPORT_CONF', conf_path))
+log_path_dir = read_conf(config, 'log_path_dir', "/data/logs")
+
+log_path = log_path_dir + sep + r'ImportData_Python_' + (environ['env'] if 'env' in environ else 'dev')
+file_path = path.join(log_path)
+
+if not path.exists(file_path):
+    makedirs(file_path, exist_ok=True)
+file_name = file_path + sep + str(datetime.date.today()) + '.log'
+
+file_handler = logging.FileHandler(file_name, encoding='utf-8')
+file_handler.setFormatter(
+    logging.Formatter("%(asctime)s: %(message)s"))
+file_handler.setLevel(logging.INFO)
+logger.addHandler(file_handler)
+
+
+def log_print(*args):
+    logger.info("  ".join([str(a) for a in args]))

+ 0 - 0
utils/systeminfo/__init__.py


+ 110 - 0
utils/systeminfo/sysinfo.py

@@ -0,0 +1,110 @@
+from os import getpid, listdir, path
+
+import psutil
+
+from utils.log.import_data_log import log_print
+
+
+def print_memory_usage(detail=""):
+    # current process id
+    pid = getpid()
+    # process handle
+    py = psutil.Process(pid)
+    # memory statistics
+    memory_info = py.memory_info()
+    # RSS (Resident Set Size): physical memory actually used by the process
+    memory_usage_rss = memory_info.rss
+    # VMS (Virtual Memory Size): virtual memory used by the process
+    memory_usage_vms = memory_info.vms
+
+    # convert bytes to a more readable unit
+    memory_usage_rss_mb = memory_usage_rss / (1024 ** 2)
+    memory_usage_vms_mb = memory_usage_vms / (1024 ** 2)
+
+    log_print(f"{detail},Memory usage (RSS): {memory_usage_rss_mb:.2f} MB")
+    log_print(f"{detail},Memory usage (VMS): {memory_usage_vms_mb:.2f} MB")
+
+
+def get_cpu_count():
+    return psutil.cpu_count()
+
+
+def get_available_cpu_count_with_percent(percent: float = 1):
+    cpu_count = get_cpu_count()
+    return int(cpu_count * percent)
+
+
+def get_file_size(file_path):
+    return path.getsize(file_path)
+
+
+def get_dir_size(dir_path):
+    return sum(get_file_size(path.join(dir_path, file)) for file in listdir(dir_path) if
+               path.isfile(path.join(dir_path, file)))
+
+
+def get_available_memory_with_percent(percent: float = 1):
+    memory_info = psutil.virtual_memory()
+    return int(memory_info.available * percent)
+
+
+def get_max_file_size(file_paths: list[str]):
+    max_size = 0
+    for file_path in file_paths:
+        file_size = get_file_size(file_path)
+        if file_size > max_size:
+            max_size = file_size
+    return max_size
+
+
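+# Choose a worker count from the largest input file size, a slice of free memory and a slice of the CPUs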
+def use_files_get_max_cpu_count(file_paths: list[str], memory_percent: float = 1 / 12, cpu_percent: float = 2 / 5):
+    max_file_size = get_max_file_size(file_paths)
+    free_memory = get_available_memory_with_percent(memory_percent)
+    count = int(free_memory / max_file_size)
+    max_cpu_count = get_available_cpu_count_with_percent(cpu_percent)
+    result = count if count <= max_cpu_count else max_cpu_count
+    if result == 0:
+        result = 1
+
+    if result > len(file_paths):
+        result = len(file_paths)
+
+    log_print("总文件数:", len(file_paths), ",获取最大文件大小:", str(round(max_file_size / 2 ** 20, 2)) + "M",
+                "可用内存:", str(get_available_memory_with_percent(1) / 2 ** 20) + "M",
+                "总CPU数:", get_cpu_count(), "CPU使用比例:", round(cpu_percent, 2), "CPU可用数量:", max_cpu_count,
+                ",最终确定使用进程数:", result)
+    return result
+
+
+def max_file_size_get_max_cpu_count(max_file_size, memory_percent: float = 1 / 6, cpu_percent: float = 2 / 5):
+    free_memory = get_available_memory_with_percent(memory_percent)
+    count = int(free_memory / max_file_size)
+    max_cpu_count = get_available_cpu_count_with_percent(cpu_percent)
+    result = count if count <= max_cpu_count else max_cpu_count
+    if result == 0:
+        result = 1
+    log_print(",获取最大文件大小:", str(round(max_file_size / 2 ** 20, 2)) + "M",
+                "可用内存:", str(get_available_memory_with_percent(1) / 2 ** 20) + "M",
+                "总CPU数:", get_cpu_count(), "CPU使用比例:", round(cpu_percent, 2), "CPU可用数量:", max_cpu_count,
+                ",最终确定使用进程数:", result)
+    return result
+
+
+if __name__ == '__main__':
+    from utils.file.trans_methods import read_files
+    import datetime
+
+    read_path = r"Z:\collection_data\1进行中\密马风电场-山西-大唐\收资数据\scada\秒级数据"
+    begin = datetime.datetime.now()
+    all_files = read_files(read_path)
+    print(datetime.datetime.now() - begin)
+
+    print(use_files_get_max_cpu_count(all_files))
+
+    print(get_available_memory_with_percent(1) / 2 ** 20)
+    print(get_available_memory_with_percent(2 / 3) / 2 ** 20)
+
+    begin = datetime.datetime.now()
+    print(len(all_files))
+    print(get_max_file_size(all_files) / 2 ** 20)
+    print(datetime.datetime.now() - begin)

+ 3 - 0
utils/zip/__init__.py

@@ -0,0 +1,3 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/5/17
+# @Author  : 魏志亮

+ 122 - 0
utils/zip/unzip.py

@@ -0,0 +1,122 @@
+# -*- coding: utf-8 -*-
+# @Time    : 2024/5/17
+# @Author  : 魏志亮
+import traceback
+import zipfile
+import os
+import rarfile
+
+from utils.file.trans_methods import detect_file_encoding
+from utils.log.import_data_log import log_print, logger
+
+
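+# zipfile decodes non-UTF-8 entry names as cp437; re-decode them as GBK so Chinese file names survive extraction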
+def __support_gbk(zip_file: zipfile.ZipFile):
+    name_to_info = zip_file.NameToInfo
+    # copy map first
+    for name, info in name_to_info.copy().items():
+        real_name = name.encode('cp437').decode('gbk')
+        if real_name != name:
+            info.filename = real_name
+            del name_to_info[name]
+            name_to_info[real_name] = info
+    return zip_file
+
+
+def unzip(zip_filepath, dest_path):
+    # Extract a zip archive, then recursively handle any nested archives
+    is_success = True
+    log_print('Start reading file:', zip_filepath)
+    log_print("Extracting to:", dest_path)
+
+    try:
+        if detect_file_encoding(zip_filepath).startswith("gb"):
+            try:
+                with __support_gbk(zipfile.ZipFile(zip_filepath, 'r')) as zip_ref:
+                    zip_ref.extractall(dest_path)
+            except:
+                with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
+                    zip_ref.extractall(dest_path)
+        else:
+            with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
+                zip_ref.extractall(dest_path)
+
+    except zipfile.BadZipFile as e:
+        log_print(traceback.format_exc())
+        is_success = False
+        log_print('Not a valid zip file:', zip_filepath)
+        return is_success, e
+
+    # walk the extracted files and recursively handle nested archives
+    log_print('Re-scanning extracted directory:', dest_path)
+    if is_success:
+        for root, dirs, files in os.walk(dest_path):
+            for file in files:
+                file_path = os.path.join(root, file)
+                # nested zip archive
+                if file_path.endswith('.zip'):
+                    if file_path.endswith('.csv.zip'):
+                        os.rename(file_path, file_path.replace(".csv.zip", ".csv.gz"))
+                    else:
+                        # extract it recursively
+                        unzip(file_path, dest_path + os.sep + get_desc_path(str(file)))
+                        # remove the archive once extracted
+                        os.remove(file_path)
+                # nested rar archive
+                if file_path.endswith('.rar'):
+                    # extract it recursively
+                    unrar(file_path, dest_path + os.sep + get_desc_path(str(file)))
+                    # remove the archive once extracted
+                    os.remove(file_path)
+
+    return is_success, ''
+
+
+def unrar(rar_file_path, dest_dir):
+    # Extract a rar archive; create the destination directory if it does not exist
+    is_success = True
+    log_print('Start reading file:', rar_file_path)
+    dest_path = dest_dir
+    log_print("Extracting to:", dest_path)
+    if not os.path.exists(dest_path):
+        os.makedirs(dest_path)
+
+    try:
+        # open the RAR archive
+        with rarfile.RarFile(rar_file_path) as rf:
+            # iterate over every member (files and directories)
+            for member in rf.infolist():
+                # extract this member to the destination directory
+                rf.extract(member, dest_path)
+    except Exception as e:
+        log_print(traceback.format_exc())
+        logger.exception(e)
+        is_success = False
+        log_print('Not a valid rar file:', rar_file_path)
+        return is_success, e
+
+    # walk the extracted files and recursively handle nested archives
+    log_print('Re-scanning extracted directory:', dest_path)
+    if is_success:
+        for root, dirs, files in os.walk(dest_path):
+            for file in files:
+                file_path = os.path.join(root, file)
+                # nested rar archive
+                if file_path.endswith('.rar'):
+                    # extract it recursively
+                    unrar(file_path, get_desc_path(file_path))
+                    # remove the archive once extracted
+                    os.remove(file_path)
+
+                # nested zip archive
+                if file_path.endswith('.zip'):
+                    # extract it recursively
+                    unzip(file_path, get_desc_path(file_path))
+                    # remove the archive once extracted
+                    os.remove(file_path)
+
+    return is_success, ''
+
+
+def get_desc_path(path):
+    return path[0:path.rfind(".")]