Forráskód Böngészése

优化执行时间

anmox 1 éve
szülő
commit
fc097dae2e
3 módosított fájl, 90 hozzáadás és 76 törlés
  1. 77 72
      etl/base/WindFarms.py
  2. 5 0
      utils/file/trans_methods.py
  3. 8 4
      utils/zip/unzip.py

+ 77 - 72
etl/base/WindFarms.py

@@ -8,10 +8,10 @@ import tempfile
 from etl.base.TranseParam import TranseParam
 from service.plt_service import get_all_wind, update_trans_status_error, update_trans_status_running, \
     update_trans_status_success
-from service.trans_service import creat_table_and_add_partition, rename_table, save_df_to_db, save_file_to_db
+from service.trans_service import creat_table_and_add_partition, rename_table, save_file_to_db
 from utils.file.trans_methods import *
 from utils.log.trans_log import logger
-from utils.zip.unzip import unzip, unrar
+from utils.zip.unzip import unzip, unrar, get_desc_path
 
 
 class WindFarms(object):
@@ -24,7 +24,7 @@ class WindFarms(object):
         self.wind_full_name = wind_full_name
         self.save_zip = False
         self.trans_param = params
-        self.__exist_wind_names = multiprocessing.Manager().list()
+        self.exist_wind_names = multiprocessing.Manager().list()
         self.wind_col_trans = get_all_wind(self.field_code)
         self.batch_count = 50000
         self.save_path = None
@@ -43,24 +43,24 @@ class WindFarms(object):
 
         self.save_path = os.path.join(read_path[0:read_path.find(self.wind_full_name)], self.wind_full_name, "清理数据")
 
-    def __params_valid(self, not_null_list=list()):
+    def params_valid(self, not_null_list=list()):
         for arg in not_null_list:
             if arg is None or arg == '':
                 raise Exception("Invalid param set :" + arg)
 
-    def __get_save_path(self):
+    def get_save_path(self):
         return os.path.join(self.save_path, self.batch_no, self.trans_param.read_type)
 
-    def __get_save_tmp_path(self):
+    def get_save_tmp_path(self):
         return os.path.join(tempfile.gettempdir(), self.wind_full_name, self.batch_no, self.trans_param.read_type)
 
-    def __get_excel_tmp_path(self):
-        return os.path.join(self.__get_save_tmp_path(), 'excel_tmp' + os.sep)
+    def get_excel_tmp_path(self):
+        return os.path.join(self.get_save_tmp_path(), 'excel_tmp' + os.sep)
 
-    def __get_read_tmp_path(self):
-        return os.path.join(self.__get_save_tmp_path(), 'read_tmp')
+    def get_read_tmp_path(self):
+        return os.path.join(self.get_save_tmp_path(), 'read_tmp')
 
-    def __df_save_to_tmp_file(self, df=pd.DataFrame(), file=None):
+    def df_save_to_tmp_file(self, df=pd.DataFrame(), file=None):
 
         if self.trans_param.is_vertical_table:
             pass
@@ -82,36 +82,30 @@ class WindFarms(object):
                     df.drop(key, axis=1, inplace=True)
 
         df = del_blank(df, ['wind_turbine_number'])
-        self.__save_to_tmp_csv(df, file)
+        self.save_to_tmp_csv(df, file)
 
-    def __get_excel_files(self):
+    def get_and_remove(self, file):
 
-        if os.path.isfile(self.trans_param.read_path):
-            all_files = [self.trans_param.read_path]
-        else:
-            all_files = read_files(self.trans_param.read_path)
-
-        to_path = self.__get_excel_tmp_path()
-        for file in all_files:
-            if str(file).endswith("zip"):
-                if str(file).endswith("csv.zip"):
-                    copy_to_new(file, file.replace(self.trans_param.read_path, to_path).replace("csv.zip", 'csv.gz'))
-                else:
-                    is_success, e = unzip(file, file.replace(self.trans_param.read_path, to_path).split(".")[0])
-                    self.trans_param.has_zip = True
-                    if not is_success:
-                        raise e
-            elif str(file).endswith("rar"):
-                is_success, e = unrar(file, file.replace(self.trans_param.read_path, to_path).split(".")[0])
+        to_path = self.get_excel_tmp_path()
+        if str(file).endswith("zip"):
+            if str(file).endswith("csv.zip"):
+                copy_to_new(file, file.replace(self.trans_param.read_path, to_path).replace("csv.zip", 'csv.gz'))
+            else:
+                desc_path = file.replace(self.trans_param.read_path, to_path)
+                is_success, e = unzip(file, get_desc_path(desc_path))
                 self.trans_param.has_zip = True
                 if not is_success:
                     raise e
