Przeglądaj źródła

添加合并表业务逻辑,加快执行速度

wzl 11 miesięcy temu
rodzic
commit
adbae467ae

+ 2 - 1
.gitignore

@@ -2,4 +2,5 @@ logs
 *.pyc
 *.iml
 .idea
-test
+test
+tmp

+ 1 - 1
app_run.py

@@ -95,7 +95,7 @@ def __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_a
                             wind_name_exec=wind_name_exec, is_vertical_table=is_vertical_table,
                             vertical_cols=vertical_cols, vertical_key=vertical_key,
                             vertical_value=vertical_value, index_cols=index_cols, merge_columns=merge_columns,
-                            resolve_col_prefix=resolve_col_prefix, need_valid_cols=need_valid_cols)
+                            resolve_col_prefix=resolve_col_prefix, need_valid_cols=need_valid_cols, header=begin_header)
 
         try:
             trans_subject = WindFarms(batch_no=batch_no, batch_name=batch_name, field_code=field_code,

+ 6 - 0
etl/base/PathsAndTable.py

@@ -48,6 +48,12 @@ class PathsAndTable(object):
     def get_read_tmp_path(self):
         return os.path.join(self.get_save_tmp_path(), 'read_tmp')
 
+    def get_merge_tmp_path(self, wind_turbine_number=None):
+        if wind_turbine_number is None:
+            return os.path.join(self.get_save_tmp_path(), 'merge_tmp')
+        else:
+            return os.path.join(self.get_save_tmp_path(), 'merge_tmp', str(wind_turbine_number))
+
     def get_table_name(self):
         return "_".join([self.batch_no, self.read_type])
 

+ 12 - 17
etl/base/WindFarms.py

@@ -4,11 +4,8 @@
 import datetime
 import multiprocessing
 
-import pandas as pd
-
 from etl.base.PathsAndTable import PathsAndTable
 from etl.base.TransParam import TransParam
-from etl.step.ClassIdentifier import ClassIdentifier
 from etl.step.ClearData import ClearData
 from etl.step.ReadAndSaveTmp import ReadAndSaveTmp
 from etl.step.SaveToDb import SaveToDb
@@ -31,12 +28,10 @@ class WindFarms(object):
         self.field_name = field_name
         self.save_zip = False
         self.trans_param = params
-        self.exist_wind_names = multiprocessing.Manager().list()
-        self.wind_col_trans, self.rated_power_map = get_all_wind(self.field_code)
+        self.wind_col_trans, self.rated_power_and_cutout_speed_map = get_all_wind(self.field_code)
         self.batch_count = 50000
         self.save_path = None
         self.save_db = save_db
-        self.lock = multiprocessing.Manager().Lock()
         self.statistics_map = multiprocessing.Manager().dict()
         self.header = header
         self.trans_param = trans_param
@@ -49,28 +44,28 @@ class WindFarms(object):
         trans_print("开始执行")
         update_trans_status_running(self.batch_no, self.trans_param.read_type, self.save_db)
         if step <= 0 and end >= 0:
-            cleanData = ClearData(self.pathsAndTable)
-            cleanData.run()
+            clean_data = ClearData(self.pathsAndTable)
+            clean_data.run()
 
         if step <= 1 and end >= 1:
             # 更新运行状态到运行中
-            unzipAndRemove = UnzipAndRemove(self.pathsAndTable)
-            unzipAndRemove.run()
+            unzip_and_remove = UnzipAndRemove(self.pathsAndTable)
+            unzip_and_remove.run()
 
         if step <= 2 and end >= 2:
-            readAndSaveTmp = ReadAndSaveTmp(self.pathsAndTable, self.trans_param)
-            readAndSaveTmp.run()
+            read_and_save_tmp = ReadAndSaveTmp(self.pathsAndTable, self.trans_param)
+            read_and_save_tmp.run()
 
         if step <= 3 and end >= 3:
             # 保存到正式文件
-            statisticsAndSaveFile = StatisticsAndSaveFile(self.pathsAndTable, self.trans_param, self.statistics_map,
-                                                          self.rated_power_map)
-            statisticsAndSaveFile.run()
+            statistics_and_save_file = StatisticsAndSaveFile(self.pathsAndTable, self.trans_param, self.statistics_map,
+                                                             self.rated_power_and_cutout_speed_map)
+            statistics_and_save_file.run()
 
         if step <= 4 and end >= 4:
             if self.save_db:
-                saveToDb = SaveToDb(self.pathsAndTable)
-                saveToDb.run()
+                save_to_db = SaveToDb(self.pathsAndTable)
+                save_to_db.run()
 
         update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 99, self.save_db)
         # 如果end==0 则说明只是进行了验证

+ 97 - 116
etl/step/ClassIdentifier.py

@@ -1,11 +1,11 @@
-import os
+import datetime
 
 import numpy as np
 from pandas import DataFrame
 
-from utils.draw.draw_file import scatter
 from utils.file.trans_methods import read_file_to_df
 from utils.log.trans_log import trans_print
+from utils.systeminfo.sysinfo import print_memory_usage
 
 
 class ClassIdentifier(object):
@@ -17,7 +17,7 @@ class ClassIdentifier(object):
                  wind_velocity='wind_velocity',
                  active_power='active_power',
                  pitch_angle_blade='pitch_angle_blade_1',
-                 rated_power=1500, file_path: str = None):
+                 rated_power=1500, cut_out_speed=3, file_path: str = None):
         """
         :param file_path: The file path of the input data.
         :param origin_df: The pandas DataFrame containing the input data.
@@ -25,16 +25,23 @@ class ClassIdentifier(object):
         :param active_power: 有功功率字段
         :param pitch_angle_blade: 桨距角
         :param rated_power: 额定功率
+        :param cut_out_speed: 切出风速
         """
+        self.wind_turbine_number = wind_turbine_number
         self.wind_velocity = wind_velocity
         self.active_power = active_power
         self.pitch_angle_blade = pitch_angle_blade
         self.rated_power = rated_power  # 额定功率1500kw,可改为2000kw
+        self.cut_out_speed = cut_out_speed
 
         if self.rated_power is None:
             trans_print(wind_turbine_number, "WARNING:rated_power配置为空的")
             self.rated_power = 1500
 
+        if self.cut_out_speed is None:
+            trans_print(cut_out_speed, "WARNING:cut_out_speed配置为空的")
+            self.cut_out_speed = 3
+
         if file_path is None and origin_df is None:
             raise ValueError("Either file_path or origin_df should be provided.")
 
@@ -45,40 +52,37 @@ class ClassIdentifier(object):
 
     def identifier(self):
         # 风速 和 有功功率 df
-        # wind_and_power_df = self.df[[self.wind_velocity, self.active_power, "pitch_angle_blade_1"]]
-        wind_and_power_df = self.df
-        wind_and_power_df.reset_index(inplace=True)
-        wind_and_power_df_count = wind_and_power_df.shape[0]
-        power_max = wind_and_power_df[self.active_power].max()
+        # self.df = self.df[[self.wind_velocity, self.active_power, "pitch_angle_blade_1"]]
+        self.df.reset_index(inplace=True)
+        wind_and_power_df_count = self.df.shape[0]
+        power_max = self.df[self.active_power].max()
         power_rated = np.ceil(power_max / 100) * 100
-        v_cut_out = 25
+        v_cut_out = self.cut_out_speed
         # 网格法确定风速风向分区数量,功率方向分区数量,
-        p_num = int(np.ceil(power_rated / 25))  # 功率分区间隔25kW
-        v_num = int(np.ceil(v_cut_out / 0.25))  # 风速分区间隔0.25m/s
+        power_num = int(np.ceil(power_rated / 25))  # 功率分区间隔25kW
+        velocity_num = int(np.ceil(v_cut_out / 0.25))  # 风速分区间隔0.25m/s
 
         # 存储功率大于零的运行数据
         dz_march = np.zeros([wind_and_power_df_count, 2], dtype=float)
         n_counter1 = 0
         for i in range(wind_and_power_df_count):
-            if wind_and_power_df.loc[i, self.active_power] > 0:
-                dz_march[n_counter1, 0] = wind_and_power_df.loc[i, self.wind_velocity]
-                dz_march[n_counter1, 1] = wind_and_power_df.loc[i, self.active_power]
+            if self.df.loc[i, self.active_power] > 0:
+                dz_march[n_counter1, 0] = self.df.loc[i, self.wind_velocity]
+                dz_march[n_counter1, 1] = self.df.loc[i, self.active_power]
 
                 n_counter1 = n_counter1 + 1
 
         # 统计各网格落入的散点个数
