ReadAndSaveDb.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. import datetime
  2. import multiprocessing
  3. import traceback
  4. from concurrent.futures import ThreadPoolExecutor
  5. import pandas as pd
  6. from data.ClassIdentifier import ClassIdentifier
  7. from data.WindFarmDayCount import WindFarmDayCount
  8. from service import trans_service
  9. from service.plt_service import get_wind_info
  10. from utils.conf.read_conf import read_conf
  11. from utils.log.trans_log import logger
  12. class ReadAndSaveDb(object):
  13. def __init__(self):
  14. self.yesterday_tables: list[WindFarmDayCount] = self.get_yesterday_tables()
  15. def get_yesterday_tables(self):
  16. yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
  17. date_str = yesterday.strftime('%Y-%m-%d')
  18. all_datas = trans_service.get_yesterday_tables(date_str)
  19. if isinstance(all_datas, tuple()):
  20. return []
  21. tables = list()
  22. for data in all_datas:
  23. tables.append(WindFarmDayCount(data))
  24. return tables
  25. def process_single_turbine(self, df, wind_turbine_number, rated_power_and_cutout_speed_map):
  26. df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
  27. df.dropna(subset=['time_stamp'], inplace=True)
  28. df.sort_values(by='time_stamp', inplace=True)
  29. logger.info(f"有功功率前10个 :{df.head(10)['active_power'].values}")
  30. power_df = df[df['active_power'] > 0]
  31. logger.info(f"{wind_turbine_number} 功率大于0的数量:{power_df.shape}")
  32. power = power_df.sample(int(power_df.shape[0] / 100))['active_power'].median()
  33. logger.info(f"{wind_turbine_number} 有功功率,中位数:{power}")
  34. if power > 100000:
  35. df['active_power'] = df['active_power'] / 1000
  36. rated_power_and_cutout_speed_tuple = read_conf(rated_power_and_cutout_speed_map, str(wind_turbine_number), None)
  37. if rated_power_and_cutout_speed_tuple is None:
  38. rated_power_and_cutout_speed_tuple = (None, None)
  39. if power_df.shape[0] == 0:
  40. df.loc[:, 'lab'] = -1
  41. else:
  42. class_identifiler = ClassIdentifier(wind_turbine_number=wind_turbine_number, origin_df=df,
  43. rated_power=rated_power_and_cutout_speed_tuple[0],
  44. cut_out_speed=rated_power_and_cutout_speed_tuple[1])
  45. df = class_identifiler.run()
  46. del power_df
  47. if not df.empty:
  48. df['year'] = df['time_stamp'].dt.year
  49. df['month'] = df['time_stamp'].dt.month
  50. df['day'] = df['time_stamp'].dt.day
  51. df['time_stamp'] = df['time_stamp'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
  52. df['year_month'] = df[['year', 'month']].apply(lambda x: str(x['year']) + str(x['month']).zfill(2), axis=1)
  53. return df
  54. return df
  55. def read_and_save_db(self, windFarmDayCount: WindFarmDayCount):
  56. logger.info(f"开始执行:{str(windFarmDayCount)}")
  57. try:
  58. table_name = f"{windFarmDayCount.wind_farm_code}_{windFarmDayCount.type}"
  59. table_name_tmp = table_name + f"_{str(windFarmDayCount.add_date).replace('-', '_')}_tmp"
  60. wind_col_trans, rated_power_and_cutout_speed_map = get_wind_info(windFarmDayCount.wind_farm_code)
  61. df = trans_service.read_data_from_table(table_name_tmp)
  62. df['wind_turbine_name'] = df['wind_turbine_name'].astype('str')
  63. df['wind_turbine_number'] = df['wind_turbine_name'].map(wind_col_trans).fillna(df['wind_turbine_name'])
  64. wind_turbine_numbers = df['wind_turbine_number'].unique()
  65. dfs = []
  66. with ThreadPoolExecutor(max_workers=5, thread_name_prefix=windFarmDayCount.wind_farm_name) as executor:
  67. for wind_turbine_number in wind_turbine_numbers:
  68. dfs.append(
  69. executor.submit(self.process_single_turbine,
  70. df[df['wind_turbine_number'] == wind_turbine_number],
  71. wind_turbine_number, rated_power_and_cutout_speed_map).result())
  72. result_df = pd.concat(dfs)
  73. trans_service.load_data_local(table_name, result_df)
  74. trans_service.drop_table(table_name_tmp)
  75. trans_service.update_sync(windFarmDayCount.id)
  76. logger.info(f"{str(windFarmDayCount)}保存成功")
  77. except:
  78. logger.info(f"{str(windFarmDayCount)}出现错误")
  79. logger.info(traceback.format_exc())
  80. # raise Exception(f"{str(windFarmDayCount)}出现错误")
  81. def run(self):
  82. with multiprocessing.Pool(2) as pool:
  83. pool.map(self.read_and_save_db, self.yesterday_tables)