Prechádzať zdrojové kódy

修改不引用os报错的问题

wzl 6 mesiacov pred
rodič
commit
f1ba1ac32a

+ 7 - 8
service/trans_service.py

@@ -16,8 +16,7 @@ trans = ConnectMysql("trans")
 def get_min_sec_conf(field_code, trans_type) -> dict:
     query_sql = "SELECT * FROM trans_conf where wind_code = %s and type = %s and status = 1"
     res = trans.execute(query_sql, (field_code, trans_type))
-    print(res)
-    if type(res) == tuple:
+    if type(res) == tuple or type(res) == str:
         return None
     return res[0]
 
@@ -26,7 +25,7 @@ def get_min_sec_conf_test(field_code, trans_type) -> dict:
     query_sql = "SELECT * FROM trans_conf where wind_name = %s and type = %s and status = 1"
     res = trans.execute(query_sql, (field_code, trans_type))
     print(res)
-    if type(res) == tuple:
+    if type(res) == tuple or type(res) == str:
         return None
     return res[0]
 
@@ -46,7 +45,7 @@ def get_fault_warn_conf(field_code, trans_type) -> dict:
     query_sql = "SELECT * FROM warn_fault_conf where wind_code = %s and type in %s and status = 1"
     res = trans.execute(query_sql, (field_code, types))
     print(res)
-    if type(res) == tuple:
+    if type(res) == tuple or type(res) == str:
         return None
     return res[0]
 
@@ -55,7 +54,7 @@ def get_wave_conf(field_code) -> dict:
     query_sql = "SELECT * FROM wave_conf where wind_code = %s and status = 1"
     res = trans.execute(query_sql, (field_code))
     print(res)
-    if type(res) == tuple:
+    if type(res) == tuple or type(res) == str:
         return None
     return res[0]
 
@@ -264,10 +263,10 @@ def delete_exist_wave_data(table_name, ids):
         trans.execute(delete_sql, array)
 
 
-def get_trans_exec_code(batch_no, type):
-    query_sql = f"SELECT * from batch_exec_code t where t.batch_no = '{batch_no}' and type='{type}' and t.`status` = 1 limit 1"
+def get_trans_exec_code(batch_no, query_type):
+    query_sql = f"SELECT * from batch_exec_code t where t.batch_no = '{batch_no}' and type='{query_type}' and t.`status` = 1 limit 1"
     res = trans.execute(query_sql)
-    if type(res) == tuple:
+    if type(res) == tuple or type(res) == str:
         return None
     exec_code = res[0]['exec_code']
     trans_print("批次", batch_no, '类型', type, '获取到执行代码:', exec_code)

+ 22 - 0
tmp_file/列名包含数据处理.py

@@ -0,0 +1,22 @@
+import pandas as pd
+
+path = r'd://data//11.csv'
+
+df = pd.read_csv(path, encoding='gb18030')
+df.reset_index(inplace=True)
+print(df.columns)
+df.columns = [i.replace('()', '') for i in df.columns]
+wind_names = set([i.split('#-')[0] for i in df.columns if i.find('#-') > -1])
+print(df.columns)
+print(wind_names)
+for wind_name in wind_names:
+    select_cols = [i for i in df.columns if str(i).startswith(wind_name)]
+    print(select_cols)
+    select_cols.insert(0, '时间')
+    print(select_cols)
+    df_temp = df[select_cols]
+    df_temp.columns = [i.split('#-')[-1] for i in df_temp.columns]
+
+    df_temp.sort_values(by='时间', inplace=True)
+
+    df_temp.to_csv("d://data//najiade//"+str(wind_name) + '.csv', encoding='utf8', index=False)

+ 40 - 0
tmp_file/吉山批次处理并重新存数据库.py

@@ -0,0 +1,40 @@
+import datetime
+import multiprocessing
+import sys
+from os import *
+
+sys.path.insert(0, path.abspath(__file__).split("tmp_file")[0])
+
+from service.trans_service import save_df_to_db, drop_table, creat_min_sec_table
+from utils.file.trans_methods import read_file_to_df, read_files
+
+
+def read_and_exec(file_path):
+    begin = datetime.datetime.now()
+    print("开始执行:", path.basename(file_path))
+    df = read_file_to_df(file_path)
+    df['yaw_error1'] = df['true_wind_direction'] - 180
+    df.to_csv(file_path, index=False, encoding='utf8')
+    creat_min_sec_table()
+    save_df_to_db('WOF079200018-WOB000012_second', df)
+    print("结束执行:", path.basename(file_path), ",耗时:", datetime.datetime.now() - begin)
+
+
+if __name__ == '__main__':
+    begin = datetime.datetime.now()
+    env = 'prod'
+    if len(sys.argv) >= 2:
+        env = sys.argv[1]
+
+    conf_path = path.abspath(f"./conf/etl_config_{env}.yaml")
+    environ['ETL_CONF'] = conf_path
+    environ['env'] = env
+
+    drop_table("WOF079200018-WOB000012_second")
+
+    read_dir = r'/data/download/collection_data/2完成/吉山风电场-江西-大唐/清理数据/WOF079200018-WOB000012_JS一期1秒24.8-10/second'
+
+    all_files = read_files(read_dir)
+    with multiprocessing.Pool(24) as pool:
+        pool.map(read_and_exec, all_files)
+    print("总耗时:", datetime.datetime.now() - begin)