-        if v_num == 1:
-            x_box_number = np.ones([p_num], dtype=int)
-        else:
-            x_box_number = np.ones([p_num, v_num], dtype=int)
+        x_box_number = np.ones([power_num, velocity_num], dtype=int)
+
         n_which_p = -1
         n_which_v = -1
         for i in range(n_counter1):
-            for m in range(p_num):
+            for m in range(power_num):
                 if m * 25 < dz_march[i, 1] <= (m + 1) * 25:
                     n_which_p = m
                     break
-            for n in range(v_num):
+            for n in range(velocity_num):
                 if ((n + 1) * 0.25 - 0.125) < dz_march[i, 0] <= ((n + 1) * 0.25 + 0.125):
                     n_which_v = n
                     break
@@ -86,47 +90,47 @@ class ClassIdentifier(object):
             if n_which_p > -1 and n_which_v > -1:
                 x_box_number[n_which_p, n_which_v] = x_box_number[n_which_p, n_which_v] + 1
 
-        for m in range(p_num):
-            for n in range(v_num):
+        for m in range(power_num):
+            for n in range(velocity_num):
                 x_box_number[m, n] = x_box_number[m, n] - 1
 
         # 在功率方向将网格内散点绝对个数转换为相对百分比,备用
-        p_box_percent = np.zeros([p_num, v_num], dtype=float)
-        p_bin_sum = np.zeros(p_num, dtype=int)
+        p_box_percent = np.zeros([power_num, velocity_num], dtype=float)
+        p_bin_sum = np.zeros(power_num, dtype=int)
 
-        for i in range(p_num):
-            for m in range(v_num):
+        for i in range(power_num):
+            for m in range(velocity_num):
                 p_bin_sum[i] = p_bin_sum[i] + x_box_number[i, m]
 
-            for m in range(v_num):
+            for m in range(velocity_num):
                 if p_bin_sum[i] > 0:
                     p_box_percent[i, m] = x_box_number[i, m] / p_bin_sum[i] * 100
 
         # 在风速方向将网格内散点绝对个数转换为相对百分比,备用
-        v_box_percent = np.zeros([p_num, v_num], dtype=float)
-        v_bin_sum = np.zeros(v_num, dtype=int)
+        v_box_percent = np.zeros([power_num, velocity_num], dtype=float)
+        v_bin_sum = np.zeros(velocity_num, dtype=int)
 
-        for i in range(v_num):
-            for m in range(p_num):
+        for i in range(velocity_num):
+            for m in range(power_num):
                 v_bin_sum[i] = v_bin_sum[i] + x_box_number[m, i]
 
-            for m in range(p_num):
+            for m in range(power_num):
                 if v_bin_sum[i] > 0:
                     v_box_percent[m, i] = x_box_number[m, i] / v_bin_sum[i] * 100
 
         # 以水平功率带方向为准,分析每个水平功率带中,功率主带中心,即找百分比最大的网格位置。
-        p_box_max_index = np.zeros(p_num, dtype=int)  # 水平功率带最大网格位置索引
-        p_box_max_p = np.zeros(p_num, dtype=int)  # 水平功率带最大网格百分比
+        p_box_max_index = np.zeros(power_num, dtype=int)  # 水平功率带最大网格位置索引
+        p_box_max_p = np.zeros(power_num, dtype=int)  # 水平功率带最大网格百分比
 
-        for m in range(p_num):
+        for m in range(power_num):
             # 确定每一水平功率带的最大网格位置索引即百分比值
             p_box_max_p[m], p_box_max_index[m] = p_box_percent[m, :].max(), p_box_percent[m, :].argmax()
 
         # 以垂直风速方向为准,分析每个垂直风速带中,功率主带中心,即找百分比最大的网格位置。
-        v_box_max_index = np.zeros(v_num, dtype=int)
-        v_box_max_v = np.zeros(v_num, dtype=int)
+        v_box_max_index = np.zeros(velocity_num, dtype=int)
+        v_box_max_v = np.zeros(velocity_num, dtype=int)
 
-        for m in range(v_num):
+        for m in range(velocity_num):
             [v_box_max_v[m], v_box_max_index[m]] = v_box_percent[:, m].max(), v_box_percent[:, m].argmax()
 
         # 切入风速特殊处理,如果切入风速过于偏右,向左拉回
@@ -134,21 +138,21 @@ class ClassIdentifier(object):
             p_box_max_index[0] = 9
 
         # 以水平功率带方向为基准,进行分析
-        dot_dense = np.zeros(p_num, dtype=int)  # 每一水平功率带的功率主带包含的网格数
-        dot_dense_left_right = np.zeros([p_num, 2], dtype=int)  # 存储每一水平功率带的功率主带以最大网格为中心,向向左,向右扩展的网格数
+        dot_dense = np.zeros(power_num, dtype=int)  # 每一水平功率带的功率主带包含的网格数
+        dot_dense_left_right = np.zeros([power_num, 2], dtype=int)  # 存储每一水平功率带的功率主带以最大网格为中心,向向左,向右扩展的网格数
         dot_valve = 90  # 从中心向左右对称扩展网格的散点百分比和的阈值。
 
-        for i in range(p_num - 6):  # 从最下层水平功率带1开始,向上到第PNum-6个水平功率带(额定功率一下水平功率带),逐一分析
+        for i in range(power_num - 6):  # 从最下层水平功率带1开始,向上到第PNum-6个水平功率带(额定功率一下水平功率带),逐一分析
             p_dot_dense_sum = p_box_max_p[i]  # 以中心最大水平功率带为基准,向左向右对称扩展网格,累加各网格散点百分比
             i_spread_right = 1
             i_spread_left = 1
             while p_dot_dense_sum < dot_valve:
 
-                if (p_box_max_index[i] + i_spread_right) < v_num - 1:
+                if (p_box_max_index[i] + i_spread_right) < velocity_num - 1:
                     p_dot_dense_sum = p_dot_dense_sum + p_box_percent[i, p_box_max_index[i] + i_spread_right]  # 向右侧扩展
                     i_spread_right = i_spread_right + 1
 
-                if (p_box_max_index[i] + i_spread_right) > v_num - 1:
+                if (p_box_max_index[i] + i_spread_right) > velocity_num - 1:
                     break
 
                 if (p_box_max_index[i] - i_spread_left) > 0:
@@ -167,14 +171,14 @@ class ClassIdentifier(object):
             dot_dense[i] = i_spread_left + i_spread_right + 1
 
         # 各行功率主带右侧宽度的中位数最具有代表性
-        dot_dense_width_left = np.zeros([p_num - 6, 1], dtype=int)
-        for i in range(p_num - 6):
+        dot_dense_width_left = np.zeros([power_num - 6, 1], dtype=int)
+        for i in range(power_num - 6):
             dot_dense_width_left[i] = dot_dense_left_right[i, 1]
 
         main_band_right = np.median(dot_dense_width_left)
 
         # 散点向右显著延展分布的水平功率带为限功率水平带
-        power_limit = np.zeros([p_num, 1], dtype=int)  # 各水平功率带是否为限功率标识,==1:是;==0:不是
+        power_limit = np.zeros([power_num, 1], dtype=int)  # 各水平功率带是否为限功率标识,==1:是;==0:不是
         width_average = 0  # 功率主带平均宽度
         width_var = 0  # 功率主带方差
         # power_limit_valve = 6    #限功率主带判别阈值
@@ -183,7 +187,7 @@ class ClassIdentifier(object):
         n_counter_limit = 0
         n_counter = 0
 
-        for i in range(p_num - 6):
+        for i in range(power_num - 6):
             if dot_dense_left_right[i, 1] > power_limit_valve and p_bin_sum[i] > 20:  # 如果向右扩展网格数大于阈值,且该水平功率带点总数>20,是
                 power_limit[i] = 1
                 n_counter_limit = n_counter_limit + 1
@@ -195,21 +199,21 @@ class ClassIdentifier(object):
         width_average = width_average / n_counter  # 功率主带平均宽度
 
         # 各水平功率带的功率主带宽度的方差,反映从下到上宽度是否一致,或是否下宽上窄等异常情况
