LaserTrans.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. import datetime
  2. import json
  3. import multiprocessing
  4. import os.path
  5. import numpy as np
  6. import pandas as pd
  7. from service.plt_service import get_all_wind
  8. from service.trans_service import save_df_to_db
  9. from utils.file.trans_methods import read_files, read_file_to_df
  10. from utils.log.trans_log import set_trance_id, trans_print
  11. class LaserTrans():
  12. """
  13. 激光测距仪转化
  14. """
  15. def __init__(self, field_code, read_path):
  16. self.field_code = field_code
  17. self.read_path = read_path
  18. self.begin = datetime.datetime.now()
  19. self.wind_col_trans, _ = get_all_wind(self.field_code, need_rated_param=False)
  20. def get_file_data(self, file_path):
  21. file_name = os.path.basename(file_path)
  22. wind_farm, wind_turbine_number, acquisition_time, sampling_frequency = file_name.split("_")
  23. result_df = pd.DataFrame()
  24. result_df['wind_turbine_number'] = [wind_turbine_number]
  25. result_df['acquisition_time'] = [pd.to_datetime(acquisition_time, format='%Y%m%d%H%M%S')]
  26. result_df['sampling_frequency'] = [sampling_frequency]
  27. result_df['wind_turbine_number'] = result_df['wind_turbine_number'].map(self.wind_col_trans).fillna(
  28. result_df['wind_turbine_number'])
  29. # 获取数据
  30. df = read_file_to_df(file_path)
  31. if not df.empty:
  32. result_df['pk_no'] = [df['PkNo'].values[0]]
  33. result_df['echo_type'] = [df['EchoType'].values[0]]
  34. result_df['echo1_dist'] = [json.dumps([float(i) for i in df['Echo1Dist'].values if not np.isnan(i)])]
  35. result_df['echo1_grey'] = [json.dumps([int(i) for i in df['Echo1Grey'].values if not np.isnan(i)])]
  36. result_df['echo2_dist'] = [json.dumps([float(i) for i in df['Echo2Dist'].values if not np.isnan(i)])]
  37. result_df['echo2_grey'] = [json.dumps([int(i) for i in df['Echo2Grey'].values if not np.isnan(i)])]
  38. result_df['echo3_dist'] = [json.dumps([float(i) for i in df['Echo3Dist'].values if not np.isnan(i)])]
  39. result_df['echo3_grey'] = [json.dumps([int(i) for i in df['Echo3Grey'].values if not np.isnan(i)])]
  40. else:
  41. return pd.DataFrame()
  42. return result_df
  43. def run(self):
  44. trance_id = '-'.join([self.field_code, 'laser'])
  45. set_trance_id(trance_id)
  46. all_files = read_files(self.read_path, ['csv'])
  47. trans_print(self.field_code, '获取文件总数为:', len(all_files))
  48. pool_count = 8 if len(all_files) > 8 else len(all_files)
  49. with multiprocessing.Pool(pool_count) as pool:
  50. dfs = pool.map(self.get_file_data, all_files)
  51. df = pd.concat(dfs, ignore_index=True)
  52. save_df_to_db(self.field_code + "_laser", df)
  53. df.sort_values(by=['acquisition_time'], inplace=True)
  54. trans_print(self.field_code, '执行结束,总耗时:', (datetime.datetime.now() - self.begin))
  55. if __name__ == '__main__':
  56. import sys
  57. from os import path, environ
  58. env = 'dev'
  59. if len(sys.argv) >= 2:
  60. env = sys.argv[1]
  61. conf_path = path.abspath(__file__).split("energy-data-trans")[0] + f"/energy-data-trans/conf/etl_config_{env}.yaml"
  62. environ['ETL_CONF'] = conf_path
  63. environ['env'] = env
  64. LaserTrans('JGCS001', r'D:\data\激光\测试').run()