WaveTrans.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. import datetime
  2. import json
  3. import multiprocessing
  4. from os.path import basename, dirname
  5. import pandas as pd
  6. from service.trans_service import get_wave_conf, save_file_to_db, save_df_to_db
  7. from utils.file.trans_methods import *
  8. from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
  9. class WaveTrans(object):
  10. def __init__(self, field_code, read_path, save_path: str):
  11. self.field_code = field_code
  12. self.read_path = read_path
  13. self.save_path = save_path
  14. self.begin = datetime.datetime.now()
  15. # def get_data(self, file_path):
  16. # df = pd.read_csv(file_path, encoding=detect_file_encoding(file_path), header=None)
  17. # data = [i for i in df[0].values]
  18. # filename = os.path.basename(file_path)
  19. # wind_num = filename.split('_')[1]
  20. # cedian = '齿轮箱' + filename.split('_齿轮箱')[1].split('_Time')[0]
  21. # cedian_time = filename.split('风机_')[1].split('_齿轮箱')[0].replace('_', ':')
  22. # name_tmp = 'Time_' + filename.split('Time_')[1].split('_cms')[0]
  23. # pinlv = name_tmp[0:name_tmp.rfind('_')]
  24. # zhuansu = name_tmp[name_tmp.rfind('_') + 1:]
  25. #
  26. # df = pd.DataFrame()
  27. # df['风机编号'] = [wind_num, wind_num]
  28. # df['时间'] = [cedian_time, cedian_time]
  29. # df['频率'] = [pinlv, pinlv]
  30. # df['测点'] = ['转速', cedian]
  31. # df['数据'] = [[float(zhuansu)], data]
  32. #
  33. # return df
  34. def get_data_exec(self, func_code, arg):
  35. exec(func_code)
  36. return locals()['get_data'](arg)
  37. def run(self, map_dict=dict()):
  38. all_files = read_files(self.read_path, ['csv'])
  39. print(len)
  40. # 最大取系统cpu的 1/2
  41. split_count = get_available_cpu_count_with_percent(1 / 2)
  42. wave_conf = get_wave_conf(self.field_code)
  43. base_param_exec = wave_conf['base_param_exec']
  44. map_dict = {}
  45. if base_param_exec:
  46. base_param_exec = base_param_exec.replace('\r\n', '\n').replace('\t', ' ')
  47. print(base_param_exec)
  48. # exec(base_param_exec)
  49. mesure_poins = [key for key, value in wave_conf.items() if str(key).startswith('conf_') and value]
  50. for point in mesure_poins:
  51. map_dict[wave_conf[point]] = point
  52. with multiprocessing.Pool(split_count) as pool:
  53. file_datas = pool.starmap(self.get_data_exec, [(base_param_exec, i) for i in all_files])
  54. # for file_data in file_datas:
  55. # wind_num, data_time, frequency, rotational_speed, measurementp_name, data = file_data[0], file_data[1], \
  56. # file_data[2], file_data[3], file_data[4],
  57. result_list = list()
  58. for file_data in file_datas:
  59. wind_turbine_name, time_stamp, sampling_frequency, rotational_speed, mesure_point_name, mesure_data = \
  60. file_data[0], file_data[1], file_data[2], file_data[3], file_data[4], file_data[5]
  61. result_list.append(
  62. [wind_turbine_name, time_stamp, sampling_frequency, 'rotational_speed', [float(rotational_speed)]])
  63. result_list.append(
  64. [wind_turbine_name, time_stamp, sampling_frequency, mesure_point_name, mesure_data])
  65. df = pd.DataFrame(result_list,
  66. columns=['wind_turbine_name', 'time_stamp', 'sampling_frequency', 'mesure_point_name',
  67. 'mesure_data'])
  68. df['mesure_point_name'] = df['mesure_point_name'].map(map_dict).fillna(df['mesure_point_name'])
  69. df['mesure_data'] = df['mesure_data'].apply(lambda x: json.dumps(x))
  70. save_df_to_db('SKF001_wave', df, batch_count=1000)
  71. print("总耗时:", datetime.datetime.now() - self.begin)