-        for i in range(p_num - 6):
+        for i in range(power_num - 6):
             if dot_dense_left_right[i, 1] <= power_limit_valve:
                 width_var = width_var + (dot_dense_left_right[i, 1] - width_average) * (
                         dot_dense_left_right[i, 1] - width_average)
 
         # 对限负荷水平功率带的最大网格较下面相邻层显著偏右,拉回
-        for i in range(1, p_num - 6):
+        for i in range(1, power_num - 6):
             if power_limit[i] == 1 and abs(p_box_max_index[i] - p_box_max_index[i - 1]) > 5:
                 p_box_max_index[i] = p_box_max_index[i - 1] + 1
 
         # 输出各层功率主带的左右边界网格索引
-        dot_dense_inverse = np.zeros([p_num, 2], dtype=int)
+        dot_dense_inverse = np.zeros([power_num, 2], dtype=int)
 
-        for i in range(p_num):
-            dot_dense_inverse[i, :] = dot_dense_left_right[p_num - i - 1, :]
+        for i in range(power_num):
+            dot_dense_inverse[i, :] = dot_dense_left_right[power_num - i - 1, :]
 
         # 功率主带的右边界
         curve_width_r = int(np.ceil(width_average) + 2)
@@ -217,15 +221,15 @@ class ClassIdentifier(object):
         # curve_width_l = 6    #功率主带的左边界
         curve_width_l = curve_width_r
 
-        b_box_limit = np.zeros([p_num, v_num], dtype=int)  # 网格是否为限功率网格的标识,如果为限功率水平功率带,从功率主带右侧边缘向右的网格为限功率网格
-        for i in range(2, p_num - 6):
+        b_box_limit = np.zeros([power_num, velocity_num], dtype=int)  # 网格是否为限功率网格的标识,如果为限功率水平功率带,从功率主带右侧边缘向右的网格为限功率网格
+        for i in range(2, power_num - 6):
             if power_limit[i] == 1:
-                for j in range(p_box_max_index[i] + curve_width_r, v_num):
+                for j in range(p_box_max_index[i] + curve_width_r, velocity_num):
                     b_box_limit[i, j] = 1
 
-        b_box_remove = np.zeros([p_num, v_num], dtype=int)  # 数据异常需要剔除的网格标识,标识==1:功率主带右侧的欠发网格;==2:功率主带左侧的超发网格
-        for m in range(p_num - 6):
-            for n in range(p_box_max_index[m] + curve_width_r, v_num):
+        b_box_remove = np.zeros([power_num, velocity_num], dtype=int)  # 数据异常需要剔除的网格标识,标识==1:功率主带右侧的欠发网格;==2:功率主带左侧的超发网格
+        for m in range(power_num - 6):
+            for n in range(p_box_max_index[m] + curve_width_r, velocity_num):
                 b_box_remove[m, n] = 1
 
             for n in range(p_box_max_index[m] - curve_width_l, -1, -1):
@@ -235,8 +239,8 @@ class ClassIdentifier(object):
         curve_top = np.zeros(2, dtype=int)
         curve_top_valve = 3  # 网格的百分比阈值
         b_top_find = 0
-        for m in range(p_num - 4 - 1, -1, -1):
-            for n in range(v_num):
+        for m in range(power_num - 4 - 1, -1, -1):
+            for n in range(velocity_num):
                 if v_box_percent[m, n] > curve_top_valve and x_box_number[m, n] >= 10:  # 如左上角网格的百分比和散点个数大于阈值。
                     curve_top[0] = m
                     curve_top[1] = n
@@ -247,19 +251,19 @@ class ClassIdentifier(object):
                 break
 
         isolate_valve = 3
-        for m in range(p_num - 6):
-            for n in range(p_box_max_index[m] + curve_width_r, v_num):
+        for m in range(power_num - 6):
+            for n in range(p_box_max_index[m] + curve_width_r, velocity_num):
                 if p_box_percent[m, n] < isolate_valve:
                     b_box_remove[m, n] = 1
 
         # 功率主带顶部宽度
         curve_width_t = 2
-        for m in range(p_num - curve_width_t - 1, p_num):
-            for n in range(v_num):
+        for m in range(power_num - curve_width_t - 1, power_num):
+            for n in range(velocity_num):
                 b_box_remove[m, n] = 3  # 网格为额定功率以上的超发点
 
         # 功率主带拐点左侧的欠发网格标识
-        for m in range(p_num - 5 - 1, p_num):
+        for m in range(power_num - 5 - 1, power_num):
             for n in range(curve_top[1] - 1):
                 b_box_remove[m, n] = 2
 
@@ -270,12 +274,12 @@ class ClassIdentifier(object):
         n_bad_a = 0
 
         for i in range(n_counter1):
-            for m in range(p_num):
+            for m in range(power_num):
                 if m * 25 < dz_march[i, 1] <= (m + 1) * 25:
                     n_which_p = m
                     break
 
-            for n in range(v_num):
+            for n in range(velocity_num):
                 if ((n + 1) * 0.25 - 0.125) < dz_march[i, 0] <= ((n + 1) * 0.25 + 0.125):
                     n_which_v = n
                     break
@@ -322,7 +326,7 @@ class ClassIdentifier(object):
                 for j in range(n_window_length):
                     dzwind_and_power_sel[i * n_window_length + j] = 4  # 标识窗口内的数据为限负荷数据
 
-        for i in range(p_num - 6):
+        for i in range(power_num - 6):
             pv_left_down = np.zeros(2, dtype=float)
             pv_right_up = np.zeros(2, dtype=float)
 
@@ -341,64 +345,41 @@ class ClassIdentifier(object):
                                 pv_right_up[0] - pv_left_down[0]):  # 斜率大于对角连线,则在锯齿左上三角形中,选中
                             dzwind_and_power_sel[m] = 0
 
-        wind_and_power_df.loc[:, 'lab'] = -1
-        wind_and_power_df.loc[
-            wind_and_power_df[wind_and_power_df[self.active_power] > 0].index, 'lab'] = dzwind_and_power_sel
+        self.df.loc[:, 'lab'] = -1
+        self.df.loc[
+            self.df[self.df[self.active_power] > 0].index, 'lab'] = dzwind_and_power_sel
 
         # 把部分欠发的优化为限电
         # 构建条件表达式
