import datetime
import json
import multiprocessing
import os.path

import numpy as np
import pandas as pd

from service.plt_service import get_all_wind
from service.trans_service import save_df_to_db
from service.trans_conf_service import update_trans_status_running, update_trans_transfer_progress, \
    update_trans_status_success
from utils.file.trans_methods import read_files, read_file_to_df
from utils.log.trans_log import set_trance_id, trans_print


class LaserTrans():
    """Laser rangefinder data transformation.

    Reads every CSV file under ``read_path``, condenses each file into a
    single row (file-name metadata plus JSON-encoded echo columns) and saves
    the combined rows to the ``<wind_farm_code>_laser`` table.
    """

    # Upper bound on the number of worker processes used by ``run``.
    _MAX_WORKERS = 8

    def __init__(self, id, wind_farm_code, read_path):
        """Store the job parameters and load the turbine-name mapping.

        Args:
            id: Identifier passed to the ``update_trans_*`` status helpers.
            wind_farm_code: Wind farm code; also used as the table-name prefix.
            read_path: Directory that is scanned for ``csv`` files.
        """
        self.id = id
        self.wind_farm_code = wind_farm_code
        self.read_path = read_path
        self.begin = datetime.datetime.now()
        # First value returned by get_all_wind; used below with Series.map to
        # translate the turbine name taken from the file name.
        self.wind_col_trans, _ = get_all_wind(self.wind_farm_code, need_rated_param=False)

    @staticmethod
    def _values_to_json(values, cast):
        """Return a JSON array of ``cast(v)`` for every non-NaN ``v`` in *values*."""
        return json.dumps([cast(v) for v in values if not np.isnan(v)])

    def get_file_data(self, file_path):
        """Convert one laser CSV file into a single-row DataFrame.

        The file name (without extension) must have the form
        ``<wind_farm>_<turbine>_<YYYYmmddHHMMSS>_<sampling_frequency>``.

        Args:
            file_path: Path of the CSV file to read.

        Returns:
            A one-row DataFrame, or an empty DataFrame when the file holds
            no data.

        Raises:
            ValueError: If the file name does not split into exactly four
                ``_``-separated parts, or the timestamp part does not match
                ``%Y%m%d%H%M%S``.
        """
        file_name = os.path.basename(file_path)
        # Strip the extension first; otherwise the last field would carry it
        # (e.g. "100.csv" instead of "100").
        stem, _ext = os.path.splitext(file_name)
        _wind_farm, wind_turbine_number, acquisition_time, sampling_frequency = stem.split("_")

        result_df = pd.DataFrame({
            'wind_turbine_number': [wind_turbine_number],
            'acquisition_time': [pd.to_datetime(acquisition_time, format='%Y%m%d%H%M%S')],
            'sampling_frequency': [sampling_frequency],
        })
        # Translate the turbine name through the mapping; keep the raw name
        # when the mapping has no entry for it.
        result_df['wind_turbine_number'] = result_df['wind_turbine_number'].map(self.wind_col_trans).fillna(
            result_df['wind_turbine_number'])

        # Read the data
        df = read_file_to_df(file_path)
        if df.empty:
            return pd.DataFrame()

        # PkNo / EchoType are taken from the first row only.
        result_df['pk_no'] = [df['PkNo'].values[0]]
        result_df['echo_type'] = [df['EchoType'].values[0]]
        # Each echo column is collapsed into one JSON array (NaN dropped):
        # distances as floats, grey values as ints.
        for echo_no in (1, 2, 3):
            result_df[f'echo{echo_no}_dist'] = [self._values_to_json(df[f'Echo{echo_no}Dist'].values, float)]
            result_df[f'echo{echo_no}_grey'] = [self._values_to_json(df[f'Echo{echo_no}Grey'].values, int)]

        return result_df

    def run(self):
        """Process all CSV files under ``read_path`` and save the result.

        Raises:
            ValueError: If no CSV file is found, or none of the files
                contains data.
        """
        update_trans_status_running(self.id)
        trance_id = '-'.join([self.wind_farm_code, 'laser'])
        set_trance_id(trance_id)

        all_files = read_files(self.read_path, ['csv'])
        trans_print(self.wind_farm_code, '获取文件总数为:', len(all_files))
        if not all_files:
            # multiprocessing.Pool(0) would fail with an unrelated-looking
            # "Number of processes must be at least 1" error.
            raise ValueError(f"no csv files found under {self.read_path!r}")

        pool_count = min(self._MAX_WORKERS, len(all_files))
        with multiprocessing.Pool(pool_count) as pool:
            dfs = pool.map(self.get_file_data, all_files)
        update_trans_transfer_progress(self.id, 80)

        # Files without data come back as empty frames; drop them so a run
        # with only empty files fails clearly instead of with a KeyError on
        # the missing 'acquisition_time' column.
        frames = [frame for frame in dfs if not frame.empty]
        if not frames:
            raise ValueError(f"none of the csv files under {self.read_path!r} contains data")

        df = pd.concat(frames, ignore_index=True)
        update_trans_transfer_progress(self.id, 90)
        df.sort_values(by=['acquisition_time'], inplace=True)
        save_df_to_db(self.wind_farm_code + "_laser", df)
        update_trans_status_success(self.id, len(df['wind_turbine_number'].unique()), None,
                                    df['acquisition_time'].min(), df['acquisition_time'].max(), df.shape[0])
        trans_print(self.wind_farm_code, '执行结束,总耗时:', (datetime.datetime.now() - self.begin))


if __name__ == '__main__':
    import sys
    from os import path, environ

    env = 'dev'
    if len(sys.argv) >= 2:
        env = sys.argv[1]

    # NOTE(review): the project imports at the top of this file run before
    # these variables are set - confirm the configuration is read lazily.
    conf_path = path.abspath(__file__).split("energy-data-trans")[0] + f"/energy-data-trans/conf/etl_config_{env}.yaml"
    environ['ETL_CONF'] = conf_path
    environ['env'] = env

    # The constructor takes (id, wind_farm_code, read_path); the previous
    # two-argument call raised TypeError.
    # NOTE(review): 0 is a placeholder id for local runs - replace it with a
    # real transfer-task id before relying on the status updates.
    LaserTrans(0, 'JGCS001', r'D:\data\激光\测试').run()