123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- import datetime
- import multiprocessing
- import traceback
- from concurrent.futures import ThreadPoolExecutor
- import pandas as pd
- from data.ClassIdentifier import ClassIdentifier
- from data.WindFarmDayCount import WindFarmDayCount
- from service import trans_service
- from service.plt_service import get_wind_info
- from utils.conf.read_conf import read_conf
- from utils.log.trans_log import logger
class ReadAndSaveDb(object):
    """Process yesterday's per-farm temporary tables and load them into the final tables.

    For every ``WindFarmDayCount`` reported by ``trans_service`` for yesterday,
    the matching ``*_tmp`` table is read, each turbine's rows are cleaned and
    labelled, and the combined result is written to the farm's table.
    """

    def __init__(self):
        self.yesterday_tables: list[WindFarmDayCount] = self.get_yesterday_tables()

    def get_yesterday_tables(self):
        """Return a ``WindFarmDayCount`` for each row ``trans_service`` lists for yesterday.

        Returns:
            A list of ``WindFarmDayCount`` objects; empty when the service
            returns ``None`` or no rows.
        """
        yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
        date_str = yesterday.strftime('%Y-%m-%d')
        all_datas = trans_service.get_yesterday_tables(date_str)
        # The previous guard, isinstance(all_datas, tuple()), compared against
        # an empty tuple of types and was therefore always False. An empty
        # result already yields [] below, so only None needs a guard.
        if all_datas is None:
            return []
        return [WindFarmDayCount(data) for data in all_datas]

    def process_single_turbine(self, df, wind_turbine_number, rated_power_and_cutout_speed_map):
        """Clean, label and decorate the rows of a single turbine.

        Args:
            df: Rows of one turbine; must contain ``time_stamp`` and
                ``active_power``. It is modified in place, so callers should
                pass an independent copy.
            wind_turbine_number: Identifier of the turbine; its string form is
                the lookup key into ``rated_power_and_cutout_speed_map``.
            rated_power_and_cutout_speed_map: Mapping read through
                ``read_conf``; each value is indexed as
                ``(rated_power, cut_out_speed)``.

        Returns:
            The processed DataFrame. When it is non-empty it gains ``year``,
            ``month``, ``day`` and ``year_month`` columns, and ``time_stamp``
            is converted to ``'%Y-%m-%d %H:%M:%S'`` strings.
        """
        # Unparseable timestamps become NaT and are dropped before sorting.
        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
        df.dropna(subset=['time_stamp'], inplace=True)
        df.sort_values(by='time_stamp', inplace=True)
        logger.info(f"有功功率前10个 :{df.head(10)['active_power'].values}")

        power_df = df[df['active_power'] > 0]
        logger.info(f"{wind_turbine_number} 功率大于0的数量:{power_df.shape}")
        positive_count = power_df.shape[0]

        # Estimate the typical power from a 1% random sample. With fewer than
        # 100 positive rows that sample would be empty (median NaN, so the
        # rescale check below could never fire); use every positive row instead.
        sample_size = positive_count // 100
        if sample_size > 0:
            sampled_power = power_df.sample(sample_size)['active_power']
        else:
            sampled_power = power_df['active_power']
        power = sampled_power.median()
        del power_df
        logger.info(f"{wind_turbine_number} 有功功率,中位数:{power}")

        # NOTE(review): a median above 100000 is taken to mean the values are
        # 1000x too large (presumably W instead of kW) - confirm the unit.
        if power > 100000:
            df['active_power'] = df['active_power'] / 1000

        rated_power_and_cutout_speed_tuple = read_conf(
            rated_power_and_cutout_speed_map, str(wind_turbine_number), None)
        if rated_power_and_cutout_speed_tuple is None:
            rated_power_and_cutout_speed_tuple = (None, None)

        if positive_count == 0:
            # No positive-power rows: label every row -1 and skip ClassIdentifier.
            df.loc[:, 'lab'] = -1
        else:
            class_identifier = ClassIdentifier(
                wind_turbine_number=wind_turbine_number,
                origin_df=df,
                rated_power=rated_power_and_cutout_speed_tuple[0],
                cut_out_speed=rated_power_and_cutout_speed_tuple[1])
            df = class_identifier.run()

        if not df.empty:
            timestamps = df['time_stamp']
            df['year'] = timestamps.dt.year
            df['month'] = timestamps.dt.month
            df['day'] = timestamps.dt.day
            # Vectorised formatting instead of per-row Python lambdas.
            df['time_stamp'] = timestamps.dt.strftime('%Y-%m-%d %H:%M:%S')
            df['year_month'] = df['year'].astype(str) + df['month'].astype(str).str.zfill(2)
        return df

    def read_and_save_db(self, windFarmDayCount: WindFarmDayCount):
        """Process one farm/day temporary table and persist the result.

        Reads ``<wind_farm_code>_<type>_<add_date>_tmp``, processes every
        turbine, loads the concatenated result into ``<wind_farm_code>_<type>``,
        drops the temporary table and marks the record as synced.

        Any ``Exception`` is logged together with its traceback and is not
        re-raised.

        Args:
            windFarmDayCount: Record describing which farm/type/date to process.
        """
        logger.info(f"开始执行:{str(windFarmDayCount)}")
        try:
            table_name = f"{windFarmDayCount.wind_farm_code}_{windFarmDayCount.type}"
            table_name_tmp = table_name + f"_{str(windFarmDayCount.add_date).replace('-', '_')}_tmp"
            wind_col_trans, rated_power_and_cutout_speed_map = get_wind_info(windFarmDayCount.wind_farm_code)

            df = trans_service.read_data_from_table(table_name_tmp)
            df['wind_turbine_name'] = df['wind_turbine_name'].astype('str')
            # Names without a mapping entry keep their original name as the number.
            df['wind_turbine_number'] = df['wind_turbine_name'].map(wind_col_trans).fillna(df['wind_turbine_name'])
            wind_turbine_numbers = df['wind_turbine_number'].unique()

            with ThreadPoolExecutor(max_workers=5,
                                    thread_name_prefix=windFarmDayCount.wind_farm_name) as executor:
                # Submit every turbine before waiting on any result; calling
                # .result() right after submit() would serialise the pool.
                # Each task gets its own copy because process_single_turbine
                # mutates its frame in place.
                futures = [
                    executor.submit(self.process_single_turbine,
                                    df[df['wind_turbine_number'] == wind_turbine_number].copy(),
                                    wind_turbine_number,
                                    rated_power_and_cutout_speed_map)
                    for wind_turbine_number in wind_turbine_numbers
                ]
                # Collect in submission order so the concatenation order is stable.
                dfs = [future.result() for future in futures]

            result_df = pd.concat(dfs)
            trans_service.load_data_local(table_name, result_df)
            trans_service.drop_table(table_name_tmp)
            trans_service.update_sync(windFarmDayCount.id)
            logger.info(f"{str(windFarmDayCount)}保存成功")
        except Exception:
            # Deliberately best-effort: the failure is logged and not re-raised.
            # Only Exception is caught so KeyboardInterrupt/SystemExit still
            # propagate.
            logger.info(f"{str(windFarmDayCount)}出现错误")
            logger.info(traceback.format_exc())

    def run(self):
        """Process all of yesterday's tables using a pool of two worker processes."""
        if not self.yesterday_tables:
            # Nothing to do; avoid starting worker processes for an empty batch.
            return
        with multiprocessing.Pool(2) as pool:
            pool.map(self.read_and_save_db, self.yesterday_tables)
|