-        cond1 = (wind_and_power_df['lab'] == 1) & (
-                (wind_and_power_df[self.active_power] < self.rated_power * 0.75) &
-                (wind_and_power_df[self.pitch_angle_blade] > 0.5)
+        cond1 = (self.df['lab'] == 1) & (
+                (self.df[self.active_power] < self.rated_power * 0.75) &
+                (self.df[self.pitch_angle_blade] > 0.5)
         )
-        cond2 = (wind_and_power_df['lab'] == 1) & (
-                (wind_and_power_df[self.active_power] < self.rated_power * 0.85) &
-                (wind_and_power_df[self.pitch_angle_blade] > 1.5)
+        cond2 = (self.df['lab'] == 1) & (
+                (self.df[self.active_power] < self.rated_power * 0.85) &
+                (self.df[self.pitch_angle_blade] > 1.5)
         )
-        cond3 = (wind_and_power_df['lab'] == 1) & (
-                (wind_and_power_df[self.active_power] < self.rated_power * 0.9) &
-                (wind_and_power_df[self.pitch_angle_blade] > 2.5)
+        cond3 = (self.df['lab'] == 1) & (
+                (self.df[self.active_power] < self.rated_power * 0.9) &
+                (self.df[self.pitch_angle_blade] > 2.5)
         )
 
         # 使用逻辑或操作符|合并条件
         combined_condition = cond1 | cond2 | cond3
-        wind_and_power_df.loc[combined_condition, 'lab'] = 4
+        self.df.loc[combined_condition, 'lab'] = 4
 
-        wind_and_power_df.reset_index(drop=True, inplace=True)
-        if 'index' in wind_and_power_df.columns:
-            del wind_and_power_df['index']
-        return wind_and_power_df
+        self.df.loc[self.df[self.active_power] <= 0, 'lab'] = -1
+
+        self.df.reset_index(drop=True, inplace=True)
+        if 'index' in self.df.columns:
+            del self.df['index']
+        return self.df
 
     def run(self):
         # Implement your class identification logic here
-        return self.identifier()
-
-
-if __name__ == '__main__':
-    read_dir = r"D:\data\清理数据\和风元宝山\WOF035200003-WOB000005111_MM14号机组0719\minute"
-
-    files = [read_dir + os.sep + i for i in os.listdir(read_dir)]
-
-    for file in files:
-        # test = ClassIdentifier(file_path=file,
-        #                        wind_velocity='wind_velocity',
-        #                        active_power='active_power',
-        #                        pitch_angle_blade='pitch_angle_blade_1',
-        #                        rated_power=1500
-        #                        )
-        #
-        # df = test.run()
-
-        name = os.path.basename(file).split('.')[0]
-        df = read_file_to_df(file)
-
-        color_map = {-1: 'red', 0: 'green', 1: 'blue', 2: 'black', 3: 'orange', 4: 'magenta'}
-        c = df['lab'].map(color_map)
-
-        # -1:停机 0:好点  1:欠发功率点;2:超发功率点;3:额定风速以上的超发功率点 4: 限电
-        legend_map = {"停机": 'red', "好点": 'green', "欠发": 'blue', "超发": 'black', "额定风速以上的超发": 'orange', "限电": 'magenta'}
-        scatter(name, x_label='风速', y_label='有功功率', x_values=df['wind_velocity'].values,
-                y_values=df['active_power'].values, color=c, col_map=legend_map,
-                save_file_path=os.path.dirname(
-                    os.path.dirname(
-                        os.path.dirname(__file__))) + os.sep + "tmp_file" + os.sep + "和风元宝山" + os.sep + name + '结果.png')
+        print_memory_usage(self.wind_turbine_number + "开始打标签")
+        begin = datetime.datetime.now()
+        df = self.identifier()
+        trans_print("打标签结束,", df.shape, ",耗时:", datetime.datetime.now() - begin)
+        print_memory_usage(self.wind_turbine_number + "打标签结束,")
+        return df

+ 183 - 61
etl/step/ReadAndSaveTmp.py

@@ -11,7 +11,7 @@ from service.plt_service import update_trans_transfer_progress
 from utils.file.trans_methods import read_excel_files, split_array, del_blank, \
     create_file_path, read_file_to_df
 from utils.log.trans_log import trans_print
-from utils.systeminfo.sysinfo import use_files_get_max_cpu_count
+from utils.systeminfo.sysinfo import use_files_get_max_cpu_count, get_dir_size, max_file_size_get_max_cpu_count
 
 
 class ReadAndSaveTmp(object):
@@ -21,6 +21,7 @@ class ReadAndSaveTmp(object):
         self.trans_param = trans_param
         self.exist_wind_names = multiprocessing.Manager().list()
         self.lock = multiprocessing.Manager().Lock()
+        self.file_lock = multiprocessing.Manager().dict()
 
     def _save_to_tmp_csv_by_name(self, df, name):
         save_name = str(name) + '.csv'
@@ -34,11 +35,35 @@ class ReadAndSaveTmp(object):
                 contains_name = False
                 self.exist_wind_names.append(name)
 
-        if contains_name:
-            df.to_csv(save_path, index=False, encoding='utf8', mode='a',
-                      header=False)
-        else:
-            df.to_csv(save_path, index=False, encoding='utf8')
+            if contains_name:
+                df.to_csv(save_path, index=False, encoding='utf8', mode='a',
+                          header=False)
+            else:
+                df.to_csv(save_path, index=False, encoding='utf8')
+
+    def save_merge_data(self, file_path):
+        df = self.read_excel_to_df(file_path)
+        names = set(df['wind_turbine_number'].values)
+        cols = list(df.columns)
+        cols.sort()
+        csv_name = "-".join(cols) + ".csv"
+        for name in names:
+            exist_name = name + '-' + csv_name
+            merge_path = self.pathsAndTable.get_merge_tmp_path(name)
+            create_file_path(merge_path)
+
+            with self.lock:
+                if exist_name in self.exist_wind_names:
+                    contains_name = True
+                else:
+                    contains_name = False
+                    self.exist_wind_names.append(exist_name)
+                save_path = os.path.join(merge_path, csv_name)
+                if contains_name:
+                    df.to_csv(save_path, index=False, encoding='utf-8', mode='a',
+                              header=False)
+                else:
+                    df.to_csv(save_path, index=False, encoding='utf-8')
 
     def df_save_to_tmp_file(self, df=pd.DataFrame()):
 
@@ -76,10 +101,13 @@ class ReadAndSaveTmp(object):
 
         df = del_blank(df, ['wind_turbine_number'])
         df = df[df['time_stamp'].isna() == False]
-        if self.trans_param.wind_name_exec:
+        if self.trans_param.wind_name_exec and not self.trans_param.merge_columns:
             exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
             df['wind_turbine_number'] = eval(exec_str)
 
+        # 删除 有功功率 和 风速均为空的情况
+        df.dropna(subset=['active_power', 'wind_velocity'], how='all', inplace=True)
+
         self.save_to_tmp_csv(df)
 
     def save_to_tmp_csv(self, df):
@@ -87,51 +115,65 @@ class ReadAndSaveTmp(object):
         if names:
             trans_print("开始保存", str(names), "到临时文件")
 
-            with multiprocessing.Pool(self.pathsAndTable.multi_pool_count) as pool:
-                pool.starmap(self._save_to_tmp_csv_by_name,
-                             [(df[df['wind_turbine_number'] == name], name) for name in names])
+            for name in names:
+                self._save_to_tmp_csv_by_name(df[df['wind_turbine_number'] == name], name)
             del df
             trans_print("保存", str(names), "到临时文件成功, 风机数量", len(names))
 
+    def merge_df(self, dir_path):
+        all_files = read_excel_files(dir_path)
+        df = pd.DataFrame()
+        for file in all_files:
+            now_df = read_file_to_df(file)
+            now_df.dropna(subset=['time_stamp'], inplace=True)
+            now_df.drop_duplicates(subset=['time_stamp'], inplace=True)
+            now_df.set_index(keys=['time_stamp', 'wind_turbine_number'], inplace=True)
+            df = pd.concat([df, now_df], axis=1)
+        df.reset_index(inplace=True)
+        self.df_save_to_tmp_file(df)
+
+        return df
+
     def read_file_and_save_tmp(self):
         all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())
+        split_count = use_files_get_max_cpu_count(all_files)
+        all_arrays = split_array(all_files, split_count)
+
         if self.trans_param.merge_columns:
-            dfs_list = list()
-            index_keys = [self.trans_param.cols_tran['time_stamp']]
-            wind_col = self.trans_param.cols_tran['wind_turbine_number']
-            if str(wind_col).startswith("$"):
-                wind_col = 'wind_turbine_number'
-            index_keys.append(wind_col)
-            df_map = dict()
-            # todo 需要优化
-            with multiprocessing.Pool(self.pathsAndTable.multi_pool_count) as pool:
-                dfs = pool.starmap(self.read_excel_to_df, [(file,) for file in all_files])
-
-            for df in dfs:
-                key = '-'.join(df.columns)
-                if key in df_map.keys():
-                    df_map[key] = pd.concat([df_map[key], df])
-                else:
-                    df_map[key] = df
-
-            for k, df in df_map.items():
-                df.drop_duplicates(inplace=True)
-                df.set_index(keys=index_keys, inplace=True)
-                df = df[~df.index.duplicated(keep='first')]
-                dfs_list.append(df)
-
-            df = pd.concat(dfs_list, axis=1)
-            df.reset_index(inplace=True)
-            try:
-                self.df_save_to_tmp_file(df)
-            except Exception as e:
-                trans_print(traceback.format_exc())
-                message = "合并列出现错误:" + str(e)
-                raise ValueError(message)
+            for index, arr in enumerate(all_arrays):
+                try:
+                    with multiprocessing.Pool(split_count) as pool:
+                        pool.starmap(self.save_merge_data, [(ar,) for ar in arr])
+
+                except Exception as e:
+                    trans_print(traceback.format_exc())
+                    message = "整理临时文件,系统返回错误:" + str(e)
+                    raise ValueError(message)
+
+                update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
+                                               round(20 + 20 * (index + 1) / len(all_arrays), 2),
+                                               self.pathsAndTable.save_db)
+
+            dirs = [os.path.join(self.pathsAndTable.get_merge_tmp_path(), dir_name) for dir_name in
+                    os.listdir(self.pathsAndTable.get_merge_tmp_path())]
+            dir_total_size = get_dir_size(dirs[0])
+            split_count = max_file_size_get_max_cpu_count(dir_total_size)
+            all_arrays = split_array(dirs, split_count)
+            for index, arr in enumerate(all_arrays):
+                try:
+                    with multiprocessing.Pool(split_count) as pool:
+                        pool.starmap(self.merge_df, [(ar,) for ar in arr])
+
+                except Exception as e:
+                    trans_print(traceback.format_exc())
+                    message = "整理临时文件,系统返回错误:" + str(e)
+                    raise ValueError(message)
+
+                update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
+                                               round(20 + 30 * (index + 1) / len(all_arrays), 2),
+                                               self.pathsAndTable.save_db)
 
         else:
