Selaa lähdekoodia

添加数据字段存在多个相同的情况

wzl 1 kuukausi sitten
vanhempi
commit
1e0a576c2b

+ 3 - 1
.gitignore

@@ -9,4 +9,6 @@ build
 dist
 etl_tool.spec
 tmp_file
-/test
+/test
+.venv
+/utils/tmp_util/

+ 22 - 16
app_run.py

@@ -26,42 +26,48 @@ def run(save_db=True, run_count=1, yaml_config=None, step=0, end=999):
         return
 
     exec_process = None
-    if data['transfer_type'] in ['second', 'minute']:
-        exec_process = MinSecTrans(data=data, save_db=save_db, yaml_config=yaml_config, step=step, end=end)
+    if data["transfer_type"] in ["second", "minute"]:
+        exec_process = MinSecTrans(
+            data=data, save_db=save_db, yaml_config=yaml_config, step=step, end=end
+        )
 
-    if data['transfer_type'] in ['fault', 'warn']:
-        exec_process = FaultWarnTrans(data=data, save_db=save_db, yaml_config=yaml_config)
+    if data["transfer_type"] in ["fault", "warn"]:
+        exec_process = FaultWarnTrans(
+            data=data, save_db=save_db, yaml_config=yaml_config
+        )
 
-    if data['transfer_type'] == 'wave':
-        exec_process = WaveTrans(data['id'], data['wind_farm_code'], data['read_dir'])
+    if data["transfer_type"] == "wave":
+        exec_process = WaveTrans(data["id"], data["wind_farm_code"], data["read_dir"])
 
-    if data['transfer_type'] == 'laser':
-        exec_process = LaserTrans(data['id'], data['wind_farm_code'], data['read_dir'])
+    if data["transfer_type"] == "laser":
+        exec_process = LaserTrans(data["id"], data["wind_farm_code"], data["read_dir"])
 
     if exec_process is None:
         raise Exception("没有相应的执行器")
     exec_process.run()
 
 
-if __name__ == '__main__':
-
-    env = 'dev'
+if __name__ == "__main__":
+    env = "dev"
     if len(sys.argv) >= 2:
         env = sys.argv[1]
 
-    if env.endswith('.yaml'):
+    if env.endswith(".yaml"):
         conf_path = env
     else:
         conf_path = path.abspath(f"./conf/etl_config_{env}.yaml")
 
-    environ['ETL_CONF'] = conf_path
+    environ["ETL_CONF"] = conf_path
     yaml_config = yaml_conf(conf_path)
-    environ['env'] = env
+    environ["env"] = env
     run_count = int(read_conf(yaml_config, "run_batch_count", 1))
 
     from utils.log.trans_log import trans_print
-    from service.trans_conf_service import update_timeout_trans_data, \
-        get_now_running_count, get_batch_exec_data
+    from service.trans_conf_service import (
+        update_timeout_trans_data,
+        get_now_running_count,
+        get_batch_exec_data,
+    )
     from etl.wind_power.fault_warn.FaultWarnTrans import FaultWarnTrans
     from etl.wind_power.min_sec.MinSecTrans import MinSecTrans
     from etl.wind_power.laser.LaserTrans import LaserTrans

+ 10 - 3
conf/etl_config_dev.yaml

@@ -5,11 +5,18 @@ plt:
   port: 3306
   user: admin
 
+# trans:
+#   database: energy_data
+#   host: 192.168.50.235
+#   password: admin123456
+#   port: 30306
+#   user: root
+
 trans:
   database: energy_data
-  host: 192.168.50.235
+  host: 106.120.102.238
   password: admin123456
-  port: 30306
+  port: 10336
   user: root
 
 # 如果要放在原始路径,则配置这个 以下面的名称作为切割点,新建清理数据文件夹
@@ -18,7 +25,7 @@ etl_origin_path_contain: 收资数据
 save_path: /data/download/collection_data/tmp/dev
 
 # 日志保存路径
-log_path_dir: /data/logs/no_batch_trans
+log_path_dir: /home/wzl/logs/no_batch_trans
 
# 临时文件存放处,有些甲方公司给的tmp太小,只好自己配置
 tmp_base_path: /tmp

+ 2 - 2
etl/common/PathsAndTable.py

@@ -76,9 +76,9 @@ class PathsAndTable(object):
         if self.save_db:
             trans_print("开始创建表")
             if self.read_type in ['second', 'minute']:
-                creat_min_sec_table(self.get_table_name(), self.read_type, self.use_tidb)
+                creat_min_sec_table(self.get_table_name(), self.read_type, self.wind_farm_name, self.use_tidb)
             elif self.read_type in ['fault', 'warn']:
-                create_warn_fault_table(self.get_table_name())
+                create_warn_fault_table(self.get_table_name(), self.wind_farm_name, )
             else:
                 raise Exception("不支持的读取类型:" + self.read_type)
             trans_print("建表结束")

+ 1 - 1
etl/wind_power/fault_warn/FaultWarnTrans.py

