浏览代码

合并表数据问题修改(每个测点儿保存一个文件)

wzl 7 月之前
父节点
当前提交
bad05dfbde
共有 2 个文件被更改,包括 42 次插入29 次删除
  1. 37 25
      etl/wind_power/min_sec/ReadAndSaveTmp.py
  2. 5 4
      etl/wind_power/min_sec/StatisticsAndSaveFile.py

+ 37 - 25
etl/wind_power/min_sec/ReadAndSaveTmp.py

@@ -44,37 +44,39 @@ class ReadAndSaveTmp(object):
 
     def save_merge_data(self, file_path):
         df = self.read_excel_to_df(file_path)
-
         if self.trans_param.wind_name_exec:
             if valid_eval(self.trans_param.wind_name_exec):
                 exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
                 df['wind_turbine_number'] = eval(exec_str)
 
-        names = set(df['wind_turbine_number'].values)
+        df = self.trans_df_cols(df)
+
+        wind_names = set(df['wind_turbine_number'].values)
         cols = list(df.columns)
         cols.sort()
-        csv_name = base64.b64encode('-'.join(cols).encode('utf8')).decode('utf-8') + ".csv"
-        for name in names:
-            exist_name = name + '-' + csv_name
-            merge_path = self.pathsAndTable.get_merge_tmp_path(name)
-            create_file_path(merge_path)
-
-            with self.lock:
-                if exist_name in self.exist_wind_names:
-                    contains_name = True
-                else:
-                    contains_name = False
-                    self.exist_wind_names.append(exist_name)
-                save_path = path.join(merge_path, csv_name)
-                now_df = df[df['wind_turbine_number'] == name]
-                if contains_name:
-                    now_df.to_csv(save_path, index=False, encoding='utf-8', mode='a',
-                                  header=False)
-                else:
-                    now_df.to_csv(save_path, index=False, encoding='utf-8')
-
-    def df_save_to_tmp_file(self, df=pd.DataFrame()):
+        for wind_name in wind_names:
+            for col in df.columns:
+                if col not in ['wind_turbine_number', 'time_stamp']:
+                    csv_name = str(col) + ".csv"
+                    exist_name = wind_name + '-' + csv_name
+                    merge_path = self.pathsAndTable.get_merge_tmp_path(wind_name)
+                    create_file_path(merge_path)
+
+                    with self.lock:
+                        if exist_name in self.exist_wind_names:
+                            contains_name = True
+                        else:
+                            contains_name = False
+                            self.exist_wind_names.append(exist_name)
+                        save_path = path.join(merge_path, csv_name)
+                        now_df = df[df['wind_turbine_number'] == wind_name][['time_stamp', col]]
+                        if contains_name:
+                            now_df.to_csv(save_path, index=False, encoding='utf-8', mode='a',
+                                          header=False)
+                        else:
+                            now_df.to_csv(save_path, index=False, encoding='utf-8')
 
+    def trans_df_cols(self, df):
         if self.trans_param.is_vertical_table:
             pass
         else:
@@ -106,6 +108,12 @@ class ReadAndSaveTmp(object):
                 for key in del_keys:
                     df.drop(key, axis=1, inplace=True)
 
+        return df
+
+    def df_save_to_tmp_file(self, df=pd.DataFrame()):
+
+        df = self.trans_df_cols(df)
+
         df = del_blank(df, ['wind_turbine_number'])
         df = df[df['time_stamp'].isna() == False]
         if self.trans_param.wind_name_exec and not self.trans_param.merge_columns:
@@ -118,7 +126,7 @@ class ReadAndSaveTmp(object):
     def save_to_tmp_csv(self, df):
         names = set(df['wind_turbine_number'].values)
         if names:
-            trans_print("开始保存", str(names), "到临时文件")
+            trans_print("开始保存", str(names), "到临时文件", df.shape)
 
             for name in names:
                 self._save_to_tmp_csv_by_name(df[df['wind_turbine_number'] == name], name)
@@ -127,14 +135,17 @@ class ReadAndSaveTmp(object):
 
     def merge_df(self, dir_path):
         all_files = read_excel_files(dir_path)
+        wind_turbine_number = path.basename(dir_path)
         df = pd.DataFrame()
         for file in all_files:
             now_df = read_file_to_df(file)
+            now_df['wind_turbine_number'] = wind_turbine_number
             now_df.dropna(subset=['time_stamp'], inplace=True)
             now_df.drop_duplicates(subset=['time_stamp'], inplace=True)
             now_df.set_index(keys=['time_stamp', 'wind_turbine_number'], inplace=True)
             df = pd.concat([df, now_df], axis=1)
         df.reset_index(inplace=True)
+
         self.df_save_to_tmp_file(df)
 
         return df
@@ -162,7 +173,8 @@ class ReadAndSaveTmp(object):
             dirs = [path.join(self.pathsAndTable.get_merge_tmp_path(), dir_name) for dir_name in
                     listdir(self.pathsAndTable.get_merge_tmp_path())]
             dir_total_size = get_dir_size(dirs[0])
-            split_count = max_file_size_get_max_cpu_count(dir_total_size)
+            # split_count = max_file_size_get_max_cpu_count(dir_total_size, memory_percent=1 / 12, cpu_percent=1 / 10)
+            split_count = 2
             all_arrays = split_array(dirs, split_count)
             for index, arr in enumerate(all_arrays):
                 try:

+ 5 - 4
etl/wind_power/min_sec/StatisticsAndSaveFile.py

@@ -78,7 +78,7 @@ class StatisticsAndSaveFile(object):
 
         # 删除 有功功率 和 风速均为空的情况
         df.dropna(subset=['active_power', 'wind_velocity'], how='all', inplace=True)
-
+        trans_print(wind_col_name, "删除有功功率和风速均为空的情况:", df.shape)
         df.replace(np.nan, -999999999, inplace=True)
         number_cols = df.select_dtypes(include=['number']).columns.tolist()
         for col in df.columns:
@@ -87,6 +87,7 @@ class StatisticsAndSaveFile(object):
                     df[col] = pd.to_numeric(df[col], errors='coerce')
                     # 删除包含NaN的行(即那些列A转换失败的行)
                     df = df.dropna(subset=[col])
+                    trans_print(wind_col_name, "删除非数值列名:", col)
         df.replace(-999999999, np.nan, inplace=True)
 
         df.drop_duplicates(['wind_turbine_number', 'time_stamp'], keep='first', inplace=True)
@@ -109,10 +110,10 @@ class StatisticsAndSaveFile(object):
             df = df.groupby(['wind_turbine_number', 'time_stamp']).mean().reset_index()
 
         power = df.sample(int(df.shape[0] / 100))['active_power'].median()
-        if power > 10000:
-            df['active_power'] = df['active_power'] / 1000
+        if power > 100000:
+            df['active_power'] = df['active_power'] / 100000
         ## 做数据检测前,羡强行处理有功功率
-        df = df[df['active_power'] < 5000]
+        # df = df[df['active_power'] < 50000]
 
         rated_power_and_cutout_speed_tuple = read_conf(self.rated_power_and_cutout_speed_map, str(wind_col_name))
         if rated_power_and_cutout_speed_tuple is None: