فهرست منبع

优化临时文件保存

anmox 11 ماه پیش
والد
کامیت
53c55a9186
2 فایل تغییر یافته به همراه 83 افزوده شده و 50 حذف شده
  1. 79 46
      etl/base/WindFarms.py
  2. 4 4
      schedule_service.py

+ 79 - 46
etl/base/WindFarms.py

@@ -24,14 +24,13 @@ class WindFarms(object):
         self.wind_full_name = wind_full_name
         self.save_zip = False
         self.trans_param = params
-        self.__exist_wind_names = set()
+        self.__exist_wind_names = multiprocessing.Manager().list()
         self.wind_col_trans = get_all_wind(self.field_code)
         self.batch_count = 50000
         self.save_path = None
         self.schedule_exec = schedule_exec
-        self.min_date = None
-        self.max_date = None
-        self.total_count = 0
+        self.lock = multiprocessing.Manager().Lock()
+        self.statistics_map = multiprocessing.Manager().dict()
 
     def set_trans_param(self, params: TranseParam):
         self.trans_param = params
@@ -173,45 +172,67 @@ class WindFarms(object):
 
             return df
 
-    def __save_to_tmp_csv(self, df, file):
-        trans_print("开始保存", str(file), "到临时文件成功")
-        names = set(df['wind_turbine_number'].values)
-        for name in names:
-            save_name = str(name) + '.csv'
-            save_path = os.path.join(self.__get_read_tmp_path(), save_name)
-            create_file_path(save_path, is_file_path=True)
+    def _save_to_tmp_csv_by_name(self, df, name):
+        save_name = str(name) + '.csv'
+        save_path = os.path.join(self.__get_read_tmp_path(), save_name)
+        create_file_path(save_path, is_file_path=True)
+
+        with self.lock:
             if name in self.__exist_wind_names:
-                df[df['wind_turbine_number'] == name].to_csv(save_path, index=False, encoding='utf8', mode='a',
-                                                             header=False)
+                contains_name = True
             else:
-                self.__exist_wind_names.add(name)
-                df[df['wind_turbine_number'] == name].to_csv(save_path, index=False, encoding='utf8')
+                contains_name = False
+                self.__exist_wind_names.append(name)
+
+        if contains_name:
+            df.to_csv(save_path, index=False, encoding='utf8', mode='a',
+                      header=False)
+        else:
+            df.to_csv(save_path, index=False, encoding='utf8')
+
+    def __save_to_tmp_csv(self, df, file):
+        trans_print("开始保存", str(file), "到临时文件")
+        names = set(df['wind_turbine_number'].values)
 
-        self.__set_tongji_data(df)
+        with multiprocessing.Pool(6) as pool:
+            pool.starmap(self._save_to_tmp_csv_by_name,
+                         [(df[df['wind_turbine_number'] == name], name) for name in names])
         del df
         trans_print("保存", str(names), "到临时文件成功, 风机数量", len(names))
 
-    def __set_tongji_data(self, df):
+    def __set_statistics_data(self, df):
 
-        min_date = df['time_stamp'].min()
-        max_date = df['time_stamp'].max()
-        if self.min_date is None or self.min_date > min_date:
-            self.min_date = min_date
+        if not df.empty:
+            min_date = pd.to_datetime(df['time_stamp']).min()
+            max_date = pd.to_datetime(df['time_stamp']).max()
+            with self.lock:
 
-        if self.max_date is None or self.max_date < max_date:
-            self.max_date = max_date
+                if 'min_date' in self.statistics_map.keys():
+                    if self.statistics_map['min_date'] > min_date:
+                        self.statistics_map['min_date'] = min_date
+                else:
+                    self.statistics_map['min_date'] = min_date
 
-        self.total_count = self.total_count + df.shape[0]
+                if 'max_date' in self.statistics_map.keys():
+                    if self.statistics_map['max_date'] < max_date:
+                        self.statistics_map['max_date'] = max_date
+                else:
+                    self.statistics_map['max_date'] = max_date
+
+                if 'total_count' in self.statistics_map.keys():
+                    self.statistics_map['total_count'] = self.statistics_map['total_count'] + df.shape[0]
+                else:
+                    self.statistics_map['total_count'] = df.shape[0]
 
     def save_statistics_file(self):
         save_path = os.path.join(os.path.dirname(self.__get_save_path()),
                                  self.trans_param.read_type + '_statistics.txt')
         create_file_path(save_path, is_file_path=True)
         with open(save_path, 'w', encoding='utf8') as f:
-            f.write("总数据量:" + str(self.total_count) + "\n")
-            f.write("最小时间:" + str(self.min_date) + "\n")
-            f.write("最大时间:" + str(self.max_date) + "\n")
-            f.write("风机数量:" + str(len(self.__exist_wind_names)) + "\n")
+            f.write("总数据量:" + str(self.statistics_map['total_count']) + "\n")
+            f.write("最小时间:" + str(self.statistics_map['min_date']) + "\n")
+            f.write("最大时间:" + str(self.statistics_map['max_date']) + "\n")
+            f.write("风机数量:" + str(len(read_excel_files(self.__get_read_tmp_path()))) + "\n")
 
     def save_to_csv(self, filename):
         df = read_file_to_df(filename)
@@ -263,6 +284,8 @@ class WindFarms(object):
         else:
             df.to_csv(save_path, index=False, encoding='utf-8')
 
+        self.__set_statistics_data(df)
+
         del df
         trans_print("保存" + str(filename) + ".csv成功")
 
