Selaa lähdekoodia

添加秒级转分钟级的逻辑
添加skf数据提取

wzl 8 kuukautta sitten
vanhempi
commit
17b8a65398

+ 1 - 1
conf/etl_config_prod.yaml

@@ -23,4 +23,4 @@ log_path_dir: /data/logs
 # 临时文件存放处,有些甲方公司隔得tmp太小,只好自己配置
 tmp_base_path: /tmp
 
-run_batch_count: 2
+run_batch_count: 3

+ 14 - 6
etl/common/BaseDataTrans.py

@@ -27,6 +27,9 @@ class BaseDataTrans(object):
         self.batch_count = 100000
         self.save_db = save_db
         self.filed_conf = self.get_filed_conf()
+
+        # trans_print("是否是秒转分钟:", self.boolean_sec_to_min)
+
         try:
             self.pathsAndTable = PathsAndTable(self.batch_no, self.batch_name, self.read_path, self.field_name,
                                                self.read_type, save_db, self.save_zip)
@@ -79,31 +82,36 @@ class BaseDataTrans(object):
                 begin = datetime.datetime.now()
                 trans_print("开始清理数据,临时文件夹:", self.pathsAndTable.get_tmp_path())
                 self.clean_file_and_db()
-                trans_print("清理数据结束,耗时:", datetime.datetime.now() - begin, "总耗时:", datetime.datetime.now() - begin)
+                trans_print("清理数据结束,耗时:", datetime.datetime.now() - begin, "总耗时:",
+                            datetime.datetime.now() - total_begin)
 
             if self.step <= 1 and self.end >= 1:
                 begin = datetime.datetime.now()
                 trans_print("开始解压移动文件")
                 self.unzip_or_remove_to_tmp_dir()
-                trans_print("解压移动文件结束:耗时:", datetime.datetime.now() - begin, "总耗时:", datetime.datetime.now() - begin)
+                trans_print("解压移动文件结束:耗时:", datetime.datetime.now() - begin, "总耗时:",
+                            datetime.datetime.now() - total_begin)
 
             if self.step <= 2 and self.end >= 2:
                 begin = datetime.datetime.now()
                 trans_print("开始保存数据到临时文件")
                 self.read_and_save_tmp_file()
-                trans_print("保存数据到临时文件结束,耗时:", datetime.datetime.now() - begin, "总耗时:", datetime.datetime.now() - begin)
+                trans_print("保存数据到临时文件结束,耗时:", datetime.datetime.now() - begin, "总耗时:",
+                            datetime.datetime.now() - total_begin)
 
             if self.step <= 3 and self.end >= 3:
                 begin = datetime.datetime.now()
                 trans_print("开始保存数据到正式文件")
                 self.statistics_and_save_to_file()
-                trans_print("保存数据到正式文件结束,耗时:", datetime.datetime.now() - begin, "总耗时:", datetime.datetime.now() - begin)
+                trans_print("保存数据到正式文件结束,耗时:", datetime.datetime.now() - begin, "总耗时:",
+                            datetime.datetime.now() - total_begin)
 
             if self.step <= 4 and self.end >= 4:
                 begin = datetime.datetime.now()
                 trans_print("开始保存到数据库,是否存库:", self.pathsAndTable.save_db)
                 self.save_to_db()
-                trans_print("保存到数据结束,耗时:", datetime.datetime.now() - begin, "总耗时:", datetime.datetime.now() - begin)
+                trans_print("保存到数据结束,耗时:", datetime.datetime.now() - begin, "总耗时:",
+                            datetime.datetime.now() - total_begin)
 
             self.update_exec_progress()
         except Exception as e:
@@ -111,7 +119,7 @@ class BaseDataTrans(object):
             update_trans_status_error(self.batch_no, self.read_type, str(e), self.save_db)
             raise e
         finally:
-            # self.pathsAndTable.delete_tmp_files()
+            self.pathsAndTable.delete_tmp_files()
             trans_print("执行结束,总耗时:", str(datetime.datetime.now() - total_begin))
 
 

+ 3 - 2
etl/common/UnzipAndRemove.py

@@ -11,8 +11,9 @@ from utils.zip.unzip import unzip, unrar, get_desc_path
 
 
 class UnzipAndRemove(object):
    def __init__(self, pathsAndTable: PathsAndTable, filter_types=None):
        # Shared batch context (paths, table names) for this ETL run.
        self.pathsAndTable = pathsAndTable
        # Optional file-extension whitelist forwarded to file discovery;
        # None presumably means "use the default whitelist" — confirm at call site.
        self.filter_types = filter_types
 
     def get_and_remove(self, file):
 
@@ -46,7 +47,7 @@ class UnzipAndRemove(object):
                 all_files = read_files(self.pathsAndTable.read_path)
 
             # 最大取系统cpu的 三分之二
