소스 검색

抽取振动数据-添加无映射测点清除

wzl 6 달 전
부모
커밋
636a8f2f9f
4개의 변경된 파일, 96개의 추가 그리고 62개의 삭제
  1. 40 36
      etl/wind_power/wave/WaveTrans.py
  2. 6 3
      service/plt_service.py
  3. 48 1
      service/trans_service.py
  4. 2 22
      tmp_file/test_wave.py

+ 40 - 36
etl/wind_power/wave/WaveTrans.py

@@ -2,10 +2,11 @@ import datetime
 import json
 import multiprocessing
 from os.path import basename, dirname
-
 import pandas as pd
 
-from service.trans_service import get_wave_conf, save_file_to_db, save_df_to_db
+from service.plt_service import get_all_wind
+from service.trans_service import get_wave_conf, save_df_to_db, get_or_create_wave_table, \
+    get_wave_data, delete_exist_wave_data
 from utils.file.trans_methods import *
 from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
 
@@ -18,36 +19,31 @@ class WaveTrans(object):
         self.save_path = save_path
         self.begin = datetime.datetime.now()
 
-    # def get_data(self, file_path):
-    #     df = pd.read_csv(file_path, encoding=detect_file_encoding(file_path), header=None)
-    #     data = [i for i in df[0].values]
-    #     filename = os.path.basename(file_path)
-    #     wind_num = filename.split('_')[1]
-    #     cedian = '齿轮箱' + filename.split('_齿轮箱')[1].split('_Time')[0]
-    #     cedian_time = filename.split('风机_')[1].split('_齿轮箱')[0].replace('_', ':')
-    #     name_tmp = 'Time_' + filename.split('Time_')[1].split('_cms')[0]
-    #     pinlv = name_tmp[0:name_tmp.rfind('_')]
-    #     zhuansu = name_tmp[name_tmp.rfind('_') + 1:]
-    #
-    #     df = pd.DataFrame()
-    #     df['风机编号'] = [wind_num, wind_num]
-    #     df['时间'] = [cedian_time, cedian_time]
-    #     df['频率'] = [pinlv, pinlv]
-    #     df['测点'] = ['转速', cedian]
-    #     df['数据'] = [[float(zhuansu)], data]
-    #
-    #     return df
-
     def get_data_exec(self, func_code, arg):
         exec(func_code)
         return locals()['get_data'](arg)
 
-    def run(self, map_dict=dict()):
+    def del_exists_data(self, df):
+        min_date, max_date = df['time_stamp'].min(), df['time_stamp'].max()
+        db_df = get_wave_data(self.field_code + '_wave', min_date, max_date)
+
+        exists_df = pd.merge(db_df, df,
+                             on=['wind_turbine_name', 'time_stamp', 'sampling_frequency', 'mesure_point_name'],
+                             how='inner')
+        ids = [int(i) for i in exists_df['id'].to_list()]
+        if ids:
+            delete_exist_wave_data(self.field_code + "_wave", ids)
+
+    def run(self):
         all_files = read_files(self.read_path, ['csv'])
         print(len)
         # 最大取系统cpu的 1/2
         split_count = get_available_cpu_count_with_percent(1 / 2)
 
+        all_wind, _ = get_all_wind(self.field_code, False)
+
+        get_or_create_wave_table(self.field_code + '_wave')
+
         wave_conf = get_wave_conf(self.field_code)
 
         base_param_exec = wave_conf['base_param_exec']
@@ -55,39 +51,47 @@ class WaveTrans(object):
         if base_param_exec:
             base_param_exec = base_param_exec.replace('\r\n', '\n').replace('\t', '    ')
             print(base_param_exec)
-            # exec(base_param_exec)
+            if 'import ' in base_param_exec:
+                raise Exception("方法不支持import方法")
 
-            mesure_poins = [key for key, value in wave_conf.items() if str(key).startswith('conf_') and value]
+        mesure_poins = [key for key, value in wave_conf.items() if str(key).startswith('conf_') and value]
+        for point in mesure_poins:
+            map_dict[wave_conf[point]] = point.replace('conf_', '')
 
-            for point in mesure_poins:
-                map_dict[wave_conf[point]] = point
+        map_dict['rotational_speed'] = 'rotational_speed'
 
         with multiprocessing.Pool(split_count) as pool:
             file_datas = pool.starmap(self.get_data_exec, [(base_param_exec, i) for i in all_files])
 