-            split_count = use_files_get_max_cpu_count(all_files)
-            all_arrays = split_array(all_files, split_count)
 
             for index, arr in enumerate(all_arrays):
                 try:
@@ -159,7 +201,8 @@ class ReadAndSaveTmp(object):
 
         if self.trans_param.is_vertical_table:
             vertical_cols = self.trans_param.vertical_cols
-            df = read_file_to_df(file_path, vertical_cols, header=self.trans_param.header)
+            df = read_file_to_df(file_path, vertical_cols, header=self.trans_param.header,
+                                 trans_cols=self.trans_param.vertical_cols)
             df = df[df[self.trans_param.vertical_key].isin(read_cols)]
             df.rename(columns={self.trans_param.cols_tran['wind_turbine_number']: 'wind_turbine_number',
                                self.trans_param.cols_tran['time_stamp']: 'time_stamp'}, inplace=True)
@@ -170,17 +213,27 @@ class ReadAndSaveTmp(object):
 
         else:
             trans_dict = dict()
+            trans_cols = []
             for k, v in self.trans_param.cols_tran.items():
                 if v and v.startswith("$") or v.find(",") > 0:
                     trans_dict[v] = k
 
+                if v.find("|") > -1:
+                    vs = v.split("|")
+                    trans_cols.extend(vs)
+                else:
+                    trans_cols.append(v)
+            trans_cols = list(set(trans_cols))
             if self.trans_param.merge_columns:
-                df = read_file_to_df(file_path, header=self.trans_param.header)
+                df = read_file_to_df(file_path, header=self.trans_param.header,
+                                     trans_cols=trans_cols)
             else:
                 if self.trans_param.need_valid_cols:
-                    df = read_file_to_df(file_path, read_cols, header=self.trans_param.header)
+                    df = read_file_to_df(file_path, read_cols, header=self.trans_param.header,
+                                         trans_cols=trans_cols)
                 else:
-                    df = read_file_to_df(file_path, header=self.trans_param.header)
+                    df = read_file_to_df(file_path, header=self.trans_param.header,
+                                         trans_cols=trans_cols)
 
             # 处理列名前缀问题
             if self.trans_param.resolve_col_prefix:
@@ -189,21 +242,65 @@ class ReadAndSaveTmp(object):
                     columns_dict[column] = eval(self.trans_param.resolve_col_prefix)
                 df.rename(columns=columns_dict, inplace=True)
 
+            if self.trans_param.merge_columns:
+                select_cols = [self.trans_param.cols_tran['wind_turbine_number'],
+                               self.trans_param.cols_tran['time_stamp'],
+                               'wind_turbine_number', 'time_stamp']
+                select_cols.extend(trans_cols)
+
+                rename_dict = dict()
+                wind_turbine_number_col = self.trans_param.cols_tran['wind_turbine_number']
+                if wind_turbine_number_col.find("|") > -1:
+                    cols = wind_turbine_number_col.split("|")
+                    for col in cols:
+                        rename_dict[col] = 'wind_turbine_number'
+
+                time_stamp_col = self.trans_param.cols_tran['time_stamp']
+                if time_stamp_col.find("|") > -1:
+                    cols = time_stamp_col.split("|")
+                    for col in cols:
+                        rename_dict[col] = 'time_stamp'
+
+                df.rename(columns=rename_dict, inplace=True)
+
+                for col in df.columns:
+                    if col not in select_cols:
+                        del df[col]
+
             for k, v in trans_dict.items():
                 if k.startswith("$file"):
                     file = ".".join(os.path.basename(file_path).split(".")[0:-1])
                     if k == "$file":
-                        df[v] = str(file)
+                        ks = k.split("|")
+                        bool_contains = False
+                        for k_data in ks:
+                            if k_data in df.columns or v in df.columns:
+                                bool_contains = True
+                        if not bool_contains:
+                            df[v] = str(file)
+
                     elif k.startswith("$file["):
-                        datas = str(k.replace("$file", "").replace("[", "").replace("]", "")).split(":")
-                        if len(datas) != 2:
-                            raise Exception("字段映射出现错误 :" + str(trans_dict))
-                        df[v] = str(file[int(datas[0]):int(datas[1])]).strip()
+                        ks = k.split("|")
+                        bool_contains = False
+                        for k_data in ks:
+                            if k_data in df.columns or v in df.columns:
+                                bool_contains = True
+                        if not bool_contains:
+                            datas = str(ks[0].replace("$file", "").replace("[", "").replace("]", "")).split(":")
+                            if len(datas) != 2:
+                                raise Exception("字段映射出现错误 :" + str(trans_dict))
+                            df[v] = str(file[int(datas[0]):int(datas[1])]).strip()
                     elif k.startswith("$file.split"):
-                        datas = str(k).replace("$file.split(", "").replace(")", "").split(",")
-                        split_str = str(datas[0])
-                        split_index = int(datas[1])
-                        df[v] = str(file.split(split_str)[split_index])
+                        ks = k.split("|")
+                        bool_contains = False
+                        for k_data in ks:
+                            if k_data in df.columns or v in df.columns:
+                                bool_contains = True
+                        if not bool_contains:
+                            datas = str(ks[0]).replace("$file.split(", "").replace(")", "").split(",")
+                            split_str = str(datas[0])
+                            split_index = int(datas[1])
+                            df[v] = str(file.split(split_str)[split_index])
 
                 elif k.find("$file_date") > 0:
                     datas = str(k.split(",")[1].replace("$file_date", "").replace("[", "").replace("]", "")).split(":")
@@ -215,13 +312,38 @@ class ReadAndSaveTmp(object):
 
                 elif k.startswith("$folder"):
                     folder = file_path
-                    cengshu = int(str(k.replace("$folder", "").replace("[", "").replace("]", "")))
-                    for i in range(cengshu):
-                        folder = os.path.dirname(folder)
-                    df[v] = str(str(folder).split(os.sep)[-1]).strip()
+                    ks = k.split("|")
+                    bool_contains = False
+                    for k_data in ks:
+                        if k_data in df.columns or v in df.columns:
+                            bool_contains = True
+                    if not bool_contains:
+                        cengshu = int(str(ks[0].replace("$folder", "").replace("[", "").replace("]", "")))
+                        for i in range(cengshu):
+                            folder = os.path.dirname(folder)
+                        df[v] = str(str(folder).split(os.sep)[-1]).strip()
                 elif k.startswith("$sheet_name"):
                     df[v] = df['sheet_name']
 
+            if 'time_stamp' not in df.columns:
+                cols_trans = [i for i in self.trans_param.cols_tran['time_stamp'].split('|')]
+                cols_dict = dict()
+                for col in cols_trans:
+                    cols_dict[col] = 'time_stamp'
+                df.rename(columns=cols_dict, inplace=True)
+
+            if 'wind_turbine_number' not in df.columns:
+                cols_trans = [i for i in self.trans_param.cols_tran['wind_turbine_number'].split('|')]
+                cols_dict = dict()
+                for col in cols_trans:
+                    cols_dict[col] = 'wind_turbine_number'
+
+                df.rename(columns=cols_dict, inplace=True)
+
+            if self.trans_param.wind_name_exec:
+                exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
+                df['wind_turbine_number'] = eval(exec_str)
+
             return df
 
     def run(self):

+ 51 - 27
etl/step/StatisticsAndSaveFile.py

@@ -14,17 +14,18 @@ from utils.conf.read_conf import read_conf
 from utils.df_utils.util import get_time_space
 from utils.file.trans_methods import create_file_path, read_excel_files, read_file_to_df, split_array
 from utils.log.trans_log import trans_print
-from utils.systeminfo.sysinfo import use_files_get_max_cpu_count
+from utils.systeminfo.sysinfo import use_files_get_max_cpu_count, print_memory_usage
 
 
 class StatisticsAndSaveFile(object):
 