@@ -170,7 +170,7 @@ class FaultWarnTrans(BaseDataTrans):
     def save_to_db(self):
         table_name = self.pathsAndTable.get_table_name()
         drop_table(table_name)
-        create_warn_fault_table(table_name)
+        create_warn_fault_table(table_name, self.wind_farm_name)
         save_file_to_db(table_name, self.update_files[0], self.batch_count)
 
     def update_exec_progress(self):

+ 11 - 0
etl/wind_power/min_sec/ReadAndSaveTmp.py

@@ -94,6 +94,17 @@ class ReadAndSaveTmp(object):
                                 same_col[value].append(k)
                             else:
                                 same_col[value] = [k]
+                    if v and v.find('|') > -1:
+                        vs = v.split('|')
+                        for s in vs:
+                            if s not in real_cols_trans.keys():
+                                real_cols_trans[s] = k
+                            else:
+                                value = real_cols_trans[s]
+                                if value in same_col.keys():
+                                    same_col[value].append(k)
+                                else:
+                                    same_col[value] = [k]
 
                 df.rename(columns=real_cols_trans, inplace=True)
 

+ 58 - 50
etl/wind_power/min_sec/StatisticsAndSaveTmpFormalFile.py

@@ -79,7 +79,7 @@ class StatisticsAndSaveTmpFormalFile(object):
 
         # 删除 有功功率 和 风速均为空的情况
         df.dropna(subset=['active_power', 'wind_velocity'], how='any', inplace=True)
-        trans_print(origin_wind_name, wind_col_name, "删除有功功率和风速均为空的情况后:", df.shape)
+        trans_print(origin_wind_name, wind_col_name, "删除有功功率或风速为空的情况后:", df.shape)
         df.replace(np.nan, -999999999, inplace=True)
         number_cols = df.select_dtypes(include=['number']).columns.tolist()
         for col in df.columns:
@@ -93,15 +93,16 @@ class StatisticsAndSaveTmpFormalFile(object):
 
         df.drop_duplicates(['wind_turbine_number', 'time_stamp'], keep='first', inplace=True)
 
+        df['time_stamp'] = df['time_stamp'].str.strip()
         df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
         df.dropna(subset=['time_stamp'], inplace=True)
         df.sort_values(by='time_stamp', inplace=True)
         df = df[[i for i in self.trans_param.cols_tran.keys() if i in df.columns]]
 
         # 删除每行有空值的行(2025-3-24)
-        origin_count = df.shape[0]
-        df = df.dropna()
-        trans_print(f'原始数据量:{origin_count},去除na后数据量:{df.shape[0]}')
+        # origin_count = df.shape[0]
+        # df = df.dropna()
+        # trans_print(f'原始数据量:{origin_count},去除na后数据量:{df.shape[0]}')
 
         # 如果秒级有可能合并到分钟级
         # TODO add 秒转分钟
@@ -123,61 +124,68 @@ class StatisticsAndSaveTmpFormalFile(object):
         rated_power_and_cutout_speed_tuple = read_conf(self.rated_power_and_cutout_speed_map, str(wind_col_name))
         if rated_power_and_cutout_speed_tuple is None:
             rated_power_and_cutout_speed_tuple = (None, None)
-
-        trans_print('过滤数据前数据大小', df.shape)
-        filter_valid_data = FilterValidData(df, rated_power_and_cutout_speed_tuple[0])
-        df = filter_valid_data.run()
-        trans_print('过滤数据后数据大小', df.shape)
-
-        # 如果有需要处理的,先进行代码处理,在进行打标签
-        # exec_code = get_trans_exec_code(self.paths_and_table.exec_id, self.paths_and_table.read_type)
-        # if exec_code:
-        #     if 'import ' in exec_code:
-        #         raise Exception("执行代码不支持导入包")
-        #     exec(exec_code)
-
-        if power_df.shape[0] == 0:
-            df.loc[:, 'lab'] = -1
+            trans_print(origin_wind_name, '未从平台匹配到额定功率')
         else:
-            class_identifiler = ClassIdentifier(wind_turbine_number=origin_wind_name, origin_df=df,
-                                                rated_power=rated_power_and_cutout_speed_tuple[0],
-                                                cut_out_speed=rated_power_and_cutout_speed_tuple[1])
-            df = class_identifiler.run()
+            trans_print(origin_wind_name, '过滤数据前数据大小', df.shape)
+            trans_print(origin_wind_name, '额定功率', rated_power_and_cutout_speed_tuple[0])
+            # trans_print(origin_wind_name, '\n', df.head(10))
+            filter_valid_data = FilterValidData(df, rated_power_and_cutout_speed_tuple[0])
+            try:
+                df = filter_valid_data.run()
+            except:
+                trans_print(origin_wind_name, '过滤数据异常', filename)
+                raise
+            trans_print(origin_wind_name, '过滤数据后数据大小', df.shape)
+
+            # 如果有需要处理的,先进行代码处理,在进行打标签
+            # exec_code = get_trans_exec_code(self.paths_and_table.exec_id, self.paths_and_table.read_type)
+            # if exec_code:
+            #     if 'import ' in exec_code:
+            #         raise Exception("执行代码不支持导入包")
+            #     exec(exec_code)
+
+            if power_df.shape[0] == 0:
+                df.loc[:, 'lab'] = -1
+            else:
+                class_identifiler = ClassIdentifier(wind_turbine_number=origin_wind_name, origin_df=df,
+                                                    rated_power=rated_power_and_cutout_speed_tuple[0],
+                                                    cut_out_speed=rated_power_and_cutout_speed_tuple[1])
+                df = class_identifiler.run()
 