-        # for file_data in file_datas:
-        #     wind_num, data_time, frequency, rotational_speed, measurementp_name, data = file_data[0], file_data[1], \
-        #     file_data[2], file_data[3], file_data[4],
+        print("读取文件耗时:", datetime.datetime.now() - self.begin)
 
         result_list = list()
         for file_data in file_datas:
             wind_turbine_name, time_stamp, sampling_frequency, rotational_speed, mesure_point_name, mesure_data = \
                 file_data[0], file_data[1], file_data[2], file_data[3], file_data[4], file_data[5]
 
-            result_list.append(
-                [wind_turbine_name, time_stamp, sampling_frequency, 'rotational_speed', [float(rotational_speed)]])
+            if mesure_point_name in map_dict.keys():
+                result_list.append(
+                    [wind_turbine_name, time_stamp, sampling_frequency, 'rotational_speed', [float(rotational_speed)]])
 
-            result_list.append(
-                [wind_turbine_name, time_stamp, sampling_frequency, mesure_point_name, mesure_data])
+                result_list.append(
+                    [wind_turbine_name, time_stamp, sampling_frequency, mesure_point_name, mesure_data])
 
         df = pd.DataFrame(result_list,
                           columns=['wind_turbine_name', 'time_stamp', 'sampling_frequency', 'mesure_point_name',
                                    'mesure_data'])
+        df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors='coerce')
+        df['mesure_point_name'] = df['mesure_point_name'].map(map_dict)
+        df.dropna(subset=['mesure_point_name'], inplace=True)
 
-        df['mesure_point_name'] = df['mesure_point_name'].map(map_dict).fillna(df['mesure_point_name'])
+        df['wind_turbine_number'] = df['wind_turbine_name'].map(all_wind).fillna(df['wind_turbine_name'])
 
         df['mesure_data'] = df['mesure_data'].apply(lambda x: json.dumps(x))
 
-        save_df_to_db('SKF001_wave', df, batch_count=1000)
+        df.sort_values(by=['time_stamp', 'mesure_point_name'], inplace=True)
+
+        self.del_exists_data(df)
+
+        save_df_to_db(self.field_code + '_wave', df, batch_count=1000)
 
         print("总耗时:", datetime.datetime.now() - self.begin)

+ 6 - 3
service/plt_service.py

@@ -137,7 +137,7 @@ def get_hebing_data_by_batch_no_and_type(batch_no, transfer_type):
     return data[0]
 
 