-            split_count = get_available_cpu_count_with_percent(1/2)
+            split_count = get_available_cpu_count_with_percent(1 / 2)
             all_arrays = split_array(all_files, split_count)
 
             for index, arr in enumerate(all_arrays):

+ 7 - 1
etl/wind_power/min_sec/MinSecTrans.py

@@ -43,6 +43,11 @@ class MinSecTrans(BaseDataTrans):
             vertical_value = read_conf(conf_map, 'vertical_col_value')
             need_valid_cols = not merge_columns
 
+            boolean_sec_to_min = read_conf(conf_map, 'boolean_sec_to_min', 0)
+            boolean_sec_to_min = int(boolean_sec_to_min) == 1
+
+            # self.boolean_sec_to_min = int(data['boolean_sec_to_min']) == 1 if 'boolean_sec_to_min' in data.keys() else False
+
             cols_trans_all = dict()
             trans_cols = ['wind_turbine_number', 'time_stamp', 'active_power', 'rotor_speed', 'generator_speed',
                           'wind_velocity', 'pitch_angle_blade_1', 'pitch_angle_blade_2', 'pitch_angle_blade_3',
@@ -71,7 +76,8 @@ class MinSecTrans(BaseDataTrans):
                               wind_name_exec=wind_name_exec, is_vertical_table=is_vertical_table,
                               vertical_cols=vertical_cols, vertical_key=vertical_key,
                               vertical_value=vertical_value, index_cols=index_cols, merge_columns=merge_columns,
-                              resolve_col_prefix=resolve_col_prefix, need_valid_cols=need_valid_cols)
+                              resolve_col_prefix=resolve_col_prefix, need_valid_cols=need_valid_cols,
+                              boolean_sec_to_min=boolean_sec_to_min)
 
     # 第三步 读取 并 保存到临时文件
     def read_and_save_tmp_file(self):

+ 7 - 0
etl/wind_power/min_sec/StatisticsAndSaveFile.py

@@ -100,6 +100,13 @@ class StatisticsAndSaveFile(object):
         df.sort_values(by='time_stamp', inplace=True)
         df = df[[i for i in self.trans_param.cols_tran.keys() if i in df.columns]]
 
+        # 如果秒级有可能合并到分钟级
+        # TODO add 秒转分钟
+        if self.trans_param.boolean_sec_to_min:
+            df['time_stamp'] = df['time_stamp'].apply(lambda x: x + pd.Timedelta(minutes=(10 - x.minute % 10) % 10))
+            df['time_stamp'] = df['time_stamp'].dt.floor('10T')
+            df = df.groupby(['wind_turbine_number', 'time_stamp']).mean().reset_index()
+
         power = df.sample(int(df.shape[0] / 100))['active_power'].median()
         if power > 10000:
             df['active_power'] = df['active_power'] / 1000

+ 2 - 1
etl/wind_power/min_sec/TransParam.py

@@ -8,7 +8,7 @@ class TransParam(object):
     def __init__(self, read_type=None, read_path=None, cols_tran=dict(),
                  wind_name_exec=str(), is_vertical_table=False, vertical_cols=list(), vertical_key=None,
                  vertical_value=None, index_cols=list(), merge_columns=False, resolve_col_prefix=None,
-                 need_valid_cols=True, wind_col_trans: dict = None):
+                 need_valid_cols=True, wind_col_trans: dict = None, boolean_sec_to_min=False):
         self.read_type = read_type
         self.read_path = read_path
         self.cols_tran = cols_tran
@@ -22,3 +22,4 @@ class TransParam(object):
         self.resolve_col_prefix = resolve_col_prefix
         self.need_valid_cols = need_valid_cols
         self.wind_col_trans = wind_col_trans
+        self.boolean_sec_to_min = boolean_sec_to_min

+ 48 - 0
etl/wind_power/wave/WaveTrans.py