@@ -278,22 +301,21 @@ class WindFarms(object):
             raise e
         return all_files
 
-    def __read_file_and_save_tmp(self):
+    def read_file_and_save_tmp(self):
 
-        all_files = self.read_all_files()
+        all_files = read_excel_files(self.__get_save_tmp_path())
         if self.trans_param.merge_columns:
-            # with multiprocessing.Pool(6) as pool:
-            #     dfs = pool.starmap(self.__read_excel_to_df, [(file,) for file in all_files])
-            dfs = list()
+            dfs_list = list()
             index_keys = [self.trans_param.cols_tran['time_stamp']]
             wind_col = self.trans_param.cols_tran['wind_turbine_number']
             if str(wind_col).startswith("$"):
                 wind_col = 'wind_turbine_number'
             index_keys.append(wind_col)
             df_map = dict()
-            for file in all_files:
-                df = self.__read_excel_to_df(file)
+            with multiprocessing.Pool(6) as pool:
+                dfs = pool.starmap(self.__read_excel_to_df, [(file,) for file in all_files])
 
+            for df in dfs:
                 key = '-'.join(df.columns)
                 if key in df_map.keys():
                     df_map[key] = pd.concat([df_map[key], df])
@@ -304,14 +326,15 @@ class WindFarms(object):
                 df.drop_duplicates(inplace=True)
                 df.set_index(keys=index_keys, inplace=True)
                 df = df[~df.index.duplicated(keep='first')]
-                dfs.append(df)
+                dfs_list.append(df)
 
-            df = pd.concat(dfs, axis=1)
+            df = pd.concat(dfs_list, axis=1)
             df.reset_index(inplace=True)
-            names = set(df[wind_col].values)
+            # names = set(df[wind_col].values)
             try:
-                for name in names:
-                    self.__df_save_to_tmp_file(df[df[wind_col] == name], "")
+                # for name in names:
+                #     self.__df_save_to_tmp_file(df[df[wind_col] == name], "")
+                self.__df_save_to_tmp_file(df, "")
             except Exception as e:
                 logger.exception(e)
                 message = "合并列出现错误:" + str(e)
@@ -363,7 +386,7 @@ class WindFarms(object):
             raise e
         trans_print("结束保存到数据库文件")
 
-    def __rename_file(self):
+    def _rename_file(self):
         save_path = self.__get_save_path()
         files = os.listdir(save_path)
 
@@ -398,11 +421,12 @@ class WindFarms(object):
         begin = datetime.datetime.now()
         trans_print("开始执行", self.name, self.trans_param.read_type)
 
+        update_trans_status_running(self.batch_no, self.trans_param.read_type, self.schedule_exec)
+
         if step <= 0 and end >= 0:
             tmp_begin = datetime.datetime.now()
             trans_print("开始初始化字段")
             self.delete_batch_files()
-            self.delete_tmp_files()
             self.delete_batch_db()
 
             self.__params_valid([self.name, self.batch_no, self.field_code, self.save_path, self.trans_param.read_type,
@@ -422,15 +446,24 @@ class WindFarms(object):
         if step <= 1 and end >= 1:
             # 更新运行状态到运行中
             tmp_begin = datetime.datetime.now()
+            self.delete_tmp_files()
+            trans_print("开始保存到临时路径")
+            # 开始读取数据并分类保存临时文件
+            self.read_all_files()
+            trans_print("保存到临时路径结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
+                        str(datetime.datetime.now() - begin))
+
+        if step <= 2 and end >= 2:
+            # 更新运行状态到运行中
+            tmp_begin = datetime.datetime.now()
             trans_print("开始保存到临时文件")
-            update_trans_status_running(self.batch_no, self.trans_param.read_type, self.schedule_exec)
 
             # 开始读取数据并分类保存临时文件
-            self.__read_file_and_save_tmp()
+            self.read_file_and_save_tmp()
             trans_print("保存到临时文件结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
                         str(datetime.datetime.now() - begin))
 
-        if step <= 2 and end >= 2:
+        if step <= 3 and end >= 3:
             tmp_begin = datetime.datetime.now()
             trans_print("开始保存到文件")
             self.mutiprocessing_to_save_file()
@@ -438,7 +471,7 @@ class WindFarms(object):
             trans_print("保存到文件结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
                         str(datetime.datetime.now() - begin))
 
-        if step <= 3 and end >= 3:
+        if step <= 4 and end >= 4:
             tmp_begin = datetime.datetime.now()
             trans_print("开始保存到数据库")
             self.mutiprocessing_to_save_db()

+ 4 - 4
schedule_service.py

@@ -105,10 +105,10 @@ def __exec_trans(step, end, batch_no, transfer_type, transfer_file_addr=None, fi
 
 if __name__ == '__main__':
     step = 0
-    end = 2
-    batch_no = '测试-111'
+    end = 3
+    batch_no = 'hongtiguan-test'
     transfer_type = 'second'
-    transfer_file_addr = r'D:\transdata\test\唐龙三期风电场-安徽-大唐\收资数据\second'
-    field_name = '唐龙三期风电场'
+    transfer_file_addr = r'/data/download/collection_data/1进行中/虹梯官风电场-山西-大唐/收资数据/秒级数据/20240527秒级数据'
+    field_name = '虹梯官风电场'
     field_code = "测试"
     run_local(step, end, batch_no, transfer_type, transfer_file_addr, field_name, field_code)