+ 49 - 0
utils/draw/draw_file.py

@@ -30,3 +30,52 @@ def scatter(title, x_label, y_label, x_values, y_values, color=None, col_map=dic
 
     plt.savefig(save_file_path)
     plt.close()
+
+
+if __name__ == '__main__':
+    import pandas as pd
+    import numpy as np
+    from matplotlib import pyplot as plt
+
+    df = pd.read_csv(r"/home/wzl/test_data/2024_10_17_14_54_46_200k_Root.csv")
+    df.reset_index(inplace=True, drop=True)
+    df.columns = ['data']
+
+    # Calculate the moving average with a window of 3 (1 before, 1 after)
+    window_size = 20
+    moving_avg = df['data'].rolling(window=window_size).mean()
+    df['moving_avg'] = moving_avg
+    # Calculate the percentage difference
+    percentage_diff = abs((df['data'] - moving_avg) / moving_avg) * 100
+    df['percentage_diff'] = percentage_diff
+    # Flag values that differ by more than threshold
+    threshold = 3
+    df['is_anomaly'] = percentage_diff < threshold
+
+    avg = df['data'].mean()
+    df['avg']=df['data'] > avg
+
+
+    difference_ratio = df.iloc[window_size:]
+    difference_ratio.reset_index(inplace=True)
+    # 创建图形和轴对象
+    plt.figure(figsize=(10, 6))
+    colors = np.where((difference_ratio['is_anomaly'] == True) & (difference_ratio['avg'] == True), 'r', np.where((difference_ratio['is_anomaly'] == False) & (difference_ratio['avg'] == False), 'g', 'b'))
+
+    datas = difference_ratio['data'].values
+    # for i in range(len(datas)):
+    #     plt.plot(i, datas[i], marker='o', color=colors[i])
+
+    plt.figure(figsize=(10, 6))
+    plt.scatter([i for i in range(len(datas))], datas,  c=colors)
+
+    # 添加标题和标签
+    plt.title('Difference Ratio of Each Data Point to Its Previous 10 Data Points Mean')
+    plt.xlabel('Index')
+    plt.ylabel('Difference Ratio')
+
+    # 显示网格
+    plt.grid(True)
+
+    # 显示图形
+    plt.show()

+ 16 - 16
utils/file/trans_methods.py

@@ -3,9 +3,9 @@
 # @Author  : 魏志亮
 import ast
 import datetime
+import os
 import shutil
 import warnings
-from os import *
 
 import chardet
 import pandas as pd
@@ -28,7 +28,7 @@ def detect_file_encoding(filename):
     if encoding is None:
         encoding = 'gb18030'
 
-    if encoding.lower() in ['utf-8', 'ascii', 'utf8']:
+    if encoding.lower() in ['utf-8', 'ascii', 'utf8', 'utf-8-sig']:
         return 'utf-8'
 
     return 'gb18030'
@@ -89,7 +89,7 @@ def read_file_to_df(file_path, read_cols=list(), trans_cols=None, nrows=None, no
     find_cols = list()
     if trans_cols:
         header = find_read_header(file_path, trans_cols, resolve_col_prefix)
-        trans_print(path.basename(file_path), "读取第", header, "行")
+        trans_print(os.path.basename(file_path), "读取第", header, "行")
         if header is None:
             if not_find_header == 'raise':
                 message = '未匹配到开始行,请检查并重新指定'
@@ -137,7 +137,7 @@ def read_file_to_df(file_path, read_cols=list(), trans_cols=None, nrows=None, no
             trans_print('文件读取成功:', file_path, '数据数量:', df.shape, '耗时:', datetime.datetime.now() - begin)
         except Exception as e:
             trans_print('读取文件出错', file_path, str(e))
-            message = '文件:' + path.basename(file_path) + ',' + str(e)
+            message = '文件:' + os.path.basename(file_path) + ',' + str(e)
             raise ValueError(message)
 
     return df
@@ -145,11 +145,11 @@ def read_file_to_df(file_path, read_cols=list(), trans_cols=None, nrows=None, no
 
 def __build_directory_dict(directory_dict, path, filter_types=None):
     # 遍历目录下的所有项
-    for item in listdir(path):
-        item_path = path.join(path, item)
-        if path.isdir(item_path):
+    for item in os.listdir(path):
+        item_path = os.path.join(path, item)
+        if os.path.isdir(item_path):
             __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
-        elif path.isfile(item_path):
+        elif os.path.isfile(item_path):
             if path not in directory_dict:
                 directory_dict[path] = []
 
@@ -164,7 +164,7 @@ def __build_directory_dict(directory_dict, path, filter_types=None):
 def read_excel_files(read_path, filter_types=None):
     if filter_types is None:
         filter_types = ['xls', 'xlsx', 'csv', 'gz']
-    if path.isfile(read_path):
+    if os.path.isfile(read_path):
         return [read_path]
 
     directory_dict = {}
@@ -177,12 +177,12 @@ def read_excel_files(read_path, filter_types=None):
 def read_files(read_path, filter_types=None):
     if filter_types is None:
         filter_types = ['xls', 'xlsx', 'csv', 'gz', 'zip', 'rar']
-    if path.isfile(read_path):
+    if os.path.isfile(read_path):
         return [read_path]
     directory_dict = {}
     __build_directory_dict(directory_dict, read_path, filter_types=filter_types)
 
-    return [path for paths in directory_dict.values() for path in paths if path]
+    return [path1 for paths in directory_dict.values() for path1 in paths if path1]
 
 
 def copy_to_new(from_path, to_path):
@@ -196,17 +196,17 @@ def copy_to_new(from_path, to_path):
 
 
 # 创建路径
-def create_file_path(path, is_file_path=False):
+def create_file_path(read_path, is_file_path=False):
     """
     创建路径
-    :param path:创建文件夹的路径
+    :param read_path:创建文件夹的路径
     :param is_file_path: 传入的path是否包含具体的文件名
     """
     if is_file_path:
-        path = path.dirname(path)
+        read_path = os.path.dirname(read_path)
 
-    if not path.exists(path):
-        makedirs(path, exist_ok=True)
+    if not os.path.exists(read_path):
+        os.makedirs(read_path, exist_ok=True)
 
 
 def valid_eval(eval_str):

+ 0 - 0
wind_farm/__init__.py


+ 0 - 0
wind_farm/中广核/__init__.py


+ 166 - 0
wind_farm/中广核/second_data.py

@@ -0,0 +1,166 @@
+import pandas as pd
+from sqlalchemy import create_engine
+import datetime
+
+engine = create_engine('mysql+pymysql://root:admin123456@192.168.50.235:30306/appoint')
+mesurepoint_conf = {
+    0: '网侧L1相功率因数',
+    1: '网侧L1相电流',
+    2: '网侧L2相电流',
+    3: '网侧L3相电流',
+    4: '变频器水冷入口压力',
+    5: '变频器水冷出口压力',
+    6: '变频器水冷入口温度',
+    7: '变频器水冷出口温度',
+    8: '电网频率',
+    9: '网侧L1相电压',
+    10: '网侧L2相电压',
+    11: '网侧L3相电压',
+    12: '外测有功功率',
+    13: '发电机进风口前部温度',
+    14: '发电机压差传感器',
+    15: '发电机进风口后部温度',
+    16: '发电机出风口温度',
+    17: '风机有功功率',
+    18: '发电机水冷入口压力',
+    19: '发电机水冷出口压力',
+    20: '发电机无功功率',
+    21: '发电机转速',
+    22: '发电机水冷入口温度',
+    23: '发电机水冷出口温度',
+    24: '发电机驱动端轴承温度',
+    25: '发电机非驱动端轴承温度',
+    26: '发电机滑环室温度',
+    27: '发电机定子U相线圈温度',
+    28: '发电机定子V相线圈温度',
+    29: '发电机定子W相线圈温度',
+    30: '机舱水冷齿轮箱冷却液温度1min',
+    31: '机舱水冷发电机冷却液温度1min',
+    32: '机舱冷却系统水泵入口压力',
+    33: '机舱水冷泵出口压力1min',
+    34: '塔底水冷变频器入口水温',
+    35: '塔底水冷变频器出口水温',
+    36: '塔底湿度',
+    37: '塔底水冷三通阀设置实际值',
+    38: '塔底水冷三通阀反馈值',
+    39: '塔外湿度',
+    40: '塔底水冷泵入口压力',
+    41: '塔底水冷泵出口压力',
+    42: '舱内温度',
+    43: '机舱控制柜温度',
+    44: '舱外温度',
+    45: '塔底温度',
+    46: '塔底控制柜温度',
+    47: '风向',
+    48: '风速',
+    49: '机舱中轴线与风向夹角',
+    50: '变桨电池1电压',
+    51: '变桨电池2电压',
+    52: '变桨电池3电压',
+    53: '变桨电机1电流',
+    54: '变桨电机2电流',
+    55: '变桨电机3电流',
+    56: '1#桨电机温度',
+    57: '2#桨电机温度',
+    58: '3#桨电机温度',
+    59: '变桨驱动器1温度',
+    60: '变桨驱动器2温度',
+    61: '变桨驱动器3温度',
+    62: '轮毂内温度',
+    63: '变桨电池柜1温度',
+    64: '变桨电池柜2温度',
+    65: '变桨电池柜3温度',
+    66: '动力电缆温度监控1',
+    67: '动力电缆温度监控2',
+    68: '辅变绕组温度',
+    69: '主变绕组温度',
+    70: '齿轮箱入口油压',
+    71: '齿轮箱油路滤网前油压',
+    72: '液压站预充压力',
+    73: '主轴转速',
+    74: '齿轮箱油路入口温度',
+    75: '齿轮箱散热器出口温度',
+    76: '齿轮箱高速轴HSS_GSGS端轴承温度',
+    77: '齿轮箱高速轴HSS_GSRS端轴承温度',
+    78: '齿轮箱中速轴HSS_RS端轴承温度',
+    79: '齿轮箱中速轴IMS_GSRS端轴承温度',
+    80: '齿轮箱高速轴驱动端轴承温度',
+    81: '齿轮箱高速轴非驱动端轴承温度',
+    82: '齿轮箱油池温度',
+    83: '齿轮箱中速轴驱动端轴承温度',
+    84: '齿轮箱中速轴非驱动端轴承温度',
+    85: '主轴承温度',
+    86: '主轴承内圈温度',
+    87: '主轴承外圈温度',
+    88: '主控有功设置值',
+    89: '主控无功设置值',
+    90: '主控次要SC',
+    91: '主控有功限值上限',
+    92: '对外AI型状态1',
+    93: '机舱侧向振动(已滤波)',
+    94: '机舱轴向振动(已滤波)',
+    95: '后方摩擦片距离',
+    96: '前方摩擦片距离',
+    97: '机舱角度',
+    98: '机舱电流',
+    99: '扭揽角度',
+    100: '偏航电机转速1',
+    101: '偏航电机转速2',
+    102: '偏航角度'
+}
+
+common_address_conf = {
+    1: '风场1',
+    2: '风场2',
+    3: '风场3',
+    4: '风场4',
+    5: '风场5',
+    6: '风场6',
+    7: '风场7',
+    8: '风场8',
+    9: '风场9',
+    10: '风场10',
+    11: '风场11',
+    12: '风场12'
+}
+
+
+def pretty_print(message):
+    print(datetime.datetime.now(), ":", message)
+
+
+def exists_table(table_name):
+    sql = f"SELECT * FROM information_schema.tables WHERE table_schema = 'appoint' AND table_name = '{table_name}'"
+    pretty_print(sql)
+    table_df = pd.read_sql_query(sql, engine)
+    if table_df.empty:
+        return False
+    return True
+
+
+if __name__ == '__main__':
+    begin = datetime.datetime.now()
+    pretty_print("开始执行")
+    today_year_month = datetime.datetime.now().strftime('%Y%m')
+    today = datetime.datetime.now().strftime('%d')
+    yestoday = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y%m%d')
+    lastday = (datetime.datetime.now() - datetime.timedelta(days=2)).strftime('%Y%m%d')
+
+    lastday_table = f'{lastday}_measurement'
+    lastday_df = pd.DataFrame()
+    if not exists_table(lastday_table):
+        pretty_print(f"{lastday_table}表不存在")
+    else:
+        lastday_df_sql = f"SELECT * FROM `{lastday_table}` order by  id desc limit 10"
+        pretty_print(lastday_df_sql)
+        lastday_df = pd.read_sql_query(lastday_df_sql, engine)
+
+    yestoday_table = f'{yestoday}_measurement'
+    if not exists_table(yestoday_table):
+        pretty_print(f"{yestoday_table}表不存在")
+        raise Exception(f"{yestoday_table}表不存在")
+    else:
+        yestoday_df_sql = f"SELECT * FROM `{yestoday_table}`"
+        pretty_print(yestoday_df_sql)
+        yestoday_df = pd.read_sql_query(yestoday_df_sql, engine)
+