ソースを参照

添加数据过滤功能

魏志亮 2 日 前
コミット
b37d2f0569

+ 53 - 0
etl/wind_power/min_sec/FilterValidData.py

@@ -0,0 +1,53 @@
+import pandas as pd
+
+
class FilterValidData:
    """Filter out physically implausible rows of wind-turbine data.

    A row survives only when every checked column that exists in the
    DataFrame lies inside its plausible interval; columns absent from the
    DataFrame are skipped.  NaN values fail every comparison and are
    therefore filtered out as well (same as the original behavior).
    """

    def __init__(self, df, rated_power):
        # df: pandas DataFrame of turbine measurements.
        # rated_power: rated power of the turbine; may be None when not
        # configured, in which case the upper bound on active_power is
        # simply not enforced.
        self.df = df
        self.rated_power = rated_power

    def run(self):
        """Return the DataFrame restricted to rows passing all range checks."""
        # Upper bound for active power; unbounded when rated power is
        # unknown.  Fix: previously `None * 1.3` raised a TypeError when
        # the caller had no rated power configured.
        if self.rated_power is not None:
            max_active_power = self.rated_power * 1.3
        else:
            max_active_power = float('inf')

        # Each entry: (column, min inclusive?, min, max inclusive?, max)
        conditions = [
            ('active_power', False, -200, False, max_active_power),  # > -200 and < rated_power*1.3
            ('rotor_speed', True, 0, False, 25),                     # >= 0 and < 25
            ('generator_speed', True, 0, False, 2500),               # >= 0 and < 2500
            ('wind_velocity', True, 0, False, 80),                   # >= 0 and < 80
            ('pitch_angle_blade_1', False, -300, False, 300),        # > -300 and < 300
            ('pitch_angle_blade_2', False, -300, False, 300),
            ('pitch_angle_blade_3', False, -300, False, 300),
        ]
        # All temperature-style columns share one range: > -50 and < 200.
        # (Column names are kept exactly as stored, including the
        # irregular spellings like 'gearboxmedium_speed_shaftbearing_...'.)
        temperature_columns = [
            'main_bearing_temperature',
            'gearbox_oil_temperature',
            'gearbox_low_speed_shaft_bearing_temperature',
            'gearboxmedium_speed_shaftbearing_temperature',
            'gearbox_high_speed_shaft_bearing_temperature',
            'generatordrive_end_bearing_temperature',
            'generatornon_drive_end_bearing_temperature',
            'cabin_temperature',
            'outside_cabin_temperature',
            'generator_winding1_temperature',
            'generator_winding2_temperature',
            'generator_winding3_temperature',
        ]
        conditions.extend((col, False, -50, False, 200) for col in temperature_columns)

        final_condition = pd.Series(True, index=self.df.index)
        for col, include_min, min_val, include_max, max_val in conditions:
            if col not in self.df.columns:
                continue  # column not present in this data set; skip

            min_cond = self.df[col] >= min_val if include_min else self.df[col] > min_val
            max_cond = self.df[col] <= max_val if include_max else self.df[col] < max_val
            final_condition &= min_cond & max_cond

        return self.df[final_condition]

+ 6 - 0
etl/wind_power/min_sec/StatisticsAndSaveTmpFormalFile.py

@@ -8,6 +8,7 @@ import pandas as pd
 from etl.common.PathsAndTable import PathsAndTable
 from etl.wind_power.min_sec import TransParam
 from etl.wind_power.min_sec.ClassIdentifier import ClassIdentifier
+from etl.wind_power.min_sec.FilterValidData import FilterValidData
 from service.trans_conf_service import update_trans_transfer_progress
 from utils.conf.read_conf import read_conf
 from utils.df_utils.util import get_time_space
@@ -123,6 +124,11 @@ class StatisticsAndSaveTmpFormalFile(object):
         if rated_power_and_cutout_speed_tuple is None:
             rated_power_and_cutout_speed_tuple = (None, None)
 
+        trans_print('过滤数据前数据大小', df.shape)
+        filter_valid_data = FilterValidData(df, rated_power_and_cutout_speed_tuple[0])
+        df = filter_valid_data.run()
+        trans_print('过滤数据后数据大小', df.shape)
+
         # 如果有需要处理的,先进行代码处理,在进行打标签
         # exec_code = get_trans_exec_code(self.paths_and_table.exec_id, self.paths_and_table.read_type)
         # if exec_code:

+ 76 - 0
utils/tmp_util/表添加列.py

@@ -0,0 +1,76 @@
+import os
+import sys
+
# Select the execution environment from the CLI (defaults to prod);
# usage: python 表添加列.py [env]
env = 'prod'
if len(sys.argv) >= 2:
    env = sys.argv[1]

# Locate the project's etl_config_<env>.yaml relative to this file's
# position inside the energy-data-trans checkout and publish it through
# environment variables for the rest of the codebase to pick up.
conf_path = os.path.abspath(__file__).split("energy-data-trans")[0] + f"/energy-data-trans/conf/etl_config_{env}.yaml"
os.environ['ETL_CONF'] = conf_path
os.environ['env'] = env

# Non-dev databases carry an '_<env>' suffix (e.g. energy_data_prod).
db_last = ''
if env != 'dev':
    db_last = db_last + '_' + env

