|
@@ -1,13 +1,15 @@
|
|
import datetime
|
|
import datetime
|
|
|
|
+import json
|
|
import multiprocessing
|
|
import multiprocessing
|
|
import os.path
|
|
import os.path
|
|
|
|
|
|
|
|
+import numpy as np
|
|
import pandas as pd
|
|
import pandas as pd
|
|
|
|
|
|
from service.plt_service import get_all_wind
|
|
from service.plt_service import get_all_wind
|
|
from service.trans_service import save_df_to_db
|
|
from service.trans_service import save_df_to_db
|
|
from utils.file.trans_methods import read_files, read_file_to_df
|
|
from utils.file.trans_methods import read_files, read_file_to_df
|
|
-from utils.log.trans_log import set_trance_id
|
|
|
|
|
|
+from utils.log.trans_log import set_trance_id, trans_print
|
|
|
|
|
|
|
|
|
|
class LaserTrans():
|
|
class LaserTrans():
|
|
@@ -15,10 +17,9 @@ class LaserTrans():
|
|
激光测距仪转化
|
|
激光测距仪转化
|
|
"""
|
|
"""
|
|
|
|
|
|
- def __init__(self, field_code, read_path, save_path: str):
|
|
|
|
|
|
+ def __init__(self, field_code, read_path):
|
|
self.field_code = field_code
|
|
self.field_code = field_code
|
|
self.read_path = read_path
|
|
self.read_path = read_path
|
|
- self.save_path = save_path
|
|
|
|
self.begin = datetime.datetime.now()
|
|
self.begin = datetime.datetime.now()
|
|
self.wind_col_trans, _ = get_all_wind(self.field_code, need_rated_param=False)
|
|
self.wind_col_trans, _ = get_all_wind(self.field_code, need_rated_param=False)
|
|
|
|
|
|
@@ -26,29 +27,51 @@ class LaserTrans():
|
|
file_name = os.path.basename(file_path)
|
|
file_name = os.path.basename(file_path)
|
|
wind_farm, wind_turbine_number, acquisition_time, sampling_frequency = file_name.split("_")
|
|
wind_farm, wind_turbine_number, acquisition_time, sampling_frequency = file_name.split("_")
|
|
result_df = pd.DataFrame()
|
|
result_df = pd.DataFrame()
|
|
- result_df['wind_turbine_number'] = wind_turbine_number
|
|
|
|
- result_df['acquisition_time'] = pd.to_datetime(acquisition_time, format='%Y%m%d%H%M%S')
|
|
|
|
- result_df['sampling_frequency'] = sampling_frequency
|
|
|
|
|
|
+ result_df['wind_turbine_number'] = [wind_turbine_number]
|
|
|
|
+ result_df['acquisition_time'] = [pd.to_datetime(acquisition_time, format='%Y%m%d%H%M%S')]
|
|
|
|
+ result_df['sampling_frequency'] = [sampling_frequency]
|
|
result_df['wind_turbine_number'] = result_df['wind_turbine_number'].map(self.wind_col_trans).fillna(
|
|
result_df['wind_turbine_number'] = result_df['wind_turbine_number'].map(self.wind_col_trans).fillna(
|
|
result_df['wind_turbine_number'])
|
|
result_df['wind_turbine_number'])
|
|
# 获取数据
|
|
# 获取数据
|
|
df = read_file_to_df(file_path)
|
|
df = read_file_to_df(file_path)
|
|
- result_df['pk_no'] = df['PkNo'].values[0]
|
|
|
|
- result_df['echo_type'] = df['EchoType'].values[0]
|
|
|
|
- result_df['echo1_dist'] = df['Echo1Dist'].values
|
|
|
|
- result_df['echo1_grey'] = df['Echo1Grey'].values
|
|
|
|
- result_df['echo2_dist'] = df['Echo2Dist'].values
|
|
|
|
- result_df['echo2_grey'] = df['Echo2Grey'].values
|
|
|
|
- result_df['echo3_dist'] = df['Echo3Dist'].values
|
|
|
|
- result_df['echo3_grey'] = df['Echo3Grey'].values
|
|
|
|
|
|
+ if not df.empty:
|
|
|
|
+ result_df['pk_no'] = [df['PkNo'].values[0]]
|
|
|
|
+ result_df['echo_type'] = [df['EchoType'].values[0]]
|
|
|
|
+ result_df['echo1_dist'] = [json.dumps([float(i) for i in df['Echo1Dist'].values if not np.isnan(i)])]
|
|
|
|
+ result_df['echo1_grey'] = [json.dumps([int(i) for i in df['Echo1Grey'].values if not np.isnan(i)])]
|
|
|
|
+ result_df['echo2_dist'] = [json.dumps([float(i) for i in df['Echo2Dist'].values if not np.isnan(i)])]
|
|
|
|
+ result_df['echo2_grey'] = [json.dumps([int(i) for i in df['Echo2Grey'].values if not np.isnan(i)])]
|
|
|
|
+ result_df['echo3_dist'] = [json.dumps([float(i) for i in df['Echo3Dist'].values if not np.isnan(i)])]
|
|
|
|
+ result_df['echo3_grey'] = [json.dumps([int(i) for i in df['Echo3Grey'].values if not np.isnan(i)])]
|
|
|
|
+ else:
|
|
|
|
+ return pd.DataFrame()
|
|
|
|
|
|
- save_df_to_db(self.field_code + "_laser", result_df)
|
|
|
|
|
|
+ return result_df
|
|
|
|
|
|
def run(self):
|
|
def run(self):
|
|
trance_id = '-'.join([self.field_code, 'laser'])
|
|
trance_id = '-'.join([self.field_code, 'laser'])
|
|
set_trance_id(trance_id)
|
|
set_trance_id(trance_id)
|
|
all_files = read_files(self.read_path, ['csv'])
|
|
all_files = read_files(self.read_path, ['csv'])
|
|
|
|
+ trans_print(self.field_code, '获取文件总数为:', len(all_files))
|
|
pool_count = 8 if len(all_files) > 8 else len(all_files)
|
|
pool_count = 8 if len(all_files) > 8 else len(all_files)
|
|
|
|
|
|
with multiprocessing.Pool(pool_count) as pool:
|
|
with multiprocessing.Pool(pool_count) as pool:
|
|
- pool.map(self.get_file_data, all_files)
|
|
|
|
|
|
+ dfs = pool.map(self.get_file_data, all_files)
|
|
|
|
+ df = pd.concat(dfs, ignore_index=True)
|
|
|
|
+ save_df_to_db(self.field_code + "_laser", df)
|
|
|
|
+ df.sort_values(by=['acquisition_time'], inplace=True)
|
|
|
|
+ trans_print(self.field_code, '执行结束,总耗时:', (datetime.datetime.now() - self.begin))
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+if __name__ == '__main__':
|
|
|
|
+ import sys
|
|
|
|
+ from os import path, environ
|
|
|
|
+
|
|
|
|
+ env = 'dev'
|
|
|
|
+ if len(sys.argv) >= 2:
|
|
|
|
+ env = sys.argv[1]
|
|
|
|
+
|
|
|
|
+ conf_path = path.abspath(__file__).split("energy-data-trans")[0] + f"/energy-data-trans/conf/etl_config_{env}.yaml"
|
|
|
|
+ environ['ETL_CONF'] = conf_path
|
|
|
|
+ environ['env'] = env
|
|
|
|
+ LaserTrans('JGCS001', r'D:\data\激光\测试').run()
|