# parse_scada_data.py
  1. """
  2. 读取104规约返回的数据,并标准化到临时表中
  3. """
  4. import multiprocessing
  5. import os.path
  6. import time
  7. import traceback
  8. import warnings
  9. from datetime import datetime
  10. import pandas as pd
  11. from service import plt_service, trans_service
  12. from utils.conf.read_conf import yaml_conf, read_conf
  13. from utils.log.trans_log import logger
  14. warnings.filterwarnings('ignore')
def generate_mesurepoint_maps(file_path):
    """Build a nested map: station code -> turbine no. -> CSV column index -> standardized English name."""
    df = pd.read_excel(file_path)
    wind_maps = dict()
    for _, data in df.iterrows():
        # Shift the sheet's sequence number by one so the keys line up with the CSV columns
        # (column 0 of the raw 104 payload holds the timestamp).
        shunxuhao = int(data['顺序号']) + 1
        changzhan = data['场站标准化编号']
        wind_no = data['风机号']
        en_name = data['标准化英文']
        if changzhan in wind_maps:
            if wind_no in wind_maps[changzhan]:
                wind_maps[changzhan][wind_no][shunxuhao] = en_name
            else:
                wind_maps[changzhan][wind_no] = {shunxuhao: en_name}
        else:
            wind_maps[changzhan] = {wind_no: {shunxuhao: en_name}}
    return wind_maps
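
# Illustrative only -- the nested map built above has the shape
#   {'<station code>': {'<turbine no.>': {1: 'wind_velocity', 2: 'active_power', ...}, ...}, ...}
# where the integer keys are CSV column positions (顺序号 + 1) and the values are the
# standardized English names used as column headers downstream. The field names shown
# here are made up for the example.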


def generate_scada_data(df, mesurepoint_maps, changzhan, wind_code_name_map):
    """Split one 104 CSV into per-turbine second-level records, aggregate them to minute level,
    save both to the station's temporary tables, and update the station's daily counters."""
    second_dfs = list()
    minute_dfs = list()
    # Map of turbine name -> standardized turbine number for this station.
    wind_maps, _ = plt_service.get_wind_info(changzhan, False)
    for wind_no in mesurepoint_maps[changzhan]:
        shunxuhao_map = mesurepoint_maps[changzhan][wind_no]
        # Pick this turbine's columns; column 0 of the raw CSV is the timestamp.
        second_df = df[list(shunxuhao_map.keys())].copy()
        second_df['wind_turbine_name'] = wind_no
        second_df['time_stamp'] = df[0]
        second_df['time_stamp'] = pd.to_datetime(second_df['time_stamp'])
        second_df.rename(columns=shunxuhao_map, inplace=True)
        second_dfs.append(second_df)
        # Collapse the batch to the minute of its earliest sample and average per turbine.
        minute_df = second_df.copy(deep=True)
        minute_df['time_stamp'] = minute_df['time_stamp'].min().strftime('%Y-%m-%d %H:%M:00')
        minute_df = minute_df.groupby(['wind_turbine_name', 'time_stamp']).mean(numeric_only=True).reset_index()
        minute_dfs.append(minute_df)
    changzhan_second_df = pd.concat(second_dfs, ignore_index=True)
    changzhan_minute_df = pd.concat(minute_dfs, ignore_index=True)
    changzhan_second_df['wind_turbine_name'] = changzhan_second_df['wind_turbine_name'].astype(str)
    changzhan_second_df['wind_turbine_number'] = changzhan_second_df['wind_turbine_name'].map(wind_maps)
    changzhan_minute_df['wind_turbine_name'] = changzhan_minute_df['wind_turbine_name'].astype(str)
    changzhan_minute_df['wind_turbine_number'] = changzhan_minute_df['wind_turbine_name'].map(wind_maps)
    # changzhan_second_df.to_csv(f'tmp/104/scada/{changzhan}-{int(time.time())}_second.csv', index=False)
    # changzhan_minute_df.to_csv(f'tmp/104/scada/{changzhan}-{int(time.time())}_minute.csv', index=False)
    date_str = datetime.now().strftime('%Y_%m_%d')
    second_table_name = f'{changzhan}_second_{date_str}_tmp'
    trans_service.save_df_to_db(second_table_name, changzhan_second_df)
    minute_table_name = f'{changzhan}_minute_{date_str}_tmp'
    trans_service.save_df_to_db(minute_table_name, changzhan_minute_df)
    changzhan_minute_df['time_stamp'] = pd.to_datetime(changzhan_minute_df['time_stamp'], errors='coerce')
    minute_max_date = changzhan_minute_df['time_stamp'].max()
    second_max_date = changzhan_second_df['time_stamp'].max()
    add_date_str = minute_max_date.strftime('%Y-%m-%d')
    minute_last_date_str = minute_max_date.strftime('%Y-%m-%d %H:%M:%S')
    second_last_date_str = second_max_date.strftime('%Y-%m-%d %H:%M:%S')
    # wind_farm_code, wind_farm_name, add_date, trans_type, count, latest_data_time
    trans_service.update_wind_farm_day_count(changzhan, wind_code_name_map.get(changzhan, ''), add_date_str,
                                             'minute', changzhan_minute_df.shape[0], minute_last_date_str)
    trans_service.update_wind_farm_day_count(changzhan, wind_code_name_map.get(changzhan, ''), add_date_str,
                                             'second', changzhan_second_df.shape[0], second_last_date_str)
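
# Minimal usage sketch (paths and the sequential station loop are illustrative; in this
# script the same calls run inside a multiprocessing pool in the __main__ block below):
#   df = pd.read_csv('some_104_payload.csv', header=None)        # column 0 = timestamp
#   maps = generate_mesurepoint_maps('conf/测点表-2404.xlsx')
#   for changzhan in maps:
#       generate_scada_data(df, maps, changzhan, wind_code_name_map={})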


def add_table(changzhan_names):
    """Create today's minute/second temporary tables for each station if they do not exist yet.

    Relies on the module-level ``date_str`` that is set in the ``__main__`` block before this is called.
    """
    types = ['minute', 'second']
    for changzhan_name in changzhan_names:
        for trans_type in types:
            table_name = f'{changzhan_name}_{trans_type}_{date_str}_tmp'
            if not trans_service.boolean_table_exists(table_name):
                trans_service.create_tmp_table(table_name)
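
# For example, add_table(['WOF00001']) ensures that tables named like
# 'WOF00001_minute_2024_05_01_tmp' and 'WOF00001_second_2024_05_01_tmp' exist
# (the station code and date shown here are illustrative; date_str is set in __main__).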


if __name__ == '__main__':
    # Start-up delay before scanning the data directories.
    time.sleep(60)
    total_begin = time.time()
    begin = time.time()
    date_str = datetime.now().strftime('%Y_%m_%d')
    wind_code_name_map = plt_service.get_all_wind_by_company_code('COM00002')
    conf_path = os.path.abspath('./conf/config.yaml')
    yaml_config = yaml_conf(conf_path)
    data_base_dir = read_conf(yaml_config, 'data_base_dir')
    read_dirs = [os.path.join(data_base_dir, '2404'), os.path.join(data_base_dir, '2405')]
    for read_dir in read_dirs:
        dir_time = time.time()
        for root, dirs, files in os.walk(read_dir):
            for file in files:
                try:
                    file_dir = os.path.basename(root)
                    read_csv = os.path.join(root, file)
                    df = pd.read_csv(read_csv, header=None)
                    # The measure-point sheet is named after the directory the CSV sits in.
                    mesurepoint_maps = generate_mesurepoint_maps(f'conf/测点表-{file_dir}.xlsx')
                    add_table(mesurepoint_maps.keys())
                    # Process the stations of this file in parallel, then drop the source file.
                    with multiprocessing.Pool(5) as pool:
                        pool.starmap(generate_scada_data,
                                     [(df, mesurepoint_maps, changzhan, wind_code_name_map) for changzhan in
                                      mesurepoint_maps.keys()])
                    os.remove(read_csv)
                except Exception:
                    logger.error(traceback.format_exc())
                logger.info(f'SCADA finished {file_dir}/{file}, elapsed: {time.time() - dir_time}')
        logger.info(f'Total time for {read_dir}: {time.time() - dir_time}')
    logger.info(f'SCADA total run time: {time.time() - total_begin}')