
Add vibration support

wzl 5 months ago
parent commit
1b51c9a90c

+ 2 - 1
app_run.py

@@ -26,6 +26,7 @@ def get_exec_data(batch_no=None, read_type=None, run_count=1):
 
 
 def run(batch_no=None, read_type=None, save_db=True, run_count=1):
+    update_timeout_trans_data()
     data = get_exec_data(batch_no, read_type, run_count)
 
     exec_process = None
@@ -53,7 +54,7 @@ if __name__ == '__main__':
     run_count = int(read_conf(yaml_config, "run_batch_count", 1))
 
     from utils.log.trans_log import trans_print
-    from service.plt_service import get_batch_exec_data, get_data_by_batch_no_and_type
+    from service.plt_service import get_batch_exec_data, get_data_by_batch_no_and_type, update_timeout_trans_data
     from etl.wind_power.fault_warn.FaultWarnTrans import FaultWarnTrans
     from etl.wind_power.min_sec.MinSecTrans import MinSecTrans
 

+ 4 - 1
etl/wind_power/min_sec/MinSecTrans.py

@@ -3,6 +3,8 @@
 # @Author  : 魏志亮
 import multiprocessing
 
+import pandas as pd
+
 from etl.common.BaseDataTrans import BaseDataTrans
 from etl.wind_power.min_sec.ReadAndSaveTmp import ReadAndSaveTmp
 from etl.wind_power.min_sec.StatisticsAndSaveFile import StatisticsAndSaveFile
@@ -11,7 +13,8 @@ from service.plt_service import update_trans_status_success, update_trans_status
 from service.trans_service import batch_statistics, get_min_sec_conf
 from utils.conf.read_conf import read_conf
 from utils.df_utils.util import get_time_space
-from utils.file.trans_methods import *
+from utils.file.trans_methods import read_excel_files, read_file_to_df
+from utils.log.trans_log import trans_print
 
 
 class MinSecTrans(BaseDataTrans):

+ 7 - 9
etl/wind_power/min_sec/StatisticsAndSaveFile.py

@@ -5,7 +5,7 @@ from os import path
 
 import numpy as np
 import pandas as pd
-import math
+
 from etl.common.PathsAndTable import PathsAndTable
 from etl.wind_power.min_sec import TransParam
 from etl.wind_power.min_sec.ClassIdentifier import ClassIdentifier
@@ -17,6 +17,8 @@ from utils.file.trans_methods import create_file_path, read_excel_files, read_fi
 from utils.log.trans_log import trans_print
 from utils.systeminfo.sysinfo import use_files_get_max_cpu_count
 
+exec("import math")
+
 
 class StatisticsAndSaveFile(object):
 
@@ -78,7 +80,7 @@ class StatisticsAndSaveFile(object):
 
         # 删除 有功功率 和 风速均为空的情况
         df.dropna(subset=['active_power', 'wind_velocity'], how='all', inplace=True)
-        trans_print(wind_col_name, "删除有功功率和风速均为空的情况:", df.shape)
+        trans_print(wind_col_name, "删除有功功率和风速均为空的情况:", df.shape)
         df.replace(np.nan, -999999999, inplace=True)
         number_cols = df.select_dtypes(include=['number']).columns.tolist()
         for col in df.columns:
@@ -92,11 +94,6 @@ class StatisticsAndSaveFile(object):
 
         df.drop_duplicates(['wind_turbine_number', 'time_stamp'], keep='first', inplace=True)
 
-        # 添加年月日
-        solve_time_begin = datetime.datetime.now()
-        # df = df[(df['time_stamp'].str.find('-') > 0) & (df['time_stamp'].str.find(':') > 0)]
-        # trans_print(wind_col_name, "去掉非法时间后大小:", df.shape[0])
-        # df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce", format='%d-%m-%Y %H:%M:%S')
         df['time_stamp'] = pd.to_datetime(df['time_stamp'], errors="coerce")
         df.dropna(subset=['time_stamp'], inplace=True)
         df.sort_values(by='time_stamp', inplace=True)
@@ -108,9 +105,11 @@ class StatisticsAndSaveFile(object):
             df['time_stamp'] = df['time_stamp'].apply(lambda x: x + pd.Timedelta(minutes=(10 - x.minute % 10) % 10))
             df['time_stamp'] = df['time_stamp'].dt.floor('10T')
             df = df.groupby(['wind_turbine_number', 'time_stamp']).mean().reset_index()
-
+        trans_print('有功功率前10个', df.head(10)['active_power'].values)
         power_df = df[df['active_power'] > 0]
+        trans_print(wind_col_name, "功率大于0的数量:", power_df.shape)
         power = power_df.sample(int(power_df.shape[0] / 100))['active_power'].median()
+
         del power_df
         trans_print(wind_col_name, '有功功率,中位数', power)
         if power > 100000:
@@ -133,7 +132,6 @@ class StatisticsAndSaveFile(object):
                                             rated_power=rated_power_and_cutout_speed_tuple[0],
                                             cut_out_speed=rated_power_and_cutout_speed_tuple[1])
         df = class_identifiler.run()
-
         df['year'] = df['time_stamp'].dt.year
         df['month'] = df['time_stamp'].dt.month
         df['day'] = df['time_stamp'].dt.day
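Note on the hunk above: the 10-minute aggregation first rounds each timestamp up to the next 10-minute boundary, then floors away the seconds, and finally averages all numeric columns per turbine and bucket. A minimal standalone sketch (column names follow the diff; the sample values are made up):

import pandas as pd

df = pd.DataFrame({
    'wind_turbine_number': ['WT01', 'WT01', 'WT01'],
    'time_stamp': pd.to_datetime(['2024-01-01 10:03:45',
                                  '2024-01-01 10:07:10',
                                  '2024-01-01 10:10:02']),
    'active_power': [100.0, 120.0, 130.0],
})

# Round up to the next 10-minute mark (already-aligned minutes get +0) ...
df['time_stamp'] = df['time_stamp'].apply(
    lambda x: x + pd.Timedelta(minutes=(10 - x.minute % 10) % 10))
# ... then drop the seconds and any sub-10-minute remainder.
df['time_stamp'] = df['time_stamp'].dt.floor('10T')

# Average every numeric column within each (turbine, 10-minute bucket) pair.
df = df.groupby(['wind_turbine_number', 'time_stamp']).mean().reset_index()
print(df)  # all three sample rows collapse into the 10:10:00 bucket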

+ 6 - 1
etl/wind_power/wave/WaveTrans.py

@@ -5,8 +5,11 @@ from service.plt_service import get_all_wind
 from service.trans_service import get_wave_conf, save_df_to_db, get_or_create_wave_table, \
     get_wave_data, delete_exist_wave_data
 from utils.file.trans_methods import *
+from utils.log.trans_log import set_trance_id
 from utils.systeminfo.sysinfo import get_available_cpu_count_with_percent
-from os.path import *
+
+exec("from os.path import *")
+
 
 class WaveTrans(object):
 
@@ -32,6 +35,8 @@ class WaveTrans(object):
             delete_exist_wave_data(self.field_code + "_wave", ids)
 
     def run(self):
+        trance_id = '-'.join([self.field_code, 'wave'])
+        set_trance_id(trance_id)
         all_files = read_files(self.read_path, ['csv'])
         print(len)
         # 最大取系统cpu的 1/2

+ 7 - 5
service/plt_service.py

@@ -3,9 +3,7 @@
 # @Author  : 魏志亮
 import datetime
 
-from utils.db.ConnectMysql import ConnectMysql
-
-plt = ConnectMysql("plt")
+from service.common_connect import plt
 
 
 def update_timeout_trans_data():
@@ -17,6 +15,10 @@ def update_timeout_trans_data():
             (transfer_type = 'second' AND TIMESTAMPDIFF(HOUR, transfer_start_time, NOW()) > 24)  
             OR  
             (transfer_type = 'minute' AND TIMESTAMPDIFF(HOUR, transfer_start_time, NOW()) > 6)  
+            OR  
+            (transfer_type = 'warn' AND TIMESTAMPDIFF(HOUR, transfer_start_time, NOW()) > 6)  
+            OR  
+            (transfer_type = 'fault' AND TIMESTAMPDIFF(HOUR, transfer_start_time, NOW()) > 6)  
         )  
         AND trans_sys_status = 0
     """
@@ -150,7 +152,7 @@ def get_all_wind(field_code, need_rated_param=True):
         wind_result[str(data['engine_name'])] = str(data['engine_code'])
         if need_rated_param:
             power_result[str(data['engine_code'])] = (
-            float(data['rated_capacity']), float(data['rated_cut_out_windspeed']))
+                float(data['rated_capacity']), float(data['rated_cut_out_windspeed']))
     return wind_result, power_result
 
 
@@ -182,4 +184,4 @@ if __name__ == '__main__':
     # print(update_trans_status_success("test_唐龙-定时任务测试", "second", 10))
     begin = datetime.datetime.now()
 
-    print(get_all_wind('WOF091200006'))
+    print(get_all_wind('WOF034900024'))
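The two new OR branches extend the timeout check to 'warn' and 'fault' transfers. Summarised outside the SQL (illustrative helper only, not code from this commit), the limits are now:

# Hours a transfer may keep trans_sys_status = 0 before update_timeout_trans_data picks it up.
TIMEOUT_HOURS = {
    'second': 24,  # second-level data keeps its longer window
    'minute': 6,
    'warn': 6,     # added by this commit
    'fault': 6,    # added by this commit
}

def is_timed_out(transfer_type, hours_running):
    limit = TIMEOUT_HOURS.get(transfer_type)
    return limit is not None and hours_running > limit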

+ 1 - 3
service/trans_service.py

@@ -6,11 +6,9 @@ from os import *
 
 import pandas as pd
 
-from utils.db.ConnectMysql import ConnectMysql
 from utils.file.trans_methods import split_array
 from utils.log.trans_log import trans_print
-
-trans = ConnectMysql("trans")
+from service.common_connect import trans
 
 
 def get_min_sec_conf(field_code, trans_type) -> dict:
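Both plt_service.py and trans_service.py now import their connection objects from a shared service/common_connect module instead of each creating its own ConnectMysql instance. That module itself is not shown in this diff; inferred from the removed lines, a minimal version would look like:

# service/common_connect.py  (assumed contents, reconstructed from the removed imports)
from utils.db.ConnectMysql import ConnectMysql

# One shared connection object per database, reused by every service module.
plt = ConnectMysql("plt")
trans = ConnectMysql("trans")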

+ 7 - 5
tmp_file/read_and_draw_png.py

@@ -9,19 +9,21 @@ from utils.file.trans_methods import read_file_to_df
 def draw(file, fengchang='测试'):
     name = path.basename(file).split('.')[0]
     df = read_file_to_df(file)
-
-    identifier = ClassIdentifier(wind_turbine_number='test', origin_df=df, rated_power=1500, cut_out_speed=25,
+    del df['lab']
+    identifier = ClassIdentifier(wind_turbine_number='test', origin_df=df, rated_power=5000, cut_out_speed=20,
                                  active_power='active_power', wind_velocity='wind_velocity',
-                                 pitch_angle_blade='pitch_angle_blade')
+                                 pitch_angle_blade='pitch_angle_blade_1')
     df = identifier.run()
 
     df.loc[df['active_power'] <= 0, 'lab'] = -1
 
+    print(df.groupby('lab').count())
     color_map = {-1: 'red', 0: 'green', 1: 'blue', 2: 'black', 3: 'orange', 4: 'magenta'}
     c = df['lab'].map(color_map)
 
     # -1:停机 0:好点  1:欠发功率点;2:超发功率点;3:额定风速以上的超发功率点 4: 限电
-    legend_map = {"停机": 'red', "好点": 'green', "欠发": 'blue', "超发": 'black', "额定风速以上的超发": 'orange', "限电": 'magenta'}
+    legend_map = {"停机": 'red', "好点": 'green', "欠发": 'blue', "超发": 'black', "额定风速以上的超发": 'orange',
+                  "限电": 'magenta'}
     scatter(name, x_label='风速', y_label='有功功率', x_values=df['wind_velocity'].values,
             y_values=df['active_power'].values, color=c, col_map=legend_map,
             save_file_path=path.dirname(
@@ -29,7 +31,7 @@ def draw(file, fengchang='测试'):
 
 
 if __name__ == '__main__':
-    read_dir = r"D:\data\清理数据\和风元宝山\test_11_test\test"
+    read_dir = r"D:\data\logs\matlib-test"
 
     files = [read_dir + sep + i for i in listdir(read_dir)]
 

+ 156 - 149
wind_farm/中广核/second_data.py

@@ -1,166 +1,173 @@
+import datetime
+import json
+import logging
+import multiprocessing
+import os
+import traceback
+
+import sys
+
+import numpy as np
 import pandas as pd
 from sqlalchemy import create_engine
-import datetime
 
 engine = create_engine('mysql+pymysql://root:admin123456@192.168.50.235:30306/appoint')
-mesurepoint_conf = {
-    0: '网侧L1相功率因数',
-    1: '网侧L1相电流',
-    2: '网侧L2相电流',
-    3: '网侧L3相电流',
-    4: '变频器水冷入口压力',
-    5: '变频器水冷出口压力',
-    6: '变频器水冷入口温度',
-    7: '变频器水冷出口温度',
-    8: '电网频率',
-    9: '网侧L1相电压',
-    10: '网侧L2相电压',
-    11: '网侧L3相电压',
-    12: '外测有功功率',
-    13: '发电机进风口前部温度',
-    14: '发电机压差传感器',
-    15: '发电机进风口后部温度',
-    16: '发电机出风口温度',
-    17: '风机有功功率',
-    18: '发电机水冷入口压力',
-    19: '发电机水冷出口压力',
-    20: '发电机无功功率',
-    21: '发电机转速',
-    22: '发电机水冷入口温度',
-    23: '发电机水冷出口温度',
-    24: '发电机驱动端轴承温度',
-    25: '发电机非驱动端轴承温度',
-    26: '发电机滑环室温度',
-    27: '发电机定子U相线圈温度',
-    28: '发电机定子V相线圈温度',
-    29: '发电机定子W相线圈温度',
-    30: '机舱水冷齿轮箱冷却液温度1min',
-    31: '机舱水冷发电机冷却液温度1min',
-    32: '机舱冷却系统水泵入口压力',
-    33: '机舱水冷泵出口压力1min',
-    34: '塔底水冷变频器入口水温',
-    35: '塔底水冷变频器出口水温',
-    36: '塔底湿度',
-    37: '塔底水冷三通阀设置实际值',
-    38: '塔底水冷三通阀反馈值',
-    39: '塔外湿度',
-    40: '塔底水冷泵入口压力',
-    41: '塔底水冷泵出口压力',
-    42: '舱内温度',
-    43: '机舱控制柜温度',
-    44: '舱外温度',
-    45: '塔底温度',
-    46: '塔底控制柜温度',
-    47: '风向',
-    48: '风速',
-    49: '机舱中轴线与风向夹角',
-    50: '变桨电池1电压',
-    51: '变桨电池2电压',
-    52: '变桨电池3电压',
-    53: '变桨电机1电流',
-    54: '变桨电机2电流',
-    55: '变桨电机3电流',
-    56: '1#桨电机温度',
-    57: '2#桨电机温度',
-    58: '3#桨电机温度',
-    59: '变桨驱动器1温度',
-    60: '变桨驱动器2温度',
-    61: '变桨驱动器3温度',
-    62: '轮毂内温度',
-    63: '变桨电池柜1温度',
-    64: '变桨电池柜2温度',
-    65: '变桨电池柜3温度',
-    66: '动力电缆温度监控1',
-    67: '动力电缆温度监控2',
-    68: '辅变绕组温度',
-    69: '主变绕组温度',
-    70: '齿轮箱入口油压',
-    71: '齿轮箱油路滤网前油压',
-    72: '液压站预充压力',
-    73: '主轴转速',
-    74: '齿轮箱油路入口温度',
-    75: '齿轮箱散热器出口温度',
-    76: '齿轮箱高速轴HSS_GSGS端轴承温度',
-    77: '齿轮箱高速轴HSS_GSRS端轴承温度',
-    78: '齿轮箱中速轴HSS_RS端轴承温度',
-    79: '齿轮箱中速轴IMS_GSRS端轴承温度',
-    80: '齿轮箱高速轴驱动端轴承温度',
-    81: '齿轮箱高速轴非驱动端轴承温度',
-    82: '齿轮箱油池温度',
-    83: '齿轮箱中速轴驱动端轴承温度',
-    84: '齿轮箱中速轴非驱动端轴承温度',
-    85: '主轴承温度',
-    86: '主轴承内圈温度',
-    87: '主轴承外圈温度',
-    88: '主控有功设置值',
-    89: '主控无功设置值',
-    90: '主控次要SC',
-    91: '主控有功限值上限',
-    92: '对外AI型状态1',
-    93: '机舱侧向振动(已滤波)',
-    94: '机舱轴向振动(已滤波)',
-    95: '后方摩擦片距离',
-    96: '前方摩擦片距离',
-    97: '机舱角度',
-    98: '机舱电流',
-    99: '扭揽角度',
-    100: '偏航电机转速1',
-    101: '偏航电机转速2',
-    102: '偏航角度'
-}
-
-common_address_conf = {
-    1: '风场1',
-    2: '风场2',
-    3: '风场3',
-    4: '风场4',
-    5: '风场5',
-    6: '风场6',
-    7: '风场7',
-    8: '风场8',
-    9: '风场9',
-    10: '风场10',
-    11: '风场11',
-    12: '风场12'
-}
-
-
-def pretty_print(message):
-    print(datetime.datetime.now(), ":", message)
+
+base_dir = r'/data/logs/104'
+save_dir = base_dir + os.sep + 'second'
+log_dir = base_dir + os.sep + 'logs' + os.sep + 'second'
+
+def create_dir(save_dir, is_file=False):
+    if is_file:
+        save_dir = os.path.dirname(save_dir)
+    os.makedirs(save_dir, exist_ok=True)
+
+
+def init_log():
+    logger = logging.getLogger("104data")
+    logger.setLevel(logging.INFO)
+    stout_handle = logging.StreamHandler(sys.stdout)
+    stout_handle.setFormatter(
+        logging.Formatter("%(asctime)s: %(message)s"))
+    stout_handle.setLevel(logging.INFO)
+    logger.addHandler(stout_handle)
+    create_dir(log_dir)
+    file_name = log_dir + os.sep + datetime.datetime.now().strftime('%Y%m') + '-info.log'
+    file_handler = logging.FileHandler(file_name, encoding='utf-8')
+    file_handler.setFormatter(
+        logging.Formatter("%(asctime)s: %(message)s"))
+    file_handler.setLevel(logging.INFO)
+    logger.addHandler(file_handler)
+
+    file_name = log_dir + os.sep + datetime.datetime.now().strftime('%Y%m') + '-error.log'
+    file_handler = logging.FileHandler(file_name, encoding='utf-8')
+    file_handler.setFormatter(
+        logging.Formatter("%(asctime)s: %(message)s"))
+    file_handler.setLevel(logging.ERROR)
+    logger.addHandler(file_handler)
+
+    return logger
+
+
+logger = init_log()
+
+
+def get_all_mesurement_conf():
+    sql = "select * from measurement_conf "
+    return pd.read_sql(sql, engine)
+
+
+def get_all_mesurepoint_conf():
+    sql = "select * from measurepoint_conf t where t.status = 1"
+    return pd.read_sql(sql, engine)
+
+
+def df_value_to_dict(df, key='col1', value='col2'):
+    """
+    :param df: dataframe
+    :param key: 字典的key,如果重复,则返回
+    :param value: 字典的value
+    :return:
+    """
+    result_dict = dict()
+    for k, v in zip(df[key], df[value]):
+        if k in result_dict.keys():
+            if type(result_dict[k]) == list:
+                result_dict[k].append(v)
+            else:
+                result_dict[k] = [result_dict[k]]
+                result_dict[k].append(v)
+        else:
+            result_dict[k] = v
+
+    return result_dict
+
+
+def info_print(*kwargs):
+    message = " ".join([str(i) for i in kwargs])
+    logger.info(message)
+
+
+def error_print(*kwargs):
+    message = " ".join([str(i) for i in kwargs])
+    logger.error(message)
 
 
 def exists_table(table_name):
     sql = f"SELECT * FROM information_schema.tables WHERE table_schema = 'appoint' AND table_name = '{table_name}'"
-    pretty_print(sql)
+    info_print(sql)
     table_df = pd.read_sql_query(sql, engine)
     if table_df.empty:
         return False
     return True
 
 
+def get_data_and_save_file(table_name, save_path, measurepoint_use_dict):
+    if not exists_table(table_name):
+        error_print(f"{table_name} 表不存在")
+    else:
+        df_sql = f"SELECT * FROM {table_name}"
+        info_print(df_sql)
+        df = pd.read_sql_query(df_sql, engine)
+        info_print(df.shape)
+
+        data_dict = dict()
+        for receive_time, information_object_data in zip(df['receive_time'],
+                                                         df['information_object_data']):
+
+            json_data = json.loads(information_object_data)
+            for k, v in json_data.items():
+                k = int(k)
+                wind_num = k // 103 + 1
+                mesurepoint_num = k % 103
+
+                if wind_num not in data_dict.keys():
+                    data_dict[wind_num] = dict()
+
+                if receive_time not in data_dict[wind_num].keys():
+                    data_dict[wind_num][receive_time] = dict()
+
+                if mesurepoint_num in measurepoint_use_dict.keys():
+                    data_dict[wind_num][receive_time][mesurepoint_num] = v
+
+        datas = list()
+        for wind_num, data in data_dict.items():
+            for receive_time, mesurepoint_data in data.items():
+                data = [wind_num, receive_time]
+                for point_num in measurepoint_use_dict.keys():
+                    data.append(mesurepoint_data[point_num] if point_num in mesurepoint_data.keys() else np.nan)
+                if len(data) > 2:
+                    datas.append(data)
+
+        cols = ['风机编号', '时间']
+        cols.extend(measurepoint_use_dict.values())
+        result_df = pd.DataFrame(data=datas, columns=cols)
+        result_df.sort_values(by=['风机编号', '时间'])
+        create_dir(save_path, True)
+        result_df.to_csv(save_path, encoding='utf8', index=False, compression='gzip')
+        info_print("文件", save_path, '保存成功')
+
+
 if __name__ == '__main__':
+    info_print("开始执行")
     begin = datetime.datetime.now()
-    pretty_print("开始执行")
-    today_year_month = datetime.datetime.now().strftime('%Y%m')
-    today = datetime.datetime.now().strftime('%d')
-    yestoday = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y%m%d')
-    lastday = (datetime.datetime.now() - datetime.timedelta(days=2)).strftime('%Y%m%d')
-
-    lastday_table = f'{lastday}_measurement'
-    lastday_df = pd.DataFrame()
-    if not exists_table(lastday_table):
-        pretty_print(f"{lastday_table}表不存在")
-    else:
-        lastday_df_sql = f"SELECT * FROM `{lastday_table}` order by  id desc limit 10"
-        pretty_print(lastday_df_sql)
-        lastday_df = pd.read_sql_query(lastday_df_sql, engine)
-
-    yestoday_table = f'{yestoday}_measurement'
-    if not exists_table(yestoday_table):
-        pretty_print(f"{yestoday_table}表不存在")
-        raise Exception(f"{yestoday_table}表不存在")
-    else:
-        yestoday_df_sql = f"SELECT * FROM `{yestoday_table}`"
-        pretty_print(yestoday_df_sql)
-        yestoday_df = pd.read_sql_query(yestoday_df_sql, engine)
+    try:
+        measurepoint_conf_df = get_all_mesurepoint_conf()
+        measurepoint_use_dict = df_value_to_dict(measurepoint_conf_df, 'id', 'name')
+
+        yestoday = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y%m%d')
+
+        measurement_conf_df = get_all_mesurement_conf()
+        tables = list()
+        for id, measurement_wind_field in zip(measurement_conf_df['id'], measurement_conf_df['measurement_wind_field']):
+            tables.append(
+                (f'{yestoday}_{id}', os.path.join(save_dir, measurement_wind_field, yestoday[0:4], yestoday[0:6],
+                                                  yestoday + '.csv.gz')))
+
+        with multiprocessing.Pool(len(tables)) as pool:
+            pool.starmap(get_data_and_save_file, [(t[0], t[1], measurepoint_use_dict) for t in tables])
+    except Exception as e:
+        error_print(traceback.format_exc())
+        raise e
 
+    info_print("执行结束,总耗时:", datetime.datetime.now() - begin)