-    def __init__(self, paths_and_table: PathsAndTable, trans_param: TransParam, statistics_map, rated_power_map):
+    def __init__(self, paths_and_table: PathsAndTable, trans_param: TransParam, statistics_map,
+                 rated_power_and_cutout_speed_map):
         self.paths_and_table = paths_and_table
         self.trans_param = trans_param
         self.statistics_map = statistics_map
         self.lock = multiprocessing.Manager().Lock()
-        self.rated_power_map = rated_power_map
+        self.rated_power_and_cutout_speed_map = rated_power_and_cutout_speed_map
 
     def set_statistics_data(self, df):
 
@@ -68,8 +69,8 @@ class StatisticsAndSaveFile(object):
         pass
 
     def save_to_csv(self, filename):
+        print_memory_usage("开始读取csv:" + os.path.basename(filename))
         df = read_file_to_df(filename)
-
         if self.trans_param.is_vertical_table:
             df = df.pivot_table(index=['time_stamp', 'wind_turbine_number'], columns=self.trans_param.vertical_key,
                                 values=self.trans_param.vertical_value,
@@ -77,11 +78,7 @@ class StatisticsAndSaveFile(object):
             # 重置索引以得到普通的列
             df.reset_index(inplace=True)
 
-        for k in self.trans_param.cols_tran.keys():
-            if k not in df.columns:
-                df[k] = None
-
-        df = df[self.trans_param.cols_tran.keys()]
+        print_memory_usage("结束读取csv,:" + os.path.basename(filename))
 
         # 转化风机名称
         trans_print("开始转化风机名称")
@@ -89,42 +86,66 @@ class StatisticsAndSaveFile(object):
         df['wind_turbine_name'] = df['wind_turbine_number']
         df['wind_turbine_number'] = df['wind_turbine_number'].map(
             self.trans_param.wind_col_trans).fillna(df['wind_turbine_number'])
-
         wind_col_name = str(df['wind_turbine_number'].values[0])
-        # 添加年月日
-        trans_print(wind_col_name, "包含时间字段,开始处理时间字段,添加年月日", filename)
-        trans_print(wind_col_name, "时间原始大小:", df.shape[0])
-        # df = df[(df['time_stamp'].str.find('-') > 0) & (df['time_stamp'].str.find(':') > 0)]
-        # trans_print(wind_col_name, "去掉非法时间后大小:", df.shape[0])
-        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
-        df.dropna(subset=['time_stamp'], inplace=True)
-        trans_print(wind_col_name, "去掉非法时间后大小:", df.shape[0])
-        df['year'] = df['time_stamp'].dt.year
-        df['month'] = df['time_stamp'].dt.month
-        df['day'] = df['time_stamp'].dt.day
-        df.sort_values(by='time_stamp', inplace=True)
-        df['time_stamp'] = df['time_stamp'].apply(
-            lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
-        trans_print("处理时间字段结束")
+        print_memory_usage("转化风机名称结束:" + wind_col_name)
 
         not_double_cols = ['wind_turbine_number', 'wind_turbine_name', 'time_stamp', 'param6', 'param7', 'param8',
                            'param9', 'param10']
+
+        solve_time_begin = datetime.datetime.now()
         trans_print(wind_col_name, "去掉非法数据前大小:", df.shape[0])
         df.replace(np.nan, -999999999, inplace=True)
+        number_cols = df.select_dtypes(include=['number']).columns.tolist()
         for col in df.columns:
-            if col not in not_double_cols:
+            if col not in not_double_cols and col not in number_cols:
                 if not df[col].isnull().all():
                     df[col] = pd.to_numeric(df[col], errors='coerce')
                     # 删除包含NaN的行(即那些列A转换失败的行)
                     df = df.dropna(subset=[col])
         trans_print(wind_col_name, "去掉非法数据后大小:", df.shape[0])
         df.replace(-999999999, np.nan, inplace=True)
+        print_memory_usage("处理非法数据大小结束:" + wind_col_name)
+
         trans_print(wind_col_name, "去掉重复数据前大小:", df.shape[0])
         df.drop_duplicates(['wind_turbine_number', 'time_stamp'], keep='first', inplace=True)
         trans_print(wind_col_name, "去掉重复数据后大小:", df.shape[0])
+        trans_print("处理非法重复数据结束,耗时:", datetime.datetime.now() - solve_time_begin)
+        print_memory_usage("处理重复数据结束:" + wind_col_name)
+
+        # 添加年月日
+        solve_time_begin = datetime.datetime.now()
+        trans_print(wind_col_name, "包含时间字段,开始处理时间字段,添加年月日", filename)
+        trans_print(wind_col_name, "时间原始大小:", df.shape[0])
+        # df = df[(df['time_stamp'].str.find('-') > 0) & (df['time_stamp'].str.find(':') > 0)]
+        # trans_print(wind_col_name, "去掉非法时间后大小:", df.shape[0])
+        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
+        df.dropna(subset=['time_stamp'], inplace=True)
+        trans_print(wind_col_name, "去掉非法时间后大小:", df.shape[0])
+        df.sort_values(by='time_stamp', inplace=True)
+        trans_print("处理时间字段结束,耗时:", datetime.datetime.now() - solve_time_begin)
+        print_memory_usage("处理时间结果:" + wind_col_name)
+
+        df = df[[i for i in self.trans_param.cols_tran.keys() if i in df.columns]]
+        print_memory_usage("删减无用字段后内存占用:" + wind_col_name)
 
-        class_identifiler = ClassIdentifier(origin_df=df, rated_power=read_conf(self.rated_power_map, str(wind_col_name)))
+
+        rated_power_and_cutout_speed_tuple = read_conf(self.rated_power_and_cutout_speed_map, str(wind_col_name))
+        if rated_power_and_cutout_speed_tuple is None:
+            rated_power_and_cutout_speed_tuple = (None, None)
+
+        print_memory_usage("打标签前内存占用:" + wind_col_name)
+        class_identifiler = ClassIdentifier(wind_turbine_number=wind_col_name, origin_df=df,
+                                            rated_power=rated_power_and_cutout_speed_tuple[0],
+                                            cut_out_speed=rated_power_and_cutout_speed_tuple[1])
         df = class_identifiler.run()
+        print_memory_usage("打标签后内存占用:" + wind_col_name)
+
+        df['year'] = df['time_stamp'].dt.year
+        df['month'] = df['time_stamp'].dt.month
+        df['day'] = df['time_stamp'].dt.day
+        df['time_stamp'] = df['time_stamp'].apply(
+            lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
+        print_memory_usage("添加年月日后:" + wind_col_name)
 
         if self.paths_and_table.save_zip:
             save_path = os.path.join(self.paths_and_table.get_save_path(), str(wind_col_name) + '.csv.gz')
@@ -142,12 +163,14 @@ class StatisticsAndSaveFile(object):
         trans_print("保存" + str(wind_col_name) + "成功")
 
     def mutiprocessing_to_save_file(self):
+        print_memory_usage("开始执行,占用内存")
         # 开始保存到正式文件
         trans_print("开始保存到excel文件")
         all_tmp_files = read_excel_files(self.paths_and_table.get_read_tmp_path())
         # split_count = self.pathsAndTable.multi_pool_count
         split_count = use_files_get_max_cpu_count(all_tmp_files)
         all_arrays = split_array(all_tmp_files, split_count)
+
         try:
             for index, arr in enumerate(all_arrays):
                 with multiprocessing.Pool(split_count) as pool:
@@ -156,6 +179,7 @@ class StatisticsAndSaveFile(object):
                                                round(50 + 20 * (index + 1) / len(all_arrays), 2),
                                                self.paths_and_table.save_db)
 
+
         except Exception as e:
             trans_print(traceback.format_exc())
             message = "保存文件错误,系统返回错误:" + str(e)

+ 2 - 2
service/plt_service.py

@@ -99,13 +99,13 @@ def get_exec_data(run_count: int = 1) -> dict:
 
 
 def get_all_wind(field_code):
-    query_sql = "select engine_code,engine_name,rated_capacity from wind_engine_group where field_code = %s and del_state = 0"
+    query_sql = "select engine_code,engine_name,rated_capacity,rated_cut_out_windspeed from wind_engine_group where field_code = %s and del_state = 0"
     dict_datas = plt.execute(query_sql, (field_code,))
     wind_result = dict()
     power_result = dict()
     for data in dict_datas:
         wind_result[str(data['engine_name'])] = str(data['engine_code'])
-        power_result[str(data['engine_code'])] = float(data['rated_capacity'])
+        power_result[str(data['engine_code'])] = (float(data['rated_capacity']),float(data['rated_cut_out_windspeed']))
     return wind_result,power_result
 
 

+ 9 - 4
test_app_run.py

@@ -95,7 +95,8 @@ def __exec_trans(step, end, batch_no, batch_name, transfer_type, transfer_file_a
                             wind_name_exec=wind_name_exec, is_vertical_table=is_vertical_table,
                             vertical_cols=vertical_cols, vertical_key=vertical_key,
                             vertical_value=vertical_value, index_cols=index_cols, merge_columns=merge_columns,
-                            resolve_col_prefix=resolve_col_prefix, need_valid_cols=need_valid_cols)
+                            resolve_col_prefix=resolve_col_prefix, need_valid_cols=need_valid_cols,
+                            header=begin_header)
 
         try:
             trans_subject = WindFarms(batch_no=batch_no, batch_name=batch_name, field_code=field_code,
@@ -137,6 +138,10 @@ if __name__ == '__main__':
 
     # run_schedule(run_count=run_count)
 
-    run_local(3, 3, batch_no='WOF035200003-WOB000005111', batch_name='MM14号机组0719', transfer_type='minute',
-              transfer_file_addr=r'D:\trans_data\和风元宝山\收资数据\min', field_name='和风元宝山',
-              field_code="WOF039800012", save_db=False)
+    run_local(0, 4, batch_no='WOF043600007-WOB000001', batch_name='XALFDC0814', transfer_type='second',
+              transfer_file_addr=r'/data/download/collection_data/1进行中/新艾里风电场-吉林-大唐/收资数据/sec', field_name='新艾里风电场',
+              field_code="WOF043600007", save_db=True)
+
+    # run_local(3, 3, batch_no='WOF043600007-WOB000001', batch_name='XALFDC0814', transfer_type='second',
+    #           transfer_file_addr=r'D:\trans_data\新艾里风电场\收资数据\1号风机', field_name='新艾里风电场',
+    #           field_code="WOF043600007", save_db=False)

+ 58 - 0
tmp_file/fengxiang_fengdianchang.py

@@ -0,0 +1,58 @@
+from multiprocessing import Pool
+
+import sys, os
+
+path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+print(path)
+sys.path.insert(0, path)
+print(sys.path)
+
+from utils.file.trans_methods import *
+from utils.systeminfo.sysinfo import use_files_get_max_cpu_count
+
+
def read_and_save_file(filename):
    """Read one raw turbine file and return (wind_number, indexed DataFrame).

    The turbine number is the file-name prefix before the first '_'.  The frame
    is indexed by (描述, 风机号) so frames from different files can later be
    concatenated and aligned on timestamp + turbine.
    """
    # Compute the name before the try-block: the original referenced `basename`
    # in the except-handler, where it would be unbound had the first line failed.
    basename = os.path.basename(filename)
    try:
        wind_number = basename.split("_")[0]
        df = read_file_to_df(filename, header=1)
        df['风机号'] = wind_number
        # Raw timestamps arrive as day-month-year strings.
        df['描述'] = pd.to_datetime(df['描述'], format='%d-%m-%Y %H:%M:%S')
        df.set_index(keys=['描述', '风机号'], inplace=True)
        return wind_number, df
    except Exception:
        print(basename, 'error')
        # Bare raise preserves the original traceback.
        raise
+
+
if __name__ == '__main__':
    read_path = r'/data/download/collection_data/1进行中/枫香风电场-贵州-大唐/收资数据/枫香风电场收资表/1.10分钟SCADA数据'
    save_path = r'/data/download/collection_data/1进行中/枫香风电场-贵州-大唐/清理数据/枫香风电场收资表/1.10分钟SCADA数据'
    # read_path = r'D:\trans_data\枫香\收资数据\min'
    # save_path = r'D:\trans_data\枫香\清理数据\min'
    create_file_path(save_path, False)
    all_fils = read_excel_files(read_path)
    process_count = use_files_get_max_cpu_count(all_fils)

    # Parse every raw file in parallel.
    with Pool(process_count) as pool:
        results = pool.starmap(read_and_save_file, [(f,) for f in all_fils])

    # Group the parsed frames first by turbine number, then by column signature:
    # files of the same turbine with identical columns are stacked vertically,
    # different column sets are kept apart so they can be joined side by side.
    df_dict = dict()
    for wind_number, frame in results:
        signature = '-'.join(sorted(frame.columns))
        per_turbine = df_dict.setdefault(wind_number, {})
        if signature in per_turbine:
            per_turbine[signature] = pd.concat([per_turbine[signature], frame], axis=0)
        else:
            per_turbine[signature] = frame

    # Join the column groups of each turbine on the shared (time, turbine) index
    # and write one CSV per turbine.
    for wind_number, cols_dict in df_dict.items():
        merged = pd.concat(list(cols_dict.values()), axis=1)
        merged.sort_index(inplace=True)
        merged.reset_index(inplace=True)
        merged.to_csv(os.path.join(save_path, f"{wind_number}.csv"), encoding="utf-8", index=False)

+ 30 - 0
tmp_file/read_and_draw_png.py

@@ -0,0 +1,30 @@
+import multiprocessing
+import os
+
+from utils.draw.draw_file import scatter
+from utils.file.trans_methods import read_file_to_df
+
+
def draw(file, fengchang='测试'):
    """Render a wind-speed / active-power scatter plot for one result file.

    The point colour encodes the classifier label in the 'lab' column; the
    output PNG goes to <repo>/tmp/<fengchang>/<file name>结果.png.
    """
    name = os.path.basename(file).split('.')[0]
    df = read_file_to_df(file)
    # Non-positive power is forced into the "stopped" label (-1).
    df.loc[df['active_power'] <= 0, 'lab'] = -1

    color_map = {-1: 'red', 0: 'green', 1: 'blue', 2: 'black', 3: 'orange', 4: 'magenta'}
    point_colors = df['lab'].map(color_map)

    # -1:停机 0:好点  1:欠发功率点;2:超发功率点;3:额定风速以上的超发功率点 4: 限电
    legend_map = {"停机": 'red', "好点": 'green', "欠发": 'blue', "超发": 'black', "额定风速以上的超发": 'orange', "限电": 'magenta'}
    save_dir = os.path.dirname(os.path.dirname(__file__)) + os.sep + "tmp" + os.sep + str(fengchang)
    scatter(name, x_label='风速', y_label='有功功率', x_values=df['wind_velocity'].values,
            y_values=df['active_power'].values, color=point_colors, col_map=legend_map,
            save_file_path=save_dir + os.sep + name + '结果.png')
+
+
if __name__ == '__main__':
    read_dir = r"Z:\collection_data\1进行中\新艾里风电场-吉林-大唐\清理数据\WOF043600007-WOB000001_XALFDC0814\minute"

    file_paths = [read_dir + os.sep + entry for entry in os.listdir(read_dir)]

    # Draw all turbine plots with a small fixed-size process pool.
    with multiprocessing.Pool(6) as pool:
        pool.starmap(draw, [(path, "新艾里风电场") for path in file_paths])

+ 1 - 0
utils/draw/draw_file.py

@@ -5,6 +5,7 @@ from utils.file.trans_methods import create_file_path
 matplotlib.use('Agg')
 matplotlib.rcParams['font.family'] = 'SimHei'
 matplotlib.rcParams['font.sans-serif'] = ['SimHei']
+matplotlib.rcParams['axes.unicode_minus'] = False
 from matplotlib import pyplot as plt
 
 

+ 54 - 13
utils/file/trans_methods.py

@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # @Time    : 2024/5/16
 # @Author  : 魏志亮
+import datetime
 import os
 import re
 import shutil
@@ -27,9 +28,10 @@ def detect_file_encoding(filename):
     if encoding is None:
         encoding = 'gb18030'
 
-    if encoding and encoding.lower() == 'gb2312' or encoding.lower().startswith("windows"):
-        encoding = 'gb18030'
-    return encoding
+    if encoding.lower() in ['utf-8', 'ascii', 'utf8']:
+        return 'utf-8'
+
+    return 'gb18030'
 
 
 def del_blank(df=pd.DataFrame(), cols=list()):
@@ -44,9 +46,40 @@ def split_array(array, num):
     return [array[i:i + num] for i in range(0, len(array), num)]
 
 
def find_read_header(file_path, trans_cols):
    """Locate the header row of *file_path* by searching for known column names.

    Reads only the first 20 rows. Returns 0 when the frame's own columns already
    contain at least two of *trans_cols*; otherwise returns index+1 of the first
    data row holding at least two matches (that row is the real header); returns
    None when no plausible header row is found.
    """
    df = read_file_to_df(file_path, nrows=20)

    # Case 1: the file was read with the correct header already (row 0).
    if sum(1 for col in trans_cols if col in df.columns) >= 2:
        return 0

    # Case 2: scan each row for the expected column names.  The match count is
    # reset per row — the original accumulated it across rows, so stray single
    # matches in different rows could falsely select a header — and the
    # threshold is now >= 2 in both cases (the original inconsistently used > 2
    # here).  The debug print of trans_cols was removed.
    for index, row in df.iterrows():
        matches = sum(1 for col in trans_cols if col in row.values)
        if matches >= 2:
            return index + 1

    return None
+
+
 # 读取数据到df
-def read_file_to_df(file_path, read_cols=list(), header=0):
+def read_file_to_df(file_path, read_cols=list(), header=0, trans_cols=None, nrows=None):
+    begin = datetime.datetime.now()
     trans_print('开始读取文件', file_path)
+    if trans_cols:
+        header = find_read_header(file_path, trans_cols)
+        trans_print(os.path.basename(file_path), "读取第", header, "行")
+        if header is None:
+            message = '未匹配到开始行,请检查并重新指定'
+            trans_print(message)
+            raise Exception(message)
+
     try:
         df = pd.DataFrame()
         if str(file_path).lower().endswith("csv") or str(file_path).lower().endswith("gz"):
@@ -54,16 +87,17 @@ def read_file_to_df(file_path, read_cols=list(), header=0):
             end_with_gz = str(file_path).lower().endswith("gz")
             if read_cols:
                 if end_with_gz:
-                    df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip', header=header)
+                    df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip', header=header,
+                                     nrows=nrows)
                 else:
                     df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, header=header,
-                                     on_bad_lines='warn')
+                                     on_bad_lines='warn', nrows=nrows)
             else:
 
                 if end_with_gz:
