
First version: add implementations for axis 0 and 1

wzl 5 months ago
parent
commit
ebd4ec4678
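
The commit message uses "axis 0 and 1" in the pandas sense: axis 0 stacks rows (vertical merge), while axis 1 aligns on the index and adds columns (horizontal merge). A minimal, self-contained sketch of the difference, independent of this repository's code:

import pandas as pd

a = pd.DataFrame({"time": [1, 2], "temp": [10.0, 11.0]}).set_index("time")
b = pd.DataFrame({"time": [3, 4], "temp": [12.0, 13.0]}).set_index("time")
c = pd.DataFrame({"time": [1, 2], "pressure": [1.1, 1.2]}).set_index("time")

vertical = pd.concat([a, b], axis=0)    # stack rows: 4 rows, 1 column
horizontal = pd.concat([a, c], axis=1)  # align on the index, add columns: 2 rows, 2 columns

print(vertical.shape, horizontal.shape)  # (4, 1) (2, 2)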

+ 0 - 2
db_server/__init__.py

@@ -1,2 +0,0 @@
-import  pandas as pd
-pd.__version__

+ 3 - 3
import_exec.py

@@ -20,10 +20,10 @@ def run_data(env_conf):
 
         except Exception as e:
             log_print(traceback.format_exc())
-            run_fail(run_data['id'], e)
+            run_fail(run_data['id'], str(e))
             log_print("开始删除临时文件夹")
-            if import_param and os.path.exists(import_param.tmp_root):
-                shutil.rmtree(import_param.tmp_root)
+            # if import_param and os.path.exists(import_param.tmp_root):
+            #     shutil.rmtree(import_param.tmp_root)
             log_print("删除临时文件夹删除成功")
 
 

+ 6 - 6
service/import_data_service.py

@@ -5,11 +5,11 @@ db = import_db
 
 
 def get_exec_data(run_count=1):
-    run_count_datas = db.execute('select count(1) as count from executor_history where exec_status = 0')
+    run_count_datas = db.execute('select count(1) as count from executor_history where exec_status = 0 and status = 1')
     log_print(run_count_datas)
     running_count = int(run_count_datas[0]['count'])
     if running_count < run_count:
-        run_datas = db.execute('select * from executor_history where exec_status = -1 limit 1')
+        run_datas = db.execute('select * from executor_history where exec_status = -1  and status = 1 limit 1')
         log_print(run_datas)
         if run_datas:
             return run_datas[0]
@@ -21,14 +21,14 @@ def get_exec_data(run_count=1):
     return None
 
 
-def begin_run(id, save_db=True):
+def run_begin(id, save_db=True):
     if save_db:
         db.execute(
-            'update executor_history set exec_status = 0,begin_time=now() where id = %s', (id,))
+            'update executor_history set exec_status = 0,begin_time=now(),err_info = "" where id = %s', (id,))
 
 
 def update_transfer_progress(id, number, process_count, now_count, save_db=True):
-    number = round((now_count - 1) * 100 / process_count + number)
+    number = round(now_count * 100 / process_count + number)
     log_print(f"{process_count}个任务,当前进度{now_count}, 当前比例:{number}")
     if save_db:
         db.execute('update executor_history set transfer_progress = %s where id = %s', (number, id))
@@ -36,7 +36,7 @@ def update_transfer_progress(id, number, process_count, now_count, save_db=True)
 
 def run_success(id, save_db=True):
     if save_db:
-        db.execute('update executor_history set exec_status = 1,end_time=now()  where id = %s', (id,))
+        db.execute('update executor_history set exec_status = 1,end_time=now(),transfer_progress = 100  where id = %s', (id,))
 
 
 def run_fail(id, error, save_db=True):
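
The update_transfer_progress change no longer subtracts 1 from now_count before scaling. A minimal sketch of how the updated expression behaves, assuming now_count is the zero-based index of the current executor and number is the progress value reported inside it (both are assumptions, not stated in the diff):

def transfer_progress(number, process_count, now_count):
    # updated expression from the diff: now_count * 100 / process_count + number
    return round(now_count * 100 / process_count + number)

# with 2 executors and a within-executor value of 20:
print(transfer_progress(20, 2, 0))  # 20 (the old expression gave -30 here)
print(transfer_progress(20, 2, 1))  # 70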

+ 2 - 2
trans/ExecParam.py

@@ -3,14 +3,14 @@ from trans.common.PathParam import PathParam
 
 
 class ExecParam(object):
-    def __init__(self, data, tmp_root, read_dir):
+    def __init__(self, data, tmp_root):
 
         self.process_executor_id = data['id']
         self.process_executor = data['name']
         self.read_path = data['readPath']
         # 0:竖向 1:横向
         self.join_type = data['joinType']
-        self.path_param = PathParam(tmp_root, f'{self.process_executor_id}_{self.process_executor}', read_dir)
+        self.path_param = PathParam(tmp_root, f'{self.process_executor_id}_{self.process_executor}', self.read_path)
 
         self.mapping_list = list()
         results = get_mapping_field(self.process_executor_id)

+ 99 - 14
trans/ImportParam.py

@@ -1,12 +1,17 @@
-import datetime
+import shutil
+from datetime import datetime
 import json
+import multiprocessing
 import os.path
 
+import pandas as pd
+
+from service.import_data_service import get_exec_group, run_success, run_begin
 from trans.ExecParam import ExecParam
-from service.import_data_service import get_exec_group, get_exec_data
 from trans.common.ReadAndSaveTmp import ReadAndSaveTmp
 from trans.common.UnzipAndRemove import UnzipAndRemove
 from utils.conf.read_conf import read_conf
+from utils.file.trans_methods import read_excel_files
 from utils.log.import_data_log import log_print
 
 
@@ -23,40 +28,120 @@ class ImportParam(object):
             result = results[0]
             self.process_group_name = result['name']
             self.join_type = result['join_type']
+            self.join_type_strs = [int(i) for i in result['join_type_str'].split(",")] if result[
+                'join_type_str'] else []
             self.process_group_id = data['process_group_id']
             self.tmp_root = os.path.join(self.tmp_root, f'{self.executor_history_id}_{self.executor_history_name}')
             self.out_put_dir = data['out_put_dir']
             self.execotor_list = list()
             for exec_data in json.loads(data['input_dirs']):
-                self.execotor_list.append(ExecParam(exec_data, self.tmp_root))
+                self.execotor_list.append(ExecParam(exec_data, self.tmp_root, ))
         else:
             log_print(f"通过ID: {data['id']}没有获取到执行器组")
             raise Exception(f"通过ID: {data['id']}没有获取到执行器组")
 
+    def save_result_dir(self, base_name, file_dicts, index_cols, axis=0, join_type_strs=[]):
+        log_print(f"开始执行执行器合并:{base_name}")
+        begin = datetime.now()
+
+        df = pd.DataFrame()
+        if axis in [0, 1]:
+            if index_cols:
+                dfs = [pd.read_csv(file, encoding='utf-8', index_col=index_cols) for index, file in file_dicts.items()]
+            else:
+                dfs = [pd.read_csv(file, encoding='utf-8') for index, file in file_dicts.items()]
+
+            df = pd.concat(dfs, axis=axis)
+        else:
+            if len(set(self.execotor_list)) == 1:
+                if index_cols:
+                    dfs = [pd.read_csv(file, encoding='utf-8', index_col=index_cols) for index, file in
+                           file_dicts.items()]
+                else:
+                    dfs = [pd.read_csv(file, encoding='utf-8') for index, file in file_dicts.items()]
+
+                df = pd.concat(dfs, axis=axis)
+            else:
+                if index_cols:
+                    for join_type_index, join_type in enumerate(join_type_strs):
+                        df1 = pd.DataFrame()
+                        df2 = pd.DataFrame()
+                        if join_type_index in file_dicts.keys():
+                            df1 = pd.read_csv(file_dicts[join_type_index], encoding='utf-8', index_col=index_cols)
+
+                        if join_type_index + 1 in file_dicts.keys():
+                            df2 = pd.read_csv(file_dicts[join_type_index + 1], encoding='utf-8', index_col=index_cols)
+                        df = pd.concat([df, df1, df2], axis=self.execotor_list[join_type_index])
+                else:
+                    for join_type_index, join_type in enumerate(join_type_strs):
+                        df1 = pd.DataFrame()
+                        df2 = pd.DataFrame()
+                        if join_type_index in file_dicts.keys():
+                            df1 = pd.read_csv(file_dicts[join_type_index], encoding='utf-8')
+                        if join_type_index + 1 in file_dicts.keys():
+                            df2 = pd.read_csv(file_dicts[join_type_index + 1], encoding='utf-8')
+                        df = pd.concat([df, df1, df2], axis=self.execotor_list[join_type_index])
+
+        if index_cols:
+            df.reset_index(inplace=True)
+            df.drop_duplicates(subset=index_cols, inplace=True)
+            df.sort_values(by=index_cols, inplace=True)
+
+        df.to_csv(os.path.join(self.out_put_dir, f'{base_name}'), encoding='utf-8', index=False)
+        log_print(f"{base_name}合并结束:{base_name},耗时:", datetime.now() - begin)
+
     def run(self):
-        total_begin = datetime.datetime.now()
+        total_begin = datetime.now()
         log_print(f'开始执行{self.process_group_name}')
+        log_print(f'开始清理临时目录{self.tmp_root}')
+        shutil.rmtree(self.tmp_root, ignore_errors=True)
+        log_print(f'清理临时目录{self.tmp_root}成功')
+
+        run_begin(self.executor_history_id, self.save_db)
 
         process_count = len(self.execotor_list)
         for index, exec_data in enumerate(self.execotor_list):
             log_print(f'开始执行{exec_data.process_executor}')
 
-            begin = datetime.datetime.now()
+            begin = datetime.now()
             # 移动解压文件
             unzip_and_remove = UnzipAndRemove(self.executor_history_id, process_count, index,
                                               exec_data.path_param, self.save_db)
             unzip_and_remove.run()
-            log_print("解压移动文件结束:耗时:", datetime.datetime.now() - begin, "总耗时:",
-                      datetime.datetime.now() - total_begin)
+            log_print("解压移动文件结束:耗时:", datetime.now() - begin, "总耗时:", datetime.now() - total_begin)
             # 读取文件
 
-            begin = datetime.datetime.now()
+            begin = datetime.now()
 
-            read_and_save = ReadAndSaveTmp(self.executor_history_id, process_count, index,
-                                           exec_data,
-                                           self.out_put_dir if process_count == 1 else exec_data.path_param.get_process_tmp_path(),
-                                           self.save_db)
+            read_and_save = ReadAndSaveTmp(self.executor_history_id, process_count, index, exec_data, self.save_db)
 
             read_and_save.run()
-            log_print("保存数据到临时文件结束,耗时:", datetime.datetime.now() - begin, "总耗时:",
-                      datetime.datetime.now() - total_begin)
+            log_print(f"{exec_data.process_executor}保存数据到临时文件结束,耗时:", datetime.now() - begin,
+                      "总耗时:", datetime.now() - total_begin)
+
+        log_print(f"{process_count}个执行器全部保存数据到临时文件结束,总耗时", datetime.now() - total_begin)
+
+        log_print(f"开始执行执行器合并")
+        begin = datetime.now()
+
+        if self.join_type in [0, 1]:
+            self.join_type_strs = [self.join_type] * len(self.execotor_list)
+
+        file_map = dict()
+        for index, exec_data in enumerate(self.execotor_list):
+            all_files = read_excel_files(exec_data.path_param.get_process_tmp_path())
+            for file in all_files:
+                base_name = os.path.basename(file)
+                if base_name in file_map:
+                    file_map[base_name][index] = file
+                else:
+                    file_map[base_name] = {index: file}
+
+        with multiprocessing.Pool(6) as pool:
+            pool.starmap(self.save_result_dir,
+                         [(base_name, file_dicts, self.execotor_list[0].index_cols, self.join_type,
+                           self.join_type_strs) for base_name, file_dicts in file_map.items()])
+
+        log_print("合并结束,耗时:", datetime.now() - begin, ",总耗时:", datetime.now() - total_begin)
+        run_success(self.executor_history_id)
+        log_print(f"{process_count}个执行器全部合并结束,总耗时", datetime.now() - total_begin)
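
The new merge step at the end of run() groups same-named temporary CSVs across the executors' tmp directories and hands each group to save_result_dir through a process pool. A simplified, runnable sketch of that grouping pattern (the directory layout and the use of os.listdir instead of read_excel_files are illustrative, not from the repository):

import os

# assumed layout: one temporary directory per executor, each holding per-split CSVs
tmp_dirs = ["tmp_demo/0_exec_a", "tmp_demo/1_exec_b"]
for d in tmp_dirs:
    os.makedirs(d, exist_ok=True)
    open(os.path.join(d, "turbine_01.csv"), "w", encoding="utf-8").close()

file_map = {}
for index, tmp_dir in enumerate(tmp_dirs):
    for name in os.listdir(tmp_dir):
        file_map.setdefault(name, {})[index] = os.path.join(tmp_dir, name)

# file_map maps each base name to {executor_index: file_path},
# the same shape save_result_dir receives as file_dicts
print(file_map)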

+ 89 - 3
trans/common/Axis0DataImpl.py

@@ -1,6 +1,10 @@
 import multiprocessing
+import os
 
 from trans.ExecParam import ExecParam
+from utils.file.trans_methods import read_excel_files, split_array, find_header, read_file_to_df
+from utils.log.import_data_log import log_print
+from utils.systeminfo.sysinfo import use_files_get_max_cpu_count
 
 
 class Axis0DataImpl(object):
@@ -11,8 +15,90 @@ class Axis0DataImpl(object):
         self.now_count = now_count
         self.exec_param = exec_param
         self.save_db = save_db
-        self.lock_map = dict()
-        for i in range(1000):
-            self.lock_map[i] = multiprocessing.Manager().Lock()
+        # self.lock_map = dict()
+        # for i in range(1000):
+        #     self.lock_map[i] = multiprocessing.Manager().Lock()
         self.lock = multiprocessing.Manager().Lock()
         self.field_dict = multiprocessing.Manager().dict()
+
+    def get_lock(self, split_col_value, col_name):
+        boolean_first_time = False
+        if split_col_value:
+            filed_name = f'{split_col_value}_{col_name}'
+        else:
+            filed_name = col_name
+
+        exists_count = len(self.field_dict.keys())
+        # if exists_count >= 4:
+        #     return self.lock
+
+        if filed_name not in self.field_dict:
+            boolean_first_time = True
+            self.field_dict[filed_name] = len(self.field_dict.keys()) + 1
+
+        # return boolean_first_time, self.lock_map[self.field_dict[filed_name]]
+        return boolean_first_time, self.lock
+
+    def read_and_save_file(self, file_path):
+        if self.exec_param.has_header:
+            header = find_header(file_path, self.exec_param.use_cols)
+            if header is None:
+                raise Exception(f"文件{os.path.basename(file_path)}没有找到列名")
+        else:
+            header = None
+
+        df = read_file_to_df(file_path, header=header, use_cols=self.exec_param.use_cols)
+        col_map = {'file_name': 'file_name'}
+        if 'sheet_name' in df.columns:
+            col_map['sheet_name'] = 'sheet_name'
+            pass
+
+        df.rename(mapper=self.exec_param.mapping_cols, errors='ignore', inplace=True)
+
+        if self.exec_param.split_cols:
+            df['split_col'] = df[self.exec_param.split_cols].apply(
+                lambda x: '_'.join([str(i).replace(' ', '_').replace(':', '_') for i in x.values]),
+                axis=1)
+        else:
+            df['split_col'] = 'All'
+
+        split_col = df['split_col'].unique()
+
+        if len(split_col) >= 1000:
+            log_print(f"{file_path}切割文件太多,大于等于1000个")
+            raise Exception(f"{file_path}切割文件太多,大于等于1000个")
+
+        general_fields = list(df.columns)
+        general_fields.remove('split_col')
+        general_fields.remove('file_name')
+        if 'sheet_name' in general_fields:
+            general_fields.remove('sheet_name')
+
+        for col in self.exec_param.index_cols:
+            general_fields.remove(col)
+        for split_col_value in split_col:
+            for col in general_fields:
+                now_cols = self.exec_param.index_cols
+                now_cols.append(col)
+                now_df = df[df['split_col'] == split_col_value][now_cols]
+
+                boolean_first_time, lock = self.get_lock(split_col_value, col)
+                with lock:
+                    path = self.exec_param.path_param.get_process_tmp_path()
+                    os.makedirs(path, exist_ok=True)
+                    if boolean_first_time:
+                        now_df.to_csv(os.path.join(path, f'{split_col_value}.csv'), index=False, encoding='utf-8')
+                    else:
+                        now_df.to_csv(os.path.join(path, f'{split_col_value}.csv'), index=False, mode='a', header=False,
+                                      encoding='utf-8')
+
+
+    def run(self):
+        all_files = read_excel_files(self.exec_param.path_param.get_unzip_tmp_path())
+        split_count = use_files_get_max_cpu_count(all_files)
+        all_arrays = split_array(all_files, split_count)
+
+        log_print("开始读取竖向合并文件,文件总数:", len(all_files), ",文件分片数:", split_count)
+
+        with multiprocessing.Pool(split_count) as pool:
+            pool.map(self.read_and_save_file, all_arrays)
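
Axis0DataImpl writes one CSV per split value, creating the file with a header on the first write and appending without a header afterwards, with a shared lock guarding each write. A minimal standalone sketch of that write pattern (the function name, output directory, and sample data are illustrative, not from the repository):

import multiprocessing
import os

import pandas as pd


def append_split(df, out_dir, split_value, first_time, lock):
    # first write creates the file with a header; later writes append rows without one
    with lock:
        os.makedirs(out_dir, exist_ok=True)
        target = os.path.join(out_dir, f"{split_value}.csv")
        if first_time:
            df.to_csv(target, index=False, encoding="utf-8")
        else:
            df.to_csv(target, index=False, mode="a", header=False, encoding="utf-8")


if __name__ == "__main__":
    lock = multiprocessing.Manager().Lock()
    df = pd.DataFrame({"time": [1, 2], "value": [0.1, 0.2]})
    append_split(df, "tmp_out", "All", True, lock)   # creates tmp_out/All.csv with a header
    append_split(df, "tmp_out", "All", False, lock)  # appends two more rows, no header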

+ 18 - 11
trans/common/Axis1DataImpl.py

@@ -20,8 +20,8 @@ class Axis1DataImpl(object):
         self.exec_param = exec_param
         self.save_db = save_db
         self.lock_map = dict()
-        for i in range(1000):
-            self.lock_map[i] = multiprocessing.Manager().Lock()
+        # for i in range(4):
+        #     self.lock_map[i] = multiprocessing.Manager().Lock()
         self.lock = multiprocessing.Manager().Lock()
         self.field_dict = multiprocessing.Manager().dict()
 
@@ -33,14 +33,15 @@ class Axis1DataImpl(object):
             filed_name = col_name
 
         exists_count = len(self.field_dict.keys())
-        if exists_count >= 1000:
-            return self.lock
+        # if exists_count >= 4:
+        #     return self.lock
 
         if filed_name not in self.field_dict:
             boolean_first_time = True
             self.field_dict[filed_name] = len(self.field_dict.keys()) + 1
 
-        return boolean_first_time, self.lock_map[self.field_dict[filed_name]]
+        # return boolean_first_time, self.lock_map[self.field_dict[filed_name]]
+        return boolean_first_time, self.lock
 
     def read_and_save_file(self, file_path):
         if self.exec_param.has_header:
@@ -82,11 +83,15 @@ class Axis1DataImpl(object):
 
         general_fields = list(df.columns)
         general_fields.remove('split_col')
+        general_fields.remove('file_name')
+        if 'sheet_name' in general_fields:
+            general_fields.remove('sheet_name')
+
         for col in self.exec_param.index_cols:
             general_fields.remove(col)
         for split_col_value in split_col:
             for col in general_fields:
-                now_cols = self.exec_param.index_cols
+                now_cols = [i for i in self.exec_param.index_cols]
                 now_cols.append(col)
                 now_df = df[df['split_col'] == split_col_value][now_cols]
 
@@ -101,9 +106,11 @@ class Axis1DataImpl(object):
                                       encoding='utf-8')
 
     def read_merge_df_to_process(self, base_name):
-        all_files = os.listdir(os.path.join(self.exec_param.path_param.get_merge_tmp_path(), base_name))
-        dfs = [pd.read_csv(i, encoding='utf-8', index_col=self.exec_param.index_cols) for i in all_files]
-        df = pd.concat(dfs, axis=1, ignore_index=True)
+        path = os.path.join(self.exec_param.path_param.get_merge_tmp_path(), base_name)
+        all_files = os.listdir(path)
+        dfs = [pd.read_csv(os.path.join(path, i), encoding='utf-8', index_col=self.exec_param.index_cols) for i in
+               all_files]
+        df = pd.concat(dfs, axis=1)
         df.reset_index(inplace=True)
         df.to_csv(os.path.join(self.exec_param.path_param.get_process_tmp_path(), base_name + '.csv'), index=False,
                   encoding='utf-8')
@@ -119,11 +126,11 @@ class Axis1DataImpl(object):
         split_count = use_files_get_max_cpu_count(all_files)
         all_arrays = split_array(all_files, split_count)
 
-        log_print("开始读取文件,文件总数:", len(all_files), ",文件分片数:", split_count)
+        log_print("开始读取横向合并文件,文件总数:", len(all_files), ",文件分片数:", split_count)
 
         for index, now_array in enumerate(all_arrays):
             with multiprocessing.Pool(split_count) as pool:
-                pool.starmap(self.read_and_save_file, now_array)
+                pool.map(self.read_and_save_file, now_array)
 
             update_transfer_progress(self.id, round(20 + 50 * (index + 1) / len(all_arrays)), self.process_count,
                                      self.now_count, self.save_db)
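
The change from pool.starmap to pool.map matches the single-argument signature of read_and_save_file: map passes each item as one argument, while starmap unpacks each item as a tuple of arguments. A minimal illustration of the difference:

import multiprocessing


def one_arg(path):
    return path.upper()


def two_args(path, index):
    return f"{index}:{path}"


if __name__ == "__main__":
    with multiprocessing.Pool(2) as pool:
        print(pool.map(one_arg, ["a.csv", "b.csv"]))                 # ['A.CSV', 'B.CSV']
        print(pool.starmap(two_args, [("a.csv", 0), ("b.csv", 1)]))  # ['0:a.csv', '1:b.csv']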

+ 1 - 2
trans/common/ReadAndSaveTmp.py

@@ -5,12 +5,11 @@ from trans.common.Axis1DataImpl import Axis1DataImpl
 
 class ReadAndSaveTmp(object):
 
-    def __init__(self, id, process_count, now_count, exec_param: ExecParam, save_dir, save_db=True):
+    def __init__(self, id, process_count, now_count, exec_param: ExecParam,  save_db=True):
         self.id = id
         self.process_count = process_count
         self.now_count = now_count
         self.exec_param = exec_param
-        self.save_dir = save_dir
         self.save_db = save_db
 
     def run(self):

+ 2 - 2
utils/file/trans_methods.py

@@ -56,14 +56,14 @@ def find_header(file_path, use_cols=list()):
             if col in values:
                 count = count + 1
                 if count > 2:
-                    header = index + 1
+                    header = index
                     break
 
     return header
 
 
 # 读取数据到df
-def read_file_to_df(file_path, use_cols=list(), header=None, nrows=None):
+def read_file_to_df(file_path, use_cols=None, header=None, nrows=None):
     begin = datetime.datetime.now()
     log_print('开始读取文件', file_path)
     base_name = os.path.basename(file_path)
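
Changing the read_file_to_df default from use_cols=list() to use_cols=None sidesteps Python's shared-mutable-default pitfall: a default list is created once when the function is defined and reused by every call. A minimal illustration of the pitfall and the usual None-sentinel fix:

def bad(value, acc=[]):  # the same list object is reused across calls
    acc.append(value)
    return acc


def good(value, acc=None):  # create a fresh list on each call instead
    if acc is None:
        acc = []
    acc.append(value)
    return acc


print(bad(1), bad(2))    # [1, 2] [1, 2] -> state leaks between calls
print(good(1), good(2))  # [1] [2]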