-def get_all_wind(field_code):
+def get_all_wind(field_code, need_rated_param=True):
     query_sql = """
     SELECT t.engine_code,t.engine_name,t.rated_capacity,a.rated_cut_out_windspeed 
     from wind_engine_group t LEFT JOIN wind_engine_mill a on t.mill_type_code  = a.mill_type_code 
@@ -148,7 +148,9 @@ def get_all_wind(field_code):
     power_result = dict()
     for data in dict_datas:
         wind_result[str(data['engine_name'])] = str(data['engine_code'])
-        power_result[str(data['engine_code'])] = (float(data['rated_capacity']), float(data['rated_cut_out_windspeed']))
+        if need_rated_param:
+            power_result[str(data['engine_code'])] = (
+            float(data['rated_capacity']), float(data['rated_cut_out_windspeed']))
     return wind_result, power_result
 
 
@@ -158,7 +160,8 @@ def get_all_wind_company():
     if datas:
         return [v for data in datas for k, v in data.items()]
     else:
-        return ['吉山风电场', '和风元宝山', '唐龙三期风电场', '密马风电场', '招远风电场', '昌平坳风场', '昌西一风电场', '虹梯官风电场', '长清风电场']
+        return ['吉山风电场', '和风元宝山', '唐龙三期风电场', '密马风电场', '招远风电场', '昌平坳风场', '昌西一风电场',
+                '虹梯官风电场', '长清风电场']
 
 
 def get_base_wind_and_power(wind_turbine_number):

+ 48 - 1
service/trans_service.py

@@ -7,6 +7,7 @@ import traceback
 import pandas as pd
 
 from utils.db.ConnectMysql import ConnectMysql
+from utils.file.trans_methods import split_array
 from utils.log.trans_log import trans_print
 
 trans = ConnectMysql("trans")
@@ -160,6 +161,7 @@ def clear_table(table_name, save_db=True):
         except:
             trans_print(traceback.format_exc())
 
+
 def save_file_to_db(table_name: str, file: str, batch_count=100000):
     base_name = os.path.basename(file)
     try:
@@ -219,6 +221,49 @@ def create_warn_fault_table(table_name):
     trans.execute(sql)
 
 
+def get_or_create_wave_table(table_name):
+    create_table = False
+    query_sql = f"select 1 from `{table_name}` limit 1"
+    try:
+        trans.execute(query_sql)
+    except:
+        create_table = True
+
+    if create_table:
+        sql = f"""
+        CREATE TABLE `{table_name}` (
+          `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
+          `wind_turbine_number` varchar(20) DEFAULT NULL COMMENT '风机编号',
+          `wind_turbine_name` varchar(20) DEFAULT NULL COMMENT '原始风机编号',
+          `time_stamp` datetime DEFAULT NULL COMMENT '时间',
+          `sampling_frequency` varchar(50) DEFAULT NULL COMMENT '分析频率',
+          `mesure_point_name` varchar(100) DEFAULT NULL COMMENT '测点名称',
+          `mesure_data` mediumtext COMMENT '测点数据',
+          PRIMARY KEY (`id`),
+          KEY `wind_turbine_number` (`wind_turbine_number`),
+          KEY `time_stamp` (`time_stamp`),
+          KEY `mesure_point_name` (`mesure_point_name`)
+        ) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4
+        """
+
+        trans.execute(sql)
+
+
+def get_wave_data(table_name, min_data, max_data):
+    query_sql = f"""
+    select  id,wind_turbine_number,wind_turbine_name,time_stamp,sampling_frequency,mesure_point_name from `{table_name}` where time_stamp >= '{min_data}' and time_stamp <= '{max_data}'
+    """
+    return trans.read_sql_to_df(query_sql)
+
+
+def delete_exist_wave_data(table_name, ids):
+    all_arrays = split_array(ids, 1000)
+    for array in all_arrays:
+        ids_str = ",".join(['%s'] * len(array))
+        delete_sql = f"delete from `{table_name}` where id in ({ids_str})"
+        trans.execute(delete_sql, array)
+
+
 if __name__ == '__main__':
     # path_prix = r"/data/download/collection_data/2完成/招远风电场-山东-大唐/清理数据/WOF063100040-WOB00013/second"
     # files = ["WOG00030.csv", "WOG00034.csv"]
@@ -237,4 +282,6 @@ if __name__ == '__main__':
     # print(df.shape)
     # print(df.info())
     # print("Time used:", (end - begin).seconds)
-    get_fault_warn_conf("test", "fault")
+    # get_fault_warn_conf("test", "fault")
+
+    delete_exist_wave_data('SKF001_wave', [1, 2, 3])

+ 2 - 22
tmp_file/test_wave.py

@@ -1,32 +1,12 @@
 import os
 import sys
-import pandas as pd
-from os.path import basename, dirname
-
-from service.trans_service import get_wave_conf
 
 sys.path.insert(0, os.path.abspath(__file__).split("tmp_file")[0])
 
 from etl.wind_power.wave.WaveTrans import WaveTrans
 
 if __name__ == '__main__':
-    # test = WaveTrans('SKF001', r'/home/wzl/test_data/sdk_data/sdk_data', r'/home/wzl/test_data/sdk_data')
-    test = WaveTrans('SKF001', r'D:\data\sdk_data\sdk_data_less', r'/home/wzl/test_data/sdk_data')
-    # D:\data\sdk_data\sdk_data_less
-
-    # wave_conf = get_wave_conf(test.field_code)
-    #
-    # base_param_exec = wave_conf['base_param_exec']
-    # map_dict = {}
-    # if base_param_exec:
-    #     base_param_exec = base_param_exec.replace('\r\n', '\n').replace('\t', '    ')
-    #     print(base_param_exec)
-    #     exec(base_param_exec)
-    #     setattr(test, 'get_data', get_data)
-    #
-    #     mesure_poins = [key for key, value in wave_conf.items() if str(key).startswith('conf_') and value]
-    #
-    #     for point in mesure_poins:
-    #         map_dict[wave_conf[point]] = point
+    test = WaveTrans('SKF001', r'/home/wzl/test_data/sdk_data/sdk_data', r'/home/wzl/test_data/sdk_data')
+    # test = WaveTrans('SKF001', r'D:\data\sdk_data\sdk_data_less', r'/home/wzl/test_data/sdk_data')
 
     test.run()