- """
- 读取104规约返回的数据,并标准化到临时表中
- """
- import multiprocessing
- import os.path
- import time
- import traceback
- import warnings
- from datetime import datetime
- import pandas as pd
- from service import plt_service, trans_service
- from utils.conf.read_conf import yaml_conf, read_conf
- from utils.log.trans_log import logger
- warnings.filterwarnings('ignore')
def generate_mesurepoint_maps(file_path):
    """Build the measurement-point lookup from the Excel point table.

    Each row of the sheet maps one measurement point of one turbine of one
    station to its standardized English column name.

    Args:
        file_path: path to the point-table Excel file; must contain the
            columns '顺序号' (sequence number), '场站标准化编号' (station code),
            '风机号' (turbine number) and '标准化英文' (standardized English name).

    Returns:
        Nested dict ``{station_code: {turbine_no: {seq_no + 1: en_name}}}``.
        The +1 offset matches the raw CSV layout, where column 0 is the
        timestamp and data columns start at 1.
    """
    df = pd.read_excel(file_path)
    wind_maps = dict()
    for _, data in df.iterrows():
        shunxuhao = int(data['顺序号']) + 1
        changzhan = data['场站标准化编号']
        wind_no = data['风机号']
        en_name = data['标准化英文']
        # setdefault builds the missing intermediate dicts in one pass,
        # replacing the original three-level `if key in dict.keys()` ladder.
        wind_maps.setdefault(changzhan, {}).setdefault(wind_no, {})[shunxuhao] = en_name
    return wind_maps
def generate_scada_data(df, mesurepoint_maps, changzhan, wind_code_name_map):
    """Normalize one raw 104-protocol frame for a single station and persist it.

    For every turbine of station ``changzhan``, the relevant columns of ``df``
    are selected and renamed to their standardized English names (second-level
    data), then mean-aggregated into minute-level data. Both DataFrames are
    written to the station's per-day temporary tables and the per-day
    ingestion counters are updated.

    Args:
        df: raw frame read with ``header=None``; column ``0`` holds the
            timestamp (assumed to parse with ``pd.to_datetime`` — TODO confirm)
            and the remaining integer columns are measurement points addressed
            by sequence number.
        mesurepoint_maps: ``{station: {turbine_no: {seq_no: en_name}}}`` as
            built by ``generate_mesurepoint_maps``.
        changzhan: station code; must be a key of ``mesurepoint_maps``.
        wind_code_name_map: station code -> station display name.
    """
    second_dfs = list()
    minute_dfs = list()
    # Turbine name -> standardized turbine number for this station.
    wind_maps, _ = plt_service.get_wind_info(changzhan, False)
    for wind_no in mesurepoint_maps[changzhan].keys():
        shunxuhao_map = mesurepoint_maps[changzhan][wind_no]
        # df's columns are integer sequence numbers; pick this turbine's.
        # NOTE(review): this slice is a view of df, so the column assignments
        # below can raise SettingWithCopyWarning (hidden by the module-level
        # warnings filter) — an explicit .copy() would be safer.
        second_df = df[list(shunxuhao_map.keys())]
        second_df['wind_turbine_name'] = wind_no
        second_df['time_stamp'] = df[0]
        second_df['time_stamp'] = pd.to_datetime(second_df['time_stamp'])
        second_df.rename(columns=shunxuhao_map, inplace=True)
        second_dfs.append(second_df)
        minute_df = second_df.copy(deep=True)
        # NOTE(review): every row is stamped with the minute of the EARLIEST
        # timestamp in the frame, so the groupby below collapses the whole
        # frame into a single minute bucket per turbine — confirm each input
        # file is expected to span at most one minute.
        minute_df['time_stamp'] = minute_df['time_stamp'].min().strftime('%Y-%m-%d %H:%M:00')
        minute_df = minute_df.groupby(['wind_turbine_name', 'time_stamp']).mean(numeric_only=True).reset_index()
        minute_dfs.append(minute_df)
    changzhan_second_df = pd.concat(second_dfs, ignore_index=True)
    changzhan_minute_df = pd.concat(minute_dfs, ignore_index=True)
    # Attach the standardized turbine number alongside the turbine name.
    changzhan_second_df['wind_turbine_name'] = changzhan_second_df['wind_turbine_name'].astype(str)
    changzhan_second_df['wind_turbine_number'] = changzhan_second_df['wind_turbine_name'].map(wind_maps)
    changzhan_minute_df['wind_turbine_name'] = changzhan_minute_df['wind_turbine_name'].astype(str)
    changzhan_minute_df['wind_turbine_number'] = changzhan_minute_df['wind_turbine_name'].map(wind_maps)
    # Persist into the per-day temporary tables (created beforehand by add_table).
    date_str = datetime.now().strftime('%Y_%m_%d')
    second_table_name = f'{changzhan}_second_{date_str}_tmp'
    trans_service.save_df_to_db(second_table_name, changzhan_second_df)
    minute_table_name = f'{changzhan}_minute_{date_str}_tmp'
    trans_service.save_df_to_db(minute_table_name, changzhan_minute_df)
    # Latest data timestamps drive the per-day ingestion counters below.
    changzhan_minute_df['time_stamp'] = pd.to_datetime(changzhan_minute_df['time_stamp'], errors='coerce')
    minute_max_date = changzhan_minute_df['time_stamp'].max()
    second_max_date = changzhan_second_df['time_stamp'].max()
    add_date_str = minute_max_date.strftime('%Y-%m-%d')
    minunte_last_date_str = minute_max_date
    second_last_date_str = second_max_date.strftime('%Y-%m-%d %H:%M:%S')
    # Argument order: wind_farm_code, wind_farm_name, add_date, trans_type, count, latest_data_time
    trans_service.update_wind_farm_day_count(changzhan, wind_code_name_map.get(changzhan, ''), add_date_str,
                                             'minute', changzhan_minute_df.shape[0], minunte_last_date_str)
    trans_service.update_wind_farm_day_count(changzhan, wind_code_name_map.get(changzhan, ''), add_date_str,
                                             'second', changzhan_second_df.shape[0], second_last_date_str)