@@ -0,0 +1,48 @@
+import multiprocessing
+import os.path
+
+from utils.file.trans_methods import *
+from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
+
+
class WaveTrans(object):
    """Extract SKF/CMS vibration waveform csv files and merge them into one csv.

    Each raw file holds a single-column waveform; all metadata (turbine number,
    measuring point, sample time, frequency, rotor speed) is encoded in the
    file name.
    """

    def __init__(self, field_code, read_path, save_path: str):
        self.field_code = field_code  # wind-field code, also the output file name
        self.read_path = read_path    # directory (or single file) of raw csv files
        self.save_path = save_path    # directory the merged csv is written into

    def get_data(self, file_path):
        """Parse one raw waveform csv into a two-row DataFrame.

        Row 1 carries the rotor speed parsed from the file name, row 2 carries
        the waveform samples. The file-name layout is assumed to be
        ``..._<turbine>_...风机_<HH_MM_SS>_齿轮箱<point>_Time_<freq>_<rpm>_cms...``
        — TODO confirm against real SKF exports.
        """
        df = pd.read_csv(file_path, encoding=detect_file_encoding(file_path), header=None)
        data = [i for i in df[0].values]
        filename = os.path.basename(file_path)
        wind_num = filename.split('_')[1]
        cedian = '齿轮箱' + filename.split('_齿轮箱')[1].split('_Time')[0]
        cedian_time = filename.split('风机_')[1].split('_齿轮箱')[0].replace('_', ':')
        name_tmp = 'Time_' + filename.split('Time_')[1].split('_cms')[0]
        pinlv = name_tmp[0:name_tmp.rfind('_')]
        zhuansu = name_tmp[name_tmp.rfind('_') + 1:]

        df = pd.DataFrame()
        df['风机编号'] = [wind_num, wind_num]
        df['时间'] = [cedian_time, cedian_time]
        df['频率'] = [pinlv, pinlv]
        df['测点'] = ['转速', cedian]
        df['数据'] = [[float(zhuansu)], data]

        return df

    def run(self):
        """Parse every csv under read_path in parallel and write the merged result."""
        all_files = read_files(self.read_path, ['csv'])

        # Use at most half of the available CPUs.
        split_count = get_available_cpu_count_with_percent(1 / 2)

        with multiprocessing.Pool(split_count) as pool:
            dfs = pool.starmap(self.get_data, [(i,) for i in all_files])

        df = pd.concat(dfs, ignore_index=True, copy=False)

        # BUG FIX: drop_duplicates() is not in-place — the original discarded the
        # result, so duplicate measurements were still written to the output.
        df = df.drop_duplicates(subset=['风机编号', '时间', '频率', '测点'], keep='last')

        df.to_csv(os.path.join(self.save_path, self.field_code + '.csv'), index=False, encoding='utf8')

+ 0 - 0
etl/wind_power/wave/__init__.py


+ 7 - 0
service/trans_service.py

@@ -40,6 +40,13 @@ def get_fault_warn_conf(field_code, trans_type) -> dict:
         return None
     return res[0]
 
def get_wave_conf(field_code) -> dict:
    """Fetch the active (status = 1) wave-extraction config for a wind field.

    :param field_code: wind-field code matched against ``wave_conf.wind_code``.
    :return: first matching row, or None when nothing matched.
    """
    query_sql = "SELECT * FROM wave_conf where wind_code = %s and status = 1"
    # BUG FIX: params must be a 1-tuple; "(field_code)" is just a parenthesized
    # string. Also dropped the leftover debug print.
    res = trans.execute(query_sql, (field_code,))
    # The db helper appears to return a bare tuple when no rows matched —
    # mirror get_fault_warn_conf and report that as None (confirm helper contract).
    if isinstance(res, tuple):
        return None
    return res[0]
 
 def creat_min_sec_table(table_name, win_names, read_type):
     create_sql = f"""

+ 10 - 0
tmp_file/test_wave.py

@@ -0,0 +1,10 @@
+import os
+import sys
+
+sys.path.insert(0, os.path.abspath(__file__).split("tmp_file")[0])
+
+from etl.wind_power.wave.WaveTrans import WaveTrans
+
if __name__ == '__main__':
    # Manual smoke test: extract the sample SKF/CMS waveform data set.
    wave_trans = WaveTrans('振动测点', r'/home/wzl/test_data/sdk_data/sdk_data', r'/home/wzl/test_data/sdk_data')
    wave_trans.run()

+ 8 - 4
utils/file/trans_methods.py

@@ -161,22 +161,26 @@ def __build_directory_dict(directory_dict, path, filter_types=None):
 
 
 # 读取路径下所有的excel文件
def read_excel_files(read_path, filter_types=None):
    """Return every excel-like file (xls/xlsx/csv/gz by default) under read_path.

    A read_path that is itself a file is returned as a one-element list.
    filter_types, when given, replaces the default extension whitelist.
    """
    if os.path.isfile(read_path):
        return [read_path]

    extensions = ['xls', 'xlsx', 'csv', 'gz'] if filter_types is None else filter_types
    found = {}
    __build_directory_dict(found, read_path, filter_types=extensions)

    result = []
    for file_list in found.values():
        for file_path in file_list:
            if file_path:
                result.append(file_path)
    return result
 
 
 # 读取路径下所有的文件
def read_files(read_path, filter_types=None):
    """Return every data/archive file under read_path.

    Defaults to xls/xlsx/csv/gz/zip/rar extensions; filter_types, when given,
    replaces that whitelist. A read_path that is itself a file is returned
    as a one-element list.
    """
    if os.path.isfile(read_path):
        return [read_path]

    extensions = ['xls', 'xlsx', 'csv', 'gz', 'zip', 'rar'] if filter_types is None else filter_types
    found = {}
    __build_directory_dict(found, read_path, filter_types=extensions)

    result = []
    for file_list in found.values():
        for file_path in file_list:
            if file_path:
                result.append(file_path)
    return result