-                    df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header)
+                    df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header, nrows=nrows)
                 else:
-                    df = pd.read_csv(file_path, encoding=encoding, header=header, on_bad_lines='warn')
+                    df = pd.read_csv(file_path, encoding=encoding, header=header, on_bad_lines='warn', nrows=nrows)
 
         else:
             xls = pd.ExcelFile(file_path)
@@ -71,15 +105,15 @@ def read_file_to_df(file_path, read_cols=list(), header=0):
             sheet_names = xls.sheet_names
             for sheet_name in sheet_names:
                 if read_cols:
-                    now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, usecols=read_cols)
+                    now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, usecols=read_cols, nrows=nrows)
                 else:
-                    now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header)
+                    now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, nrows=nrows)
 
                 now_df['sheet_name'] = sheet_name
 
                 df = pd.concat([df, now_df])
 
-        trans_print('文件读取成功', file_path, '文件数量', df.shape)
+        trans_print('文件读取成功', file_path, '文件数量', df.shape, '耗时', datetime.datetime.now() - begin)
     except Exception as e:
         trans_print('读取文件出错', file_path, str(e))
         message = '文件:' + os.path.basename(file_path) + ',' + str(e)
@@ -104,8 +138,6 @@ def __build_directory_dict(directory_dict, path, filter_types=None):
                 if str(item_path).count("~$") == 0:
                     directory_dict[path].append(item_path)
 