# Find every per-turbine minute table (WOF..._minute) that does NOT yet
# have the main_bearing_temperature_2 column, i.e. the tables still
# needing the ALTER TABLE below.  '%%' escapes '%' for the DB driver's
# parameter-formatting step.
query_sql = f"""
SELECT
	t.TABLE_NAME
FROM
	information_schema.`TABLES` t
WHERE
	t.TABLE_SCHEMA = 'energy_data{db_last}'
AND t.TABLE_NAME LIKE 'WOF%%_minute'
AND t.TABLE_NAME NOT IN (
	SELECT
		table_name
	FROM
		information_schema.`COLUMNS` a
	WHERE
		a.TABLE_SCHEMA = 'energy_data{db_last}'
	AND a.TABLE_NAME LIKE 'WOF%%_minute'
	AND a.COLUMN_NAME = 'main_bearing_temperature_2'
)
"""
+
+
def get_table_count(table_name):
    """Print the row count of *table_name* and how long the COUNT(1) took.

    Bug fix: the original computed ``time.time() - query_begin`` as a
    print argument *before* ``trans.execute`` ran (arguments evaluate
    left to right), so the reported duration was always ~0.  The query
    now runs first, then the elapsed time is measured.
    """
    # Imported locally: the module top level does not import time (the
    # script only imports it inside __main__), so a local import keeps
    # this function usable on its own.
    import time

    count_sql = f"""
    select count(1) as count from {table_name}
    """
    query_begin = time.time()
    # `trans` is the shared DB connection imported in the __main__ block.
    row_count = trans.execute(count_sql)[0]['count']
    print(table_name, '统计条数耗时', time.time() - query_begin, row_count)
+
+
def get_update_sql(table_name):
    """Return the ALTER TABLE DDL that appends the five new measurement
    columns (main bearing temperature 2, the three grid phase currents,
    and reactive power) to *table_name*.
    """
    return f"""
        ALTER TABLE {table_name}
        ADD COLUMN `main_bearing_temperature_2` double DEFAULT NULL COMMENT '主轴承轴承温度2', 
        ADD COLUMN `grid_a_phase_current` double DEFAULT NULL COMMENT '电网A相电流',
        ADD COLUMN `grid_b_phase_current` double DEFAULT NULL COMMENT '电网B相电流',
        ADD COLUMN `grid_c_phase_current` double DEFAULT NULL COMMENT '电网C相电流',
        ADD COLUMN `reactive_power` double DEFAULT NULL COMMENT '无功功率';
        """
+
+
if __name__ == '__main__':
    from service.common_connect import trans

    # Either discover candidate tables automatically via query_sql, or
    # keep the explicit hard-coded list below to migrate one table at a time.
    # tables = trans.execute(query_sql)
    # print(tables)

    tables = [{'TABLE_NAME': 'WOF093400005_minute'}]

    import time

    begin_time = time.time()
    for table in tables:
        quoted_name = '`' + table['TABLE_NAME'] + '`'
        # Log the current row count first, then apply the ALTER TABLE.
        get_table_count(quoted_name)
        alter_begin = time.time()
        trans.execute(get_update_sql(quoted_name))
        print(quoted_name, '更新耗时', time.time() - alter_begin)

    print(len(tables), '张表,总耗时:', time.time() - begin_time)

+ 27 - 0
utils/tmp_util/颗粒度变大.py

@@ -0,0 +1,27 @@
+import os
+
+import pandas as pd
+
+
def trans_time_granularity(read_dir: str, save_dir: str, time_str: str, time_granularity: str, group_by: list):
    """Down-sample every file under *read_dir* to a coarser time granularity.

    Each file is parsed as CSV, its *time_str* column is ceiled to the
    pandas offset alias *time_granularity* (e.g. '20min'), numeric columns
    are averaged per *group_by* group, and the result is written under
    *save_dir* mirroring the original relative layout.

    :param read_dir: root directory scanned recursively; every file found
        is treated as a CSV.
    :param save_dir: output root; the input path prefix is replaced by it.
    :param time_str: name of the timestamp column.
    :param time_granularity: pandas offset alias the timestamps are
        ceiled to (row is attributed to the *end* of its bucket).
    :param group_by: columns to group on before averaging.
    """
    for root, dirs, files in os.walk(read_dir):
        for file in files:
            file_path = os.path.join(root, file)
            df = pd.read_csv(file_path)
            # Unparseable timestamps become NaT and drop out at groupby.
            df[time_str] = pd.to_datetime(df[time_str], errors='coerce')
            df[time_str] = df[time_str].dt.ceil(time_granularity)
            # mean(numeric_only=True) silently drops non-numeric columns.
            groupby_df = df.groupby(group_by).mean(numeric_only=True).reset_index()

            save_file = file_path.replace(read_dir, save_dir)
            # exist_ok=True replaces the original's check-then-create
            # sequence, which was racy and redundant.
            os.makedirs(os.path.dirname(save_file), exist_ok=True)

            groupby_df.to_csv(save_file, index=False, encoding='utf-8')
+
+
if __name__ == '__main__':
    # Source minute-level data and destination for the 20-minute rollup.
    source_dir = r'D:\data\tmp_data\龙源\minute'
    target_dir = r'D:\data\tmp_data\龙源\minute12'

    trans_time_granularity(source_dir, target_dir, 'time_stamp', '20min', ['time_stamp'])