WaveTrans.py 3.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. import datetime
  2. import json
  3. import multiprocessing
  4. from os.path import basename, dirname
  5. import pandas as pd
  6. from service.plt_service import get_all_wind
  7. from service.trans_service import get_wave_conf, save_df_to_db, get_or_create_wave_table, \
  8. get_wave_data, delete_exist_wave_data
  9. from utils.file.trans_methods import *
  10. from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
  11. class WaveTrans(object):
  12. def __init__(self, field_code, read_path, save_path: str):
  13. self.field_code = field_code
  14. self.read_path = read_path
  15. self.save_path = save_path
  16. self.begin = datetime.datetime.now()
  17. def get_data_exec(self, func_code, arg):
  18. exec(func_code)
  19. return locals()['get_data'](arg)
  20. def del_exists_data(self, df):
  21. min_date, max_date = df['time_stamp'].min(), df['time_stamp'].max()
  22. db_df = get_wave_data(self.field_code + '_wave', min_date, max_date)
  23. exists_df = pd.merge(db_df, df,
  24. on=['wind_turbine_name', 'time_stamp', 'sampling_frequency', 'mesure_point_name'],
  25. how='inner')
  26. ids = [int(i) for i in exists_df['id'].to_list()]
  27. if ids:
  28. delete_exist_wave_data(self.field_code + "_wave", ids)
  29. def run(self):
  30. all_files = read_files(self.read_path, ['csv'])
  31. print(len)
  32. # 最大取系统cpu的 1/2
  33. split_count = get_available_cpu_count_with_percent(1 / 2)
  34. all_wind, _ = get_all_wind(self.field_code, False)
  35. get_or_create_wave_table(self.field_code + '_wave')
  36. wave_conf = get_wave_conf(self.field_code)
  37. base_param_exec = wave_conf['base_param_exec']
  38. map_dict = {}
  39. if base_param_exec:
  40. base_param_exec = base_param_exec.replace('\r\n', '\n').replace('\t', ' ')
  41. print(base_param_exec)
  42. if 'import ' in base_param_exec:
  43. raise Exception("方法不支持import方法")
  44. mesure_poins = [key for key, value in wave_conf.items() if str(key).startswith('conf_') and value]
  45. for point in mesure_poins:
  46. map_dict[wave_conf[point]] = point.replace('conf_', '')
  47. map_dict['rotational_speed'] = 'rotational_speed'
  48. with multiprocessing.Pool(split_count) as pool:
  49. file_datas = pool.starmap(self.get_data_exec, [(base_param_exec, i) for i in all_files])
  50. print("读取文件耗时:", datetime.datetime.now() - self.begin)
  51. result_list = list()
  52. for file_data in file_datas:
  53. wind_turbine_name, time_stamp, sampling_frequency, rotational_speed, mesure_point_name, mesure_data = \
  54. file_data[0], file_data[1], file_data[2], file_data[3], file_data[4], file_data[5]
  55. if mesure_point_name in map_dict.keys():
  56. result_list.append(
  57. [wind_turbine_name, time_stamp, sampling_frequency, 'rotational_speed', [float(rotational_speed)]])
  58. result_list.append(
  59. [wind_turbine_name, time_stamp, sampling_frequency, mesure_point_name, mesure_data])
  60. df = pd.DataFrame(result_list,
  61. columns=['wind_turbine_name', 'time_stamp', 'sampling_frequency', 'mesure_point_name',
  62. 'mesure_data'])
  63. df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce')
  64. df['mesure_point_name'] = df['mesure_point_name'].map(map_dict)
  65. df.dropna(subset=['mesure_point_name'], inplace=True)
  66. df['wind_turbine_number'] = df['wind_turbine_name'].map(all_wind).fillna(df['wind_turbine_name'])
  67. df['mesure_data'] = df['mesure_data'].apply(lambda x: json.dumps(x))
  68. df.sort_values(by=['time_stamp', 'mesure_point_name'], inplace=True)
  69. self.del_exists_data(df)
  70. save_df_to_db(self.field_code + '_wave', df, batch_count=1000)
  71. print("总耗时:", datetime.datetime.now() - self.begin)