import datetime
import multiprocessing
import traceback
from concurrent.futures import ThreadPoolExecutor

import pandas as pd

from data.ClassIdentifier import ClassIdentifier
from data.WindFarmDayCount import WindFarmDayCount
from service import trans_service
from service.plt_service import get_wind_info
from utils.conf.read_conf import read_conf
from utils.log.trans_log import logger


class ReadAndSaveDb(object):
    """Pull yesterday's per-wind-farm staging tables, clean/label each
    turbine's data, and persist the result to the permanent tables.

    Work is fanned out across farms with a process pool (``run``) and across
    turbines within a farm with a thread pool (``read_and_save_db``).
    """

    def __init__(self):
        # Staging tables produced yesterday that still need processing.
        self.yesterday_tables: list[WindFarmDayCount] = self.get_yesterday_tables()

    def get_yesterday_tables(self):
        """Return yesterday's pending tables as ``WindFarmDayCount`` objects.

        Returns:
            list[WindFarmDayCount]: one entry per pending table; empty when
            the service reports no rows.
        """
        yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
        date_str = yesterday.strftime('%Y-%m-%d')
        all_datas = trans_service.get_yesterday_tables(date_str)
        # BUG FIX: the original tested `isinstance(all_datas, tuple())`.
        # `tuple()` is an *empty tuple instance*, so the isinstance check was
        # always False and the empty-result short-circuit never fired.
        # The apparent intent is "no rows -> nothing to do".
        if not all_datas:
            return []
        return [WindFarmDayCount(data) for data in all_datas]

    def process_single_turbine(self, df, wind_turbine_number, rated_power_and_cutout_speed_map):
        """Clean, unit-normalize and label one turbine's DataFrame.

        Args:
            df: rows for a single turbine; must contain ``time_stamp`` and
                ``active_power`` columns.
            wind_turbine_number: turbine identifier (used for config lookup
                and logging).
            rated_power_and_cutout_speed_map: config mapping turbine number
                -> (rated_power, cut_out_speed); missing entries fall back
                to (None, None).

        Returns:
            The processed DataFrame with ``lab``, ``year``, ``month``,
            ``day`` and ``year_month`` columns added (unchanged if empty
            after timestamp cleaning).
        """
        # Drop rows whose timestamp cannot be parsed, then order by time.
        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
        df.dropna(subset=['time_stamp'], inplace=True)
        df.sort_values(by='time_stamp', inplace=True)
        logger.info(f"有功功率前10个 :{df.head(10)['active_power'].values}")
        power_df = df[df['active_power'] > 0]
        logger.info(f"{wind_turbine_number} 功率大于0的数量:{power_df.shape}")

        if not power_df.empty:
            # BUG FIX: the original sampled int(n / 100) rows, which is 0 for
            # fewer than 100 rows -> median NaN -> the W->kW rescale below was
            # silently skipped. Sample at least one row, and only when there
            # is positive-power data at all.
            sample_size = max(1, power_df.shape[0] // 100)
            power = power_df.sample(sample_size)['active_power'].median()
            logger.info(f"{wind_turbine_number} 有功功率,中位数:{power}")
            # Median far above any plausible kW rating: values are presumably
            # in W, convert to kW. TODO(review): confirm source units.
            if power > 100000:
                df['active_power'] = df['active_power'] / 1000

        rated_power_and_cutout_speed_tuple = read_conf(
            rated_power_and_cutout_speed_map, str(wind_turbine_number), None)
        if rated_power_and_cutout_speed_tuple is None:
            rated_power_and_cutout_speed_tuple = (None, None)

        if power_df.shape[0] == 0:
            # No positive-power data: mark every row as unclassifiable.
            df.loc[:, 'lab'] = -1
        else:
            class_identifiler = ClassIdentifier(
                wind_turbine_number=wind_turbine_number,
                origin_df=df,
                rated_power=rated_power_and_cutout_speed_tuple[0],
                cut_out_speed=rated_power_and_cutout_speed_tuple[1])
            df = class_identifiler.run()
        del power_df

        if not df.empty:
            # Derive calendar partition columns, then freeze the timestamp
            # into its canonical string form for storage.
            df['year'] = df['time_stamp'].dt.year
            df['month'] = df['time_stamp'].dt.month
            df['day'] = df['time_stamp'].dt.day
            df['time_stamp'] = df['time_stamp'].apply(
                lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
            df['year_month'] = df[['year', 'month']].apply(
                lambda x: str(x['year']) + str(x['month']).zfill(2), axis=1)
        return df

    def read_and_save_db(self, windFarmDayCount: WindFarmDayCount):
        """Process one farm's tmp table and load the result into the
        permanent table; on success the tmp table is dropped and the sync
        record updated. Errors are logged, never raised (best-effort batch).
        """
        logger.info(f"开始执行:{str(windFarmDayCount)}")
        try:
            table_name = f"{windFarmDayCount.wind_farm_code}_{windFarmDayCount.type}"
            table_name_tmp = table_name + f"_{str(windFarmDayCount.add_date).replace('-', '_')}_tmp"
            wind_col_trans, rated_power_and_cutout_speed_map = get_wind_info(
                windFarmDayCount.wind_farm_code)
            df = trans_service.read_data_from_table(table_name_tmp)
            # Map turbine display names to canonical numbers; unknown names
            # fall back to the raw name.
            df['wind_turbine_name'] = df['wind_turbine_name'].astype('str')
            df['wind_turbine_number'] = df['wind_turbine_name'].map(
                wind_col_trans).fillna(df['wind_turbine_name'])
            wind_turbine_numbers = df['wind_turbine_number'].unique()

            with ThreadPoolExecutor(max_workers=5,
                                    thread_name_prefix=windFarmDayCount.wind_farm_name) as executor:
                # BUG FIX: the original called .result() immediately after
                # each submit, which serialized the turbines and made the
                # pool useless. Submit everything first, then collect.
                futures = [
                    executor.submit(self.process_single_turbine,
                                    df[df['wind_turbine_number'] == wind_turbine_number],
                                    wind_turbine_number,
                                    rated_power_and_cutout_speed_map)
                    for wind_turbine_number in wind_turbine_numbers
                ]
                dfs = [future.result() for future in futures]

            result_df = pd.concat(dfs)
            trans_service.load_data_local(table_name, result_df)
            trans_service.drop_table(table_name_tmp)
            trans_service.update_sync(windFarmDayCount.id)
            logger.info(f"{str(windFarmDayCount)}保存成功")
        except Exception:
            # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt
            # still propagate; the batch is best-effort, so log and continue.
            logger.info(f"{str(windFarmDayCount)}出现错误")
            logger.info(traceback.format_exc())
            # raise Exception(f"{str(windFarmDayCount)}出现错误")

    def run(self):
        """Process all pending farms, two at a time, in worker processes."""
        with multiprocessing.Pool(2) as pool:
            pool.map(self.read_and_save_db, self.yesterday_tables)