-    # 读取所有文件
-
 
 # 读取路径下所有的excel文件
 def read_excel_files(read_path):
@@ -147,3 +179,12 @@ def generate_turbine_name(turbine_name='F0001', prefix='F'):
     strinfo = re.compile(r"[\D*]")
     name = strinfo.sub('', str(turbine_name))
     return prefix + str(int(name)).zfill(3)
+
+
if __name__ == '__main__':
    # Ad-hoc smoke test for header auto-detection on a sample export.
    # files = read_excel_files(r'D:\trans_data\10.xls')
    # for file in files:
    file = r'D:\trans_data\新艾里风电场10号风机.csv'
    sample_trans_cols = ['', '风向', '时间', '设备号', '机舱方向总角度', '$folder[2]', '发电机转速30秒平均值',
                         '机组运行模式', '机舱旋转角度', '主轴转速', '变桨角度30秒平均值', '记录时间',
                         '发电机功率30秒平均值', '风速30秒平均值']
    read_file_to_df(file, trans_cols=sample_trans_cols)

+ 40 - 1
utils/systeminfo/sysinfo.py

@@ -4,6 +4,26 @@ import psutil
 from utils.log.trans_log import trans_print
 
 
def print_memory_usage(detail=""):
    """Log the current process's memory usage (RSS and VMS) in MB via trans_print.

    :param detail: free-form label prefixed to each log line
    """
    proc = psutil.Process(os.getpid())
    mem = proc.memory_info()
    # RSS is the physical memory actually resident; VMS the virtual address
    # space the process has mapped.
    rss_mb = mem.rss / (1024 ** 2)
    vms_mb = mem.vms / (1024 ** 2)
    trans_print(f"{detail},Memory usage (RSS): {rss_mb:.2f} MB")
    trans_print(f"{detail},Memory usage (VMS): {vms_mb:.2f} MB")
+
+
 def get_cpu_count():
     return psutil.cpu_count()
 
@@ -17,6 +37,11 @@ def get_file_size(file_path):
     return os.path.getsize(file_path)
 
 
def get_dir_size(dir_path):
    """Return the total size in bytes of all files under *dir_path*, recursively.

    The original version only summed top-level files and silently ignored
    subdirectories, undercounting nested layouts; os.walk covers the whole
    tree (identical result for flat directories). Uses os.path.getsize
    directly (equivalent to the get_file_size helper).
    """
    total_size = 0
    for root, _, files in os.walk(dir_path):
        for name in files:
            file_path = os.path.join(root, name)
            if os.path.isfile(file_path):
                total_size += os.path.getsize(file_path)
    return total_size
+
+
 def get_available_memory_with_percent(percent: float = 1):
     memory_info = psutil.virtual_memory()
     return int(memory_info.available * percent)
@@ -31,7 +56,7 @@ def get_max_file_size(file_paths: list[str]):
     return max_size
 
 
-def use_files_get_max_cpu_count(file_paths: list[str], memory_percent: float = 1 / 6, cpu_percent: float = 2 / 5):
+def use_files_get_max_cpu_count(file_paths: list[str], memory_percent: float = 1 / 12, cpu_percent: float = 2 / 5):
     max_file_size = get_max_file_size(file_paths)
     free_memory = get_available_memory_with_percent(memory_percent)
     count = int(free_memory / max_file_size)
@@ -46,6 +71,20 @@ def use_files_get_max_cpu_count(file_paths: list[str], memory_percent: float = 1
     return result
 
 
def max_file_size_get_max_cpu_count(max_file_size, memory_percent: float = 1 / 6, cpu_percent: float = 2 / 5):
    """Decide how many worker processes to run given the largest file size.

    The process count is capped both by how many copies of the largest file fit
    into the allowed share of free memory and by the allowed share of CPUs.

    :param max_file_size: size in bytes of the largest file to be processed
    :param memory_percent: fraction of available memory the workers may use
    :param cpu_percent: fraction of CPUs the workers may use
    :return: number of processes, always at least 1
    """
    free_memory = get_available_memory_with_percent(memory_percent)
    # Guard against max_file_size == 0 (e.g. an empty merge directory), which
    # previously raised ZeroDivisionError; fall back to the CPU-based cap.
    count = int(free_memory / max_file_size) if max_file_size > 0 else get_cpu_count()
    max_cpu_count = get_available_cpu_count_with_percent(cpu_percent)
    result = count if count <= max_cpu_count else max_cpu_count
    if result == 0:
        result = 1
    trans_print(",获取最大文件大小:", str(round(max_file_size / 2 ** 20, 2)) + "M",
                "可用内存:", str(get_available_memory_with_percent(1) / 2 ** 20) + "M",
                "总CPU数:", get_cpu_count(), "CPU使用比例:", round(cpu_percent, 2), "CPU可用数量:", max_cpu_count,
                ",最终确定使用进程数:", result)
    return result
+
+
 if __name__ == '__main__':
     from utils.file.trans_methods import read_files
     import datetime