-        del power_df
+            del power_df
 
-        df['year'] = df['time_stamp'].dt.year
-        df['month'] = df['time_stamp'].dt.month
-        df['day'] = df['time_stamp'].dt.day
-        df['time_stamp'] = df['time_stamp'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
+            df['year'] = df['time_stamp'].dt.year
+            df['month'] = df['time_stamp'].dt.month
+            df['day'] = df['time_stamp'].dt.day
+            df['time_stamp'] = df['time_stamp'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
 
-        df['wind_turbine_name'] = str(origin_wind_name)
-        df['year_month'] = df[['year', 'month']].apply(lambda x: str(x['year']) + str(x['month']).zfill(2), axis=1)
-        cols = df.columns
+            df['wind_turbine_name'] = str(origin_wind_name)
+            df['year_month'] = df[['year', 'month']].apply(lambda x: str(x['year']) + str(x['month']).zfill(2), axis=1)
+            cols = df.columns
 
-        if self.paths_and_table.read_type == 'second':
-            type_col = 'year_month'
-        else:
-            type_col = 'year'
-
-        date_strs = df[type_col].unique().tolist()
-        for date_str in date_strs:
-            save_path = path.join(self.paths_and_table.get_tmp_formal_path(), str(date_str),
-                                  str(origin_wind_name) + '.csv')
-            create_file_path(save_path, is_file_path=True)
-            now_df = df[df[type_col] == date_str][cols]
-            if self.paths_and_table.save_zip:
-                save_path = save_path + '.gz'
-                now_df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8')
+            if self.paths_and_table.read_type == 'second':
+                type_col = 'year_month'
             else:
-                now_df.to_csv(save_path, index=False, encoding='utf-8')
+                type_col = 'year'
+
+            date_strs = df[type_col].unique().tolist()
+            for date_str in date_strs:
+                save_path = path.join(self.paths_and_table.get_tmp_formal_path(), str(date_str),
+                                      str(origin_wind_name) + '.csv')
+                create_file_path(save_path, is_file_path=True)
+                now_df = df[df[type_col] == date_str][cols]
+                if self.paths_and_table.save_zip:
+                    save_path = save_path + '.gz'
+                    now_df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8')
+                else:
+                    now_df.to_csv(save_path, index=False, encoding='utf-8')
 
-            del now_df
+                del now_df
 
-        self.set_statistics_data(df)
+            self.set_statistics_data(df)
 
-        del df
-        trans_print("保存" + str(wind_col_name) + "成功")
+            del df
+            trans_print("保存" + str(wind_col_name) + "成功")
 
     def mutiprocessing_to_save_file(self):
         # 开始保存到正式文件

+ 4 - 4
service/trans_service.py

@@ -58,7 +58,7 @@ def get_wave_conf(field_code) -> dict:
     return res[0]
 
 
-def creat_min_sec_table(table_name, trans_type, use_tidb=False):
+def creat_min_sec_table(table_name, trans_type, wind_farm_name='', use_tidb=False):
     exists_table_sql = f"""
     select count(1) as count from information_schema.tables where table_schema = '{trans.database}' and table_name = '{table_name}'
     """
@@ -140,7 +140,7 @@ def creat_min_sec_table(table_name, trans_type, use_tidb=False):
              KEY `time_stamp` (`time_stamp`),
              KEY `wind_turbine_number` (`wind_turbine_number`),
              {add_key}
-        ) 
+        ) COMMENT='{wind_farm_name}'
         """
         # if not use_tidb:
         create_sql = create_sql + f"""
@@ -255,7 +255,7 @@ def batch_statistics(table_name):
         return None
 
 
-def create_warn_fault_table(table_name):
+def create_warn_fault_table(table_name, wind_farm_name=''):
     sql = f"""
     CREATE TABLE `{table_name}` (
       `wind_turbine_number` varchar(20) DEFAULT NULL COMMENT '风机编号',
@@ -272,7 +272,7 @@ def create_warn_fault_table(table_name):
       KEY `wind_turbine_number` (`wind_turbine_number`),
       KEY `begin_time` (`begin_time`),
       KEY `end_time` (`end_time`)
-    ) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4
+    ) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4 COMMENT='{wind_farm_name}'
     """
 
     trans.execute(sql)