-            else:
-                copy_to_new(file, file.replace(self.trans_param.read_path, to_path))
-
-        return read_excel_files(to_path)
+        elif str(file).endswith("rar"):
+            desc_path = file.replace(self.trans_param.read_path, to_path)
+            is_success, e = unrar(file, get_desc_path(desc_path))
+            self.trans_param.has_zip = True
+            if not is_success:
+                raise e
+        else:
+            copy_to_new(file, file.replace(self.trans_param.read_path, to_path))
 
-    def __read_excel_to_df(self, file):
+    def read_excel_to_df(self, file):
 
         read_cols = [v for k, v in self.trans_param.cols_tran.items() if v and not v.startswith("$")]
 
@@ -174,15 +168,15 @@ class WindFarms(object):
 
     def _save_to_tmp_csv_by_name(self, df, name):
         save_name = str(name) + '.csv'
-        save_path = os.path.join(self.__get_read_tmp_path(), save_name)
+        save_path = os.path.join(self.get_read_tmp_path(), save_name)
         create_file_path(save_path, is_file_path=True)
 
         with self.lock:
-            if name in self.__exist_wind_names:
+            if name in self.exist_wind_names:
                 contains_name = True
             else:
                 contains_name = False
-                self.__exist_wind_names.append(name)
+                self.exist_wind_names.append(name)
 
         if contains_name:
             df.to_csv(save_path, index=False, encoding='utf8', mode='a',
@@ -190,7 +184,7 @@ class WindFarms(object):
         else:
             df.to_csv(save_path, index=False, encoding='utf8')
 
-    def __save_to_tmp_csv(self, df, file):
+    def save_to_tmp_csv(self, df, file):
         trans_print("开始保存", str(file), "到临时文件")
         names = set(df['wind_turbine_number'].values)
 
@@ -200,7 +194,7 @@ class WindFarms(object):
         del df
         trans_print("保存", str(names), "到临时文件成功, 风机数量", len(names))
 
-    def __set_statistics_data(self, df):
+    def set_statistics_data(self, df):
 
         if not df.empty:
             min_date = pd.to_datetime(df['time_stamp']).min()
@@ -225,14 +219,14 @@ class WindFarms(object):
                     self.statistics_map['total_count'] = df.shape[0]
 
     def save_statistics_file(self):
-        save_path = os.path.join(os.path.dirname(self.__get_save_path()),
+        save_path = os.path.join(os.path.dirname(self.get_save_path()),
                                  self.trans_param.read_type + '_statistics.txt')
         create_file_path(save_path, is_file_path=True)
         with open(save_path, 'w', encoding='utf8') as f:
             f.write("总数据量:" + str(self.statistics_map['total_count']) + "\n")
             f.write("最小时间:" + str(self.statistics_map['min_date']) + "\n")
             f.write("最大时间:" + str(self.statistics_map['max_date']) + "\n")
-            f.write("风机数量:" + str(len(read_excel_files(self.__get_read_tmp_path()))) + "\n")
+            f.write("风机数量:" + str(len(read_excel_files(self.get_read_tmp_path()))) + "\n")
 
     def save_to_csv(self, filename):
         df = read_file_to_df(filename)
@@ -275,24 +269,33 @@ class WindFarms(object):
         wind_col_name = str(df['wind_turbine_number'].values[0])
 
         if self.save_zip:
-            save_path = os.path.join(self.__get_save_path(), str(wind_col_name) + '.csv.gz')
+            save_path = os.path.join(self.get_save_path(), str(wind_col_name) + '.csv.gz')
         else:
-            save_path = os.path.join(self.__get_save_path(), str(wind_col_name) + '.csv')
+            save_path = os.path.join(self.get_save_path(), str(wind_col_name) + '.csv')
         create_file_path(save_path, is_file_path=True)
         if self.save_zip:
             df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8')
         else:
             df.to_csv(save_path, index=False, encoding='utf-8')
 
-        self.__set_statistics_data(df)
+        self.set_statistics_data(df)
 
         del df
         trans_print("保存" + str(filename) + ".csv成功")
 
-    def read_all_files(self):
+    def remove_file_to_tmp_path(self):
         # 读取文件
         try:
-            all_files = self.__get_excel_files()
+            if os.path.isfile(self.trans_param.read_path):
+                all_files = [self.trans_param.read_path]
+            else:
+                all_files = read_files(self.trans_param.read_path)
+
+            with multiprocessing.Pool(6) as pool:
+                pool.starmap(self.get_and_remove, [(i,) for i in all_files])
+
+            all_files = read_excel_files(self.get_excel_tmp_path())
+
             trans_print('读取文件数量:', len(all_files))
         except Exception as e:
             logger.exception(e)
@@ -303,7 +306,7 @@ class WindFarms(object):
 
     def read_file_and_save_tmp(self):
 
-        all_files = read_excel_files(self.__get_save_tmp_path())
+        all_files = read_excel_files(self.get_excel_tmp_path())
         if self.trans_param.merge_columns:
             dfs_list = list()
             index_keys = [self.trans_param.cols_tran['time_stamp']]
@@ -313,7 +316,7 @@ class WindFarms(object):
             index_keys.append(wind_col)
             df_map = dict()
             with multiprocessing.Pool(6) as pool:
-                dfs = pool.starmap(self.__read_excel_to_df, [(file,) for file in all_files])
+                dfs = pool.starmap(self.read_excel_to_df, [(file,) for file in all_files])
 
             for df in dfs:
                 key = '-'.join(df.columns)
@@ -330,11 +333,8 @@ class WindFarms(object):
 
             df = pd.concat(dfs_list, axis=1)
             df.reset_index(inplace=True)
-            # names = set(df[wind_col].values)
             try:
-                # for name in names:
-                #     self.__df_save_to_tmp_file(df[df[wind_col] == name], "")
-                self.__df_save_to_tmp_file(df, "")
+                self.df_save_to_tmp_file(df, "")
             except Exception as e:
                 logger.exception(e)
                 message = "合并列出现错误:" + str(e)
@@ -342,19 +342,24 @@ class WindFarms(object):
                 raise e
 
         else:
-            for file in all_files:
+            all_arrays = split_array(all_files, 6)
+            for arr in all_arrays:
+                with multiprocessing.Pool(6) as pool:
+                    dfs = pool.starmap(self.read_excel_to_df, [(ar,) for ar in arr])
                 try:
-                    self.__df_save_to_tmp_file(self.__read_excel_to_df(file), file)
+                    for df in dfs:
+                        self.df_save_to_tmp_file(df)
                 except Exception as e:
                     logger.exception(e)
-                    message = "读取文件错误:" + file + ",系统返回错误:" + str(e)
-                    update_trans_status_error(self.batch_no, self.trans_param.read_type, message, self.schedule_exec)
+                    message = "整理临时文件,系统返回错误:" + str(e)
+                    update_trans_status_error(self.batch_no, self.trans_param.read_type, message,
+                                              self.schedule_exec)
                     raise e
 
     def mutiprocessing_to_save_file(self):
         # 开始保存到正式文件
         trans_print("开始保存到excel文件")
-        all_tmp_files = read_excel_files(self.__get_read_tmp_path())
+        all_tmp_files = read_excel_files(self.get_read_tmp_path())
         try:
             with multiprocessing.Pool(6) as pool:
                 pool.starmap(self.save_to_csv, [(file,) for file in all_tmp_files])
@@ -370,7 +375,7 @@ class WindFarms(object):
     def mutiprocessing_to_save_db(self):
         # 开始保存到SQL文件
         trans_print("开始保存到数据库文件")
-        all_saved_files = read_excel_files(self.__get_save_path())
+        all_saved_files = read_excel_files(self.get_save_path())
         table_name = self.batch_no + "_" + self.trans_param.read_type
         creat_table_and_add_partition(table_name, len(all_saved_files), self.trans_param.read_type)
         try:
@@ -387,7 +392,7 @@ class WindFarms(object):
         trans_print("结束保存到数据库文件")
 
     def _rename_file(self):
-        save_path = self.__get_save_path()
+        save_path = self.get_save_path()
         files = os.listdir(save_path)
 
         files.sort(key=lambda x: int(str(x).split(os.sep)[-1].split(".")[0][1:]))
@@ -397,18 +402,18 @@ class WindFarms(object):
 
     def delete_batch_files(self):
         trans_print("开始删除已存在的批次文件夹")
-        if os.path.exists(self.__get_save_path()):
-            shutil.rmtree(self.__get_save_path())
+        if os.path.exists(self.get_save_path()):
+            shutil.rmtree(self.get_save_path())
         trans_print("删除已存在的批次文件夹")
 
     def delete_tmp_files(self):
         trans_print("开始删除临时文件夹")
-        if os.path.exists(self.__get_excel_tmp_path()):
-            shutil.rmtree(self.__get_excel_tmp_path())
-        if os.path.exists(self.__get_read_tmp_path()):
-            shutil.rmtree(self.__get_read_tmp_path())
-        if os.path.exists(self.__get_save_tmp_path()):
-            shutil.rmtree(self.__get_save_tmp_path())
+        if os.path.exists(self.get_excel_tmp_path()):
+            shutil.rmtree(self.get_excel_tmp_path())
+        if os.path.exists(self.get_read_tmp_path()):
+            shutil.rmtree(self.get_read_tmp_path())
+        if os.path.exists(self.get_save_tmp_path()):
+            shutil.rmtree(self.get_save_tmp_path())
 
         trans_print("删除临时文件夹删除成功")
 
@@ -429,8 +434,8 @@ class WindFarms(object):
             self.delete_batch_files()
             self.delete_batch_db()
 
-            self.__params_valid([self.name, self.batch_no, self.field_code, self.save_path, self.trans_param.read_type,
-                                 self.trans_param.read_path, self.wind_full_name])
+            self.params_valid([self.name, self.batch_no, self.field_code, self.save_path, self.trans_param.read_type,
+                               self.trans_param.read_path, self.wind_full_name])
 
             if self.trans_param.resolve_col_prefix:
                 column = "测试"
@@ -449,7 +454,7 @@ class WindFarms(object):
             self.delete_tmp_files()
             trans_print("开始保存到临时路径")
             # 开始读取数据并分类保存临时文件
-            self.read_all_files()
+            self.remove_file_to_tmp_path()
             trans_print("保存到临时路径结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
                         str(datetime.datetime.now() - begin))
 
@@ -480,7 +485,7 @@ class WindFarms(object):
         # 如果end==0 则说明只是进行了验证
         if end != 0:
             update_trans_status_success(self.batch_no, self.trans_param.read_type,
-                                        len(read_excel_files(self.__get_read_tmp_path())), self.schedule_exec)
+                                        len(read_excel_files(self.get_read_tmp_path())), self.schedule_exec)
 
         trans_print("开始执行", self.name, self.trans_param.read_type, ",,总耗时:",
                     str(datetime.datetime.now() - begin))

+ 5 - 0
utils/file/trans_methods.py

@@ -39,6 +39,11 @@ def del_blank(df=pd.DataFrame(), cols=list()):
     return df
 
 
+# 切割数组到多个数组
+def split_array(array, num):
+    return [array[i:i + num] for i in range(0, len(array), num)]
+
+
 # 读取数据到df
 def read_file_to_df(file_path, read_cols=list()):
     trans_print('开始读取文件', file_path)

+ 8 - 4
utils/zip/unzip.py

@@ -50,13 +50,13 @@ def unzip(zip_filepath, dest_path):
                         os.rename(file_path, file_path.replace(".csv.zip", ".csv.gz"))
                     else:
                         # 如果是,递归解压
-                        unzip(file_path, dest_path + os.sep + str(file).split(".")[0])
+                        unzip(file_path, dest_path + os.sep + get_desc_path(str(file)))
                         # 删除已解压的zip文件(可选)
                         os.remove(file_path)
                     # 检查文件是否是zip文件
                 if file_path.endswith('.rar'):
                     # 如果是,递归解压
-                    unrar(file_path, dest_path + os.sep + str(file).split(".")[0])
+                    unrar(file_path, dest_path + os.sep + get_desc_path(str(file)))
                     # 删除已解压的zip文件(可选)
                     os.remove(file_path)
 
@@ -96,14 +96,18 @@ def unrar(rar_file_path, dest_dir):
                 # 检查文件是否是zip文件
                 if file_path.endswith('.rar'):
                     # 如果是,递归解压
-                    unrar(file_path, file_path.split(".")[0])
+                    unrar(file_path, get_desc_path(file_path))
                     # 删除已解压的zip文件(可选)
                     os.remove(file_path)
 
                 if file_path.endswith('.zip'):
                     # 如果是,递归解压
-                    unzip(file_path, file_path.split(".")[0])
+                    unzip(file_path, get_desc_path(file_path))
                     # 删除已解压的zip文件(可选)
                     os.remove(file_path)
 
     return is_success, ''
+
+
+def get_desc_path(path):
+    return path[0:path.rfind(".")]