def add_table(changzhan_names):
    """Ensure the per-day temporary minute/second tables exist for each station.

    Args:
        changzhan_names: iterable of station codes.

    NOTE: relies on the module-level ``date_str`` assigned in the
    ``__main__`` block; the table names mirror those written by
    ``generate_scada_data``.
    """
    granularities = ['minute', 'second']
    for changzhan_name in changzhan_names:
        # Renamed loop variable: the original shadowed the builtin `type`.
        for granularity in granularities:
            table_name = f'{changzhan_name}_{granularity}_{date_str}_tmp'
            if not trans_service.boolean_table_exists(table_name):
                trans_service.create_tmp_table(table_name)
if __name__ == '__main__':
    # Grace period so upstream collectors can finish writing the current files.
    time.sleep(60)
    total_begin = time.time()
    date_str = datetime.now().strftime('%Y_%m_%d')
    # Station code -> display name for the whole company.
    wind_code_name_map = plt_service.get_all_wind_by_company_code('COM00002')
    conf_path = os.path.abspath(f"./conf/config.yaml")
    yaml_config = yaml_conf(conf_path)
    data_base_dir = read_conf(yaml_config, 'data_base_dir')
    read_dirs = [os.path.join(data_base_dir, '2404'), os.path.join(data_base_dir, '2405')]
    for read_dir in read_dirs:
        dir_time = time.time()
        for root, dirs, files in os.walk(read_dir):
            for file in files:
                try:
                    # The point-table Excel is keyed by the immediate directory name.
                    file_dir = os.path.basename(root)
                    read_csv = root + os.sep + file
                    df = pd.read_csv(read_csv, header=None)
                    mesurepoint_maps = generate_mesurepoint_maps(f'conf/测点表-{file_dir}.xlsx')
                    add_table(mesurepoint_maps.keys())
                    # One worker per station; each normalizes and persists its slice.
                    with multiprocessing.Pool(5) as pool:
                        pool.starmap(generate_scada_data,
                                     [(df, mesurepoint_maps, changzhan, wind_code_name_map) for changzhan in
                                      mesurepoint_maps.keys()])
                    # Source file is consumed: remove it so reruns don't re-ingest.
                    os.remove(read_csv)
                except Exception:
                    # Best-effort per file: log and continue with the next one.
                    # (Was a bare `except:`, which also swallowed KeyboardInterrupt.)
                    logger.error(traceback.format_exc())
                logger.info(f'SCADA执行完:{file_dir}/{file}耗时:{time.time() - dir_time}')
        logger.info(f'{read_dir}执行总耗时:{time.time() - dir_time}')
    logger.info(f'SCADA执行总耗时:{time.time() - total_begin}')
|