"""
Read data returned over the 104 protocol (IEC 60870-5-104) and standardize it
into temporary tables.
"""
import multiprocessing
import os.path
import time
import traceback
import warnings
from datetime import datetime

import pandas as pd

from service import plt_service, trans_service
from utils.conf.read_conf import yaml_conf, read_conf
from utils.log.trans_log import logger

warnings.filterwarnings('ignore')

# strftime format of the date suffix used in temporary table names.
TABLE_DATE_FORMAT = '%Y_%m_%d'


def _today_table_date():
    """Return today's date formatted as a temporary-table name suffix."""
    return datetime.now().strftime(TABLE_DATE_FORMAT)


def generate_mesurepoint_maps(file_path):
    """Build the measure-point mapping from an Excel measure-point sheet.

    The sheet must contain the columns '顺序号' (sequence number),
    '场站标准化编号' (standardized wind farm code), '风机号' (turbine number)
    and '标准化英文' (standardized English column name).

    :param file_path: path of the Excel measure-point sheet.
    :return: nested dict ``{farm_code: {turbine_no: {csv_column: en_name}}}``.
        ``csv_column`` is the sequence number + 1, because column 0 of the raw
        CSV holds the timestamp (see ``generate_scada_data``).
    """
    df = pd.read_excel(file_path)
    wind_maps = {}
    for _, row in df.iterrows():
        csv_column = int(row['顺序号']) + 1
        changzhan = row['场站标准化编号']
        wind_no = row['风机号']
        en_name = row['标准化英文']
        wind_maps.setdefault(changzhan, {}).setdefault(wind_no, {})[csv_column] = en_name
    return wind_maps


def _build_turbine_frames(df, shunxuhao_map, wind_no):
    """Return (second-level rows, one-row-per-minute mean) for one turbine."""
    # Explicit copy: we add/rename columns, which must not touch the shared df.
    second_df = df[list(shunxuhao_map.keys())].copy()
    second_df['wind_turbine_name'] = wind_no
    second_df['time_stamp'] = pd.to_datetime(df[0])
    second_df = second_df.rename(columns=shunxuhao_map)

    minute_df = second_df.copy(deep=True)
    # All rows are stamped with the minute of the earliest sample in the file,
    # so each turbine yields exactly one averaged row per input file.
    minute_df['time_stamp'] = minute_df['time_stamp'].min().strftime('%Y-%m-%d %H:%M:00')
    minute_df = (
        minute_df
        .groupby(['wind_turbine_name', 'time_stamp'])
        .mean(numeric_only=True)
        .reset_index()
    )
    return second_df, minute_df


def generate_scada_data(df, mesurepoint_maps, changzhan, wind_code_name_map, date_str=None):
    """Standardize one wind farm's columns of a raw CSV and save them.

    Writes second-level rows to ``{changzhan}_second_{date_str}_tmp`` and
    per-turbine minute means to ``{changzhan}_minute_{date_str}_tmp``, then
    updates the farm's daily row counts via ``trans_service``.

    :param df: raw CSV frame (``header=None``); column 0 is the timestamp.
    :param mesurepoint_maps: mapping from ``generate_mesurepoint_maps``.
    :param changzhan: wind farm code to process (a key of ``mesurepoint_maps``).
    :param wind_code_name_map: farm code -> farm name ('' when missing).
    :param date_str: table date suffix (``YYYY_MM_DD``); defaults to today.
        Pass the same value given to ``add_table`` so both agree on the table.
    """
    if date_str is None:
        date_str = _today_table_date()
    wind_maps, _ = plt_service.get_wind_info(changzhan, False)

    second_dfs = []
    minute_dfs = []
    for wind_no, shunxuhao_map in mesurepoint_maps[changzhan].items():
        second_df, minute_df = _build_turbine_frames(df, shunxuhao_map, wind_no)
        second_dfs.append(second_df)
        minute_dfs.append(minute_df)

    changzhan_second_df = pd.concat(second_dfs, ignore_index=True)
    changzhan_minute_df = pd.concat(minute_dfs, ignore_index=True)
    for frame in (changzhan_second_df, changzhan_minute_df):
        frame['wind_turbine_name'] = frame['wind_turbine_name'].astype(str)
        frame['wind_turbine_number'] = frame['wind_turbine_name'].map(wind_maps)

    trans_service.save_df_to_db(f'{changzhan}_second_{date_str}_tmp', changzhan_second_df)
    trans_service.save_df_to_db(f'{changzhan}_minute_{date_str}_tmp', changzhan_minute_df)

    changzhan_minute_df['time_stamp'] = pd.to_datetime(changzhan_minute_df['time_stamp'], errors='coerce')
    minute_max_date = changzhan_minute_df['time_stamp'].max()
    second_max_date = changzhan_second_df['time_stamp'].max()
    add_date_str = minute_max_date.strftime('%Y-%m-%d')
    # Both latest-data times are passed as the same string format.
    minute_last_date_str = minute_max_date.strftime('%Y-%m-%d %H:%M:%S')
    second_last_date_str = second_max_date.strftime('%Y-%m-%d %H:%M:%S')
    wind_farm_name = wind_code_name_map.get(changzhan, '')
    # Arguments: wind_farm_code, wind_farm_name, add_date, trans_type, count, latest_data_time
    trans_service.update_wind_farm_day_count(changzhan, wind_farm_name, add_date_str, 'minute',
                                             changzhan_minute_df.shape[0], minute_last_date_str)
    trans_service.update_wind_farm_day_count(changzhan, wind_farm_name, add_date_str, 'second',
                                             changzhan_second_df.shape[0], second_last_date_str)


def add_table(changzhan_names, date_str=None):
    """Create any missing minute/second temporary tables for the given farms.

    :param changzhan_names: iterable of wind farm codes.
    :param date_str: table date suffix (``YYYY_MM_DD``); defaults to today.
    """
    if date_str is None:
        date_str = _today_table_date()
    for changzhan_name in changzhan_names:
        for trans_type in ('minute', 'second'):
            table_name = f'{changzhan_name}_{trans_type}_{date_str}_tmp'
            if not trans_service.boolean_table_exists(table_name):
                trans_service.create_tmp_table(table_name)


def _process_file(read_csv, file_dir, wind_code_name_map):
    """Standardize one raw CSV into the temp tables, then delete the CSV."""
    df = pd.read_csv(read_csv, header=None)
    mesurepoint_maps = generate_mesurepoint_maps(f'conf/测点表-{file_dir}.xlsx')
    # Resolve the table date once so add_table and every worker use the same tables.
    date_str = _today_table_date()
    add_table(mesurepoint_maps.keys(), date_str)
    with multiprocessing.Pool(5) as pool:
        pool.starmap(
            generate_scada_data,
            [(df, mesurepoint_maps, changzhan, wind_code_name_map, date_str)
             for changzhan in mesurepoint_maps],
        )
    # Only reached when every farm was processed without raising.
    os.remove(read_csv)


def main():
    """Process every CSV file under the configured 2404/2405 data directories."""
    # NOTE(review): presumably gives the upstream collector time to finish
    # writing files before they are read — confirm.
    time.sleep(60)
    total_begin = time.time()
    wind_code_name_map = plt_service.get_all_wind_by_company_code('COM00002')
    conf_path = os.path.abspath("./conf/config.yaml")
    yaml_config = yaml_conf(conf_path)
    data_base_dir = read_conf(yaml_config, 'data_base_dir')

    read_dirs = [os.path.join(data_base_dir, '2404'), os.path.join(data_base_dir, '2405')]
    for read_dir in read_dirs:
        dir_time = time.time()
        for root, _dirs, files in os.walk(read_dir):
            file_dir = os.path.basename(root)
            for file in files:
                file_begin = time.time()
                try:
                    _process_file(os.path.join(root, file), file_dir, wind_code_name_map)
                except Exception:
                    # Log and move on so one bad file does not stop the batch;
                    # a failed file is not removed.
                    logger.error(traceback.format_exc())
                logger.info(f'SCADA执行完:{file_dir}/{file}耗时:{time.time() - file_begin}')
        logger.info(f'{read_dir}执行总耗时:{time.time() - dir_time}')
    logger.info(f'SCADA执行总耗时:{time.time() - total_begin}')


if __name__ == '__main__':
    main()