# WaveTrans.py
  1. import json
  2. import multiprocessing
  3. import traceback
  4. from service.plt_service import get_all_wind
  5. from service.trans_conf_service import update_trans_status_running, update_trans_transfer_progress, \
  6. update_trans_status_success, update_trans_status_error
  7. from service.trans_service import get_wave_conf, save_df_to_db, get_or_create_wave_table, \
  8. get_wave_data, delete_exist_wave_data
  9. from utils.file.trans_methods import *
  10. from utils.log.trans_log import set_trance_id
  11. from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
  12. exec("from os.path import *")
  13. exec("import re")
  14. class WaveTrans(object):
  15. def __init__(self, id, wind_farm_code, read_dir):
  16. self.id = id
  17. self.wind_farm_code = wind_farm_code
  18. self.read_dir = read_dir
  19. self.begin = datetime.datetime.now()
  20. self.engine_count = 0
  21. self.min_date = None
  22. self.max_date = None
  23. self.data_count = 0
  24. def get_data_exec(self, func_code, filepath, measupoint_names: set):
  25. exec(func_code)
  26. return locals()['get_data'](filepath, measupoint_names)
  27. def del_exists_data(self, df):
  28. min_date, max_date = df['time_stamp'].min(), df['time_stamp'].max()
  29. db_df = get_wave_data(self.wind_farm_code + '_wave', min_date, max_date)
  30. exists_df = pd.merge(db_df, df,
  31. on=['wind_turbine_name', 'time_stamp', 'sampling_frequency', 'mesure_point_name'],
  32. how='inner')
  33. ids = [int(i) for i in exists_df['id'].to_list()]
  34. if ids:
  35. delete_exist_wave_data(self.wind_farm_code + "_wave", ids)
  36. def run(self):
  37. update_trans_status_running(self.id)
  38. trance_id = '-'.join([self.wind_farm_code, 'wave'])
  39. set_trance_id(trance_id)
  40. all_files = read_files(self.read_dir, ['txt', 'csv'])
  41. update_trans_transfer_progress(self.id, 5)
  42. # 最大取系统cpu的 1/2
  43. split_count = get_available_cpu_count_with_percent(1 / 2)
  44. all_wind, _ = get_all_wind(self.wind_farm_code, False)
  45. get_or_create_wave_table(self.wind_farm_code + '_wave')
  46. wave_conf = get_wave_conf(self.wind_farm_code)
  47. base_param_exec = wave_conf['base_param_exec']
  48. map_dict = {}
  49. if base_param_exec:
  50. base_param_exec = base_param_exec.replace('\r\n', '\n').replace('\t', ' ')
  51. trans_print(base_param_exec)
  52. if 'import ' in base_param_exec:
  53. raise Exception("方法不支持import方法")
  54. mesure_poins = [key for key, value in wave_conf.items() if str(key).startswith('conf_') and value]
  55. for point in mesure_poins:
  56. map_dict[wave_conf[point].strip()] = point.replace('conf_', '')
  57. wind_turbine_name_set = set()
  58. all_array = split_array(all_files, split_count * 10)
  59. total_index = len(all_array)
  60. for index, now_array in enumerate(all_array):
  61. index_begin = datetime.datetime.now()
  62. with multiprocessing.Pool(split_count) as pool:
  63. try:
  64. file_datas = pool.starmap(self.get_data_exec,
  65. [(base_param_exec, i, list(map_dict.keys())) for i in now_array])
  66. trans_print(f'总数:{len(now_array)},返回个数{len(file_datas)}')
  67. except Exception as e:
  68. message = str(e)
  69. trans_print(traceback.format_exc())
  70. update_trans_status_error(self.id, message[0:len(message) if len(message) < 100 else 100])
  71. raise e
  72. update_trans_transfer_progress(self.id, 20 + int(index / total_index * 60))
  73. trans_print("读取文件耗时:", datetime.datetime.now() - self.begin)
  74. result_list = list()
  75. for file_data in file_datas:
  76. if file_data:
  77. wind_turbine_name, time_stamp, sampling_frequency, rotational_speed, mesure_point_name, type, mesure_data = \
  78. file_data[0], file_data[1], file_data[2], file_data[3], file_data[4], file_data[5], file_data[6]
  79. if mesure_point_name in map_dict.keys():
  80. wind_turbine_name_set.add(wind_turbine_name)
  81. if self.min_date is None or self.min_date > time_stamp:
  82. self.min_date = time_stamp
  83. if self.max_date is None or self.max_date < time_stamp:
  84. self.max_date = time_stamp
  85. result_list.append(
  86. [wind_turbine_name, time_stamp, rotational_speed, sampling_frequency, mesure_point_name,
  87. type,
  88. mesure_data])
  89. if result_list:
  90. self.data_count = self.data_count + len(result_list)
  91. df = pd.DataFrame(result_list,
  92. columns=['wind_turbine_name', 'time_stamp', 'rotational_speed', 'sampling_frequency',
  93. 'mesure_point_name', 'type', 'mesure_data'])
  94. df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce')
  95. df['mesure_point_name'] = df['mesure_point_name'].map(map_dict)
  96. df.dropna(subset=['mesure_point_name'], inplace=True)
  97. df['wind_turbine_number'] = df['wind_turbine_name'].map(all_wind).fillna(df['wind_turbine_name'])
  98. df['mesure_data'] = df['mesure_data'].apply(lambda x: json.dumps(x))
  99. df.sort_values(by=['time_stamp', 'mesure_point_name'], inplace=True)
  100. # self.del_exists_data(df)
  101. save_df_to_db(self.wind_farm_code + '_wave', df, batch_count=400)
  102. trans_print(f"总共{total_index}组,当前{index + 1}", "本次写入耗时:", datetime.datetime.now() - index_begin,
  103. "总耗时:", datetime.datetime.now() - self.begin)
  104. update_trans_status_success(self.id, len(wind_turbine_name_set), None,
  105. self.min_date, self.max_date, self.data_count)
  106. # update_trans_status_success(self.id)
  107. trans_print("总耗时:", datetime.datetime.now() - self.begin)