Browse Source

添加临时任务执行脚本

wzl 5 months ago
parent
commit
217c62ed7b

+ 21 - 0
service/wave_service.py

@@ -63,6 +63,27 @@ def update_wave_trans_transfer_progress(id, transfer_progress=0, save_db=True):
         plt.execute(exec_sql, (int(transfer_progress), id))
 
 
def create_wave_table(table_name, save_db=True):
    """Create the waveform (CMS vibration) data table with the standard schema.

    Args:
        table_name: target table name. It must be a plain identifier
            (letters/digits/underscores) because identifiers cannot be bound
            as SQL parameters and are interpolated into the DDL string.
        save_db: when False this is a no-op, mirroring the save_db gate used
            by the other helpers in this module.

    Raises:
        ValueError: if table_name is not a simple identifier (guards against
            SQL injection through the interpolated name).
    """
    if save_db:
        # Identifiers cannot be parameterized, so reject anything that is not
        # alphanumeric/underscore before interpolating into the DDL.
        if not str(table_name).replace('_', '').isalnum():
            raise ValueError(f'illegal table name: {table_name}')
        exec_sql = f"""
        CREATE TABLE `{table_name}` (
          `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
          `wind_turbine_number` varchar(20) DEFAULT NULL COMMENT '风机编号',
          `wind_turbine_name` varchar(20) DEFAULT NULL COMMENT '原始风机编号',
          `time_stamp` datetime DEFAULT NULL COMMENT '时间',
          `rotational_speed` float DEFAULT NULL COMMENT '转速',
          `sampling_frequency` varchar(50) DEFAULT NULL COMMENT '采样频率',
          `mesure_point_name` varchar(100) DEFAULT NULL COMMENT '测点名称',
          `mesure_data` mediumtext COMMENT '测点数据',
          PRIMARY KEY (`id`),
          KEY `wind_turbine_number` (`wind_turbine_number`),
          KEY `time_stamp` (`time_stamp`),
          KEY `mesure_point_name` (`mesure_point_name`)
        ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4
        """
        plt.execute(exec_sql)
+
+
 # 获取执行的数据
 def get_wave_exec_data(run_count: int = 1) -> dict:
     query_running_sql = "select count(1) as count from data_transfer where trans_sys_status = 0"

+ 98 - 0
tmp_file/偏航误差验证.py

@@ -0,0 +1,98 @@
+import os
+import sys
+from concurrent.futures.thread import ThreadPoolExecutor
+
+sys.path.insert(0, os.path.abspath(__file__).split("tmp_file")[0])
+
+import datetime
+import multiprocessing
+
+import pandas as pd
+
+from utils.file.trans_methods import read_files, copy_to_new, read_excel_files, read_file_to_df
+from utils.zip.unzip import get_desc_path, unzip
+
+
def get_real_path(win_path):
    """Map a Windows share path (Z:...) onto its Linux mount and normalize separators."""
    mapped = win_path.replace(r'Z:', r'/data/download')
    return mapped.replace("\\", '/')
+
+
def unzip_or_remove(file, tmp_dir):
    """Unpack a zip archive into tmp_dir, or copy any other file there.

    Fix: the copy destination was previously computed as
    ``file.replace(file, tmp_dir)`` — replacing the whole string with
    tmp_dir always yields just ``tmp_dir`` — so state that intent directly.
    """
    if str(file).endswith("zip"):
        unzip(file, tmp_dir)
    else:
        copy_to_new(file, tmp_dir)
+
+
def read_file_to_df_and_select(file_path):
    """Read one lidar file and keep only rows that pass quality screening:
    both status channels above 0.8 and measurement distance of 70 or 90."""
    select_cols = ['Timestamp', 'Los', 'Distance', 'HWS(hub)', 'HWS(hub)status', 'DIR(hub)', 'DIR(hub)status']
    df = read_file_to_df(file_path, read_cols=select_cols)
    keep = (
        (df['HWS(hub)status'] > 0.8)
        & (df['DIR(hub)status'] > 0.8)
        & (df['Distance'].isin([70, 90]))
    )
    return df[keep]
+
+
def read_month_data_and_select(month, files, gonglv_df):
    """Load one month of lidar files, align them to 10-second boundaries,
    join the power data and write the merged result as <month>.csv.

    Args:
        month: month label used for the output file name.
        files: lidar file paths belonging to this month.
        gonglv_df: power data with columns 'Time1', '功率' and 'month'.
    """
    with ThreadPoolExecutor(max_workers=10) as executor:
        dfs = list(executor.map(read_file_to_df_and_select, files))

    df = pd.concat(dfs, ignore_index=True)

    # Drop fractional seconds, then round timestamps UP to the next
    # 10-second boundary so they align with the power data's grid.
    df['Time1'] = df['Timestamp'].apply(lambda x: x.split('.')[0])
    df['Time1'] = pd.to_datetime(df['Time1'], errors='coerce')
    df['Time1'] = df['Time1'].apply(
        lambda x: x + datetime.timedelta(seconds=10 - x.second % 10) if x.second % 10 != 0 else x)
    # Fix: use drop() instead of `del gonglv_df['month']` — the caller passes a
    # filtered slice, and deleting a column mutated the shared frame
    # (SettingWithCopy hazard); drop() works on a fresh copy.
    gonglv_df = gonglv_df.drop(columns=['month'])
    result_df = pd.merge(df, gonglv_df, on='Time1')
    result_df.sort_values(by='Time1', inplace=True)
    save_dir = get_real_path('Z:\偏航误差验证数据\整理结果')
    # save_dir = r'D:\data\pianhang\result'
    result_df.to_csv(os.path.join(save_dir, f'{month}.csv'), encoding='utf8', index=False)
+
+
if __name__ == '__main__':
    # Raw lidar data directory on the Windows share, mapped to the Linux mount.
    read_dir = 'Z:\偏航误差验证数据\新华佳县雷达数据'
    read_dir = get_real_path(read_dir)

    tmp_dir = get_real_path(r'Z:\偏航误差验证数据\tmp_data')
    gonglv_dir = get_real_path(r'Z:\偏航误差验证数据\陕西建工陕西智华\report\output')

    # Local debug paths, kept for reference:
    # read_dir = r'D:\data\pianhang\1'
    # tmp_dir = r'D:\data\pianhang\tmp'
    # gonglv_dir = r'D:\data\pianhang\2'

    gonglv_files = read_excel_files(gonglv_dir)

    # Load the power data ('collect_time' timestamp, 'a0216' value) in parallel.
    with multiprocessing.Pool(20) as pool:
        dfs = pool.starmap(read_file_to_df, [(i, ['collect_time', 'a0216']) for i in gonglv_files])

    gonglv_df = pd.concat(dfs, ignore_index=True)
    # Rename to the merge key and the power column ('功率' = power).
    gonglv_df.columns = ['Time1', '功率']
    gonglv_df['Time1'] = pd.to_datetime(gonglv_df['Time1'], errors='coerce')
    gonglv_df['month'] = gonglv_df['Time1'].dt.month

    all_files = read_files(tmp_dir)

    # Only 2024 wind-speed exports are relevant here.
    all_files = [i for i in all_files if str(os.path.basename(i)).startswith('WindSpeed2024')]

    # One-time unpack step (already done for this dataset):
    # with multiprocessing.Pool(20) as pool:
    #     pool.starmap(unzip_or_remove, [(file, tmp_dir) for file in all_files])

    # Group files by month: names start with 'WindSpeed' (9 chars) + 'YYYY',
    # so characters [13:15] of the base name are the month.
    month_map = dict()
    for file in all_files:
        base_name = os.path.basename(file)
        month = base_name[13:15]
        if month in month_map.keys():
            month_map[month].append(file)
        else:
            month_map[month] = [file]

    # NOTE(review): excel_files is computed but never used below.
    excel_files = read_excel_files(tmp_dir)

    # Process each month with its matching slice of the power data.
    with multiprocessing.Pool(5) as pool:
        pool.starmap(read_month_data_and_select,
                     [(month, files, gonglv_df[gonglv_df['month'] == int(month)]) for month, files in
                      month_map.items()])

+ 67 - 0
tmp_file/张崾先统计-分钟.py

@@ -0,0 +1,67 @@
+import multiprocessing
+import os
+import sys
+
+sys.path.insert(0, os.path.abspath(__file__).split("tmp_file")[0])
+
+import pandas as pd
+
+from utils.file.trans_methods import read_file_to_df
+
+
def save_percent(value, save_decimal=7):
    """Round a ratio to `save_decimal` decimal places, then express it as a percentage."""
    rounded = round(value, save_decimal)
    return rounded * 100
+
+
def read_and_select(file_path, read_cols):
    """Build a one-row data-quality summary (duplicate, missing and anomaly
    rates) for one turbine's 10-minute data file.

    Args:
        file_path: per-turbine file; the file stem is used as the turbine id.
        read_cols: columns to load; must include 'startTime', '风速10min'
            and '有功功率'.

    Returns:
        A single-row DataFrame of summary statistics.
    """
    result_df = pd.DataFrame()
    df = read_file_to_df(file_path, read_cols=read_cols)
    wind_name = os.path.basename(file_path).split('.')[0]
    df['风机号'] = wind_name
    # Keep exactly one statistical year of data.
    df = df.query("(startTime>='2023-10-01 00:00:00') & (startTime<'2024-10-01 00:00:00')")
    count = 366 * 24 * 6  # expected rows: 10-minute data, 366 days (window spans the 2024 leap day)
    repeat_time_count = df.shape[0] - len(df['startTime'].unique())
    print(wind_name, count, repeat_time_count)
    result_df['风机号'] = [wind_name]
    result_df['重复率'] = [save_percent(repeat_time_count / count)]
    result_df['重复次数'] = [repeat_time_count]
    result_df['总记录数'] = [count]

    # Coerce values so unparsable entries become NaN/NaT and drop out of the
    # non-null counts taken below. Must happen AFTER the duplicate stats above.
    for read_col in read_cols:

        if read_col != 'startTime':
            df[read_col] = pd.to_numeric(df[read_col], errors='coerce')
        else:
            df[read_col] = pd.to_datetime(df[read_col], errors='coerce')

    # groupby.count() yields the number of non-null cells per column; a single
    # turbine id means exactly one group row.
    group_df = df.groupby(by=['风机号']).count()
    group_df.reset_index(inplace=True)
    count_df = pd.DataFrame(group_df)
    total_count = count_df[read_cols].values[0].sum()
    print(wind_name, total_count, count * len(read_cols))
    result_df['平均缺失率,单位%'] = [save_percent(1 - total_count / (count * len(read_cols)))]
    result_df['缺失数值'] = ['-'.join([str(count - i) for i in count_df[read_cols].values[0]])]
    del group_df

    # Plausibility bounds: wind speed within [0, 80], active power within [-200, 4800].
    error_fengsu_count = df.query("(风速10min < 0) | (风速10min > 80)").shape[0]
    error_yougong_gonglv = df.query("(有功功率 < -200) | (有功功率 > 4800)").shape[0]

    result_df['平均异常率'] = [save_percent((error_fengsu_count + error_yougong_gonglv) / (2 * count))]

    return result_df
+
+
if __name__ == '__main__':
    # Columns to audit: timestamp plus the 10-minute SCADA measurements.
    read_cols_str = 'startTime,有功功率,叶轮转速,发电机转速,风速10min,桨叶1角度,桨叶2角度,桨叶3角度,机舱位置,偏航误差,发电机轴承温度,机舱内温度,环境温度,发电机U相温度,发电机V相温度,发电机W相温度'
    read_cols = [i for i in read_cols_str.split(",") if i]
    read_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/收资数据/导出数据2'

    files = os.listdir(read_dir)

    # One worker per source file; each returns a single-row quality summary.
    with multiprocessing.Pool(16) as pool:
        dfs = pool.starmap(read_and_select, [(os.path.join(read_dir, i), read_cols) for i in files])

    # Merge per-turbine rows, order by turbine id, export the report.
    df = pd.concat(dfs, ignore_index=True)
    df.sort_values(by=['风机号'], inplace=True)

    df.to_csv("张崾先统计-分钟.csv", encoding='utf8', index=False)

+ 92 - 0
tmp_file/张崾先统计-秒.py

@@ -0,0 +1,92 @@
+import multiprocessing
+import os
+import sys
+
+sys.path.insert(0, os.path.abspath(__file__).split("tmp_file")[0])
+
+import pandas as pd
+
+from utils.file.trans_methods import read_file_to_df
+
+
def save_percent(value, save_decimal=7):
    """Round `value` to `save_decimal` decimal places and scale it to a percentage."""
    return 100 * round(value, save_decimal)
+
+
def read_and_select(file_path, read_cols):
    """Build a one-row data-quality summary for one turbine's 1 Hz data file.

    Reports duplicated timestamps, per-column missing counts and the anomaly
    rate of the wind-speed and power columns against fixed plausibility bounds.

    Args:
        file_path: per-turbine file; the file stem is used as the turbine id.
        read_cols: columns to load; must include 'Time' and the power columns
            '有功功率', '瞬时功率', '当前理论可发最大功率'.

    Returns:
        A single-row DataFrame of summary statistics.
    """
    result_df = pd.DataFrame()
    df = read_file_to_df(file_path, read_cols=read_cols)
    wind_name = os.path.basename(file_path).split('.')[0]
    df['风机号'] = wind_name
    df = df.query("(Time>='2024-06-01 00:00:00') & (Time<'2024-12-01 00:00:00')")
    count = 15811200  # expected rows: 1-second data for half a year (183 days * 86400)
    repeat_time_count = df.shape[0] - len(df['Time'].unique())
    print(wind_name, count, repeat_time_count)
    result_df['风机号'] = [wind_name]
    result_df['重复率'] = [save_percent(repeat_time_count / count)]
    result_df['重复次数'] = [repeat_time_count]
    result_df['总记录数'] = [count]

    # Coerce values so unparsable entries become NaN/NaT and drop out of the
    # non-null counts taken below.
    for read_col in read_cols:

        if read_col != 'Time':
            df[read_col] = pd.to_numeric(df[read_col], errors='coerce')
        else:
            df[read_col] = pd.to_datetime(df[read_col], errors='coerce')

    # groupby.count() yields non-null cells per column; single turbine id
    # means exactly one group row.
    group_df = df.groupby(by=['风机号']).count()
    group_df.reset_index(inplace=True)
    count_df = pd.DataFrame(group_df)
    total_count = count_df[read_cols].values[0].sum()
    print(wind_name, total_count, count * len(read_cols))
    result_df['平均缺失率,单位%'] = [save_percent(1 - total_count / (count * len(read_cols)))]
    result_df['缺失数值'] = ['-'.join([str(count - i) for i in count_df[read_cols].values[0]])]
    del group_df

    # Wind-speed columns: values outside [0, 80] are anomalies.
    fengsu_count = 0
    fengsu_cols = [i for i in read_cols if '风速' in i]
    fengsu_str = ''
    for col in fengsu_cols:
        now_count = df.query("(" + col + " < 0) | (" + col + " > 80)").shape[0]
        fengsu_count = fengsu_count + now_count
        # Fix: record the per-column count (previously the running total).
        fengsu_str = fengsu_str + ',' + col + ':' + str(now_count)
    result_df['风速异常'] = [fengsu_str]

    # Power columns: values outside [-200, 4800] are anomalies.
    gonglv_cols = ['有功功率', '瞬时功率', '当前理论可发最大功率']
    gonglv_count = 0
    gonglv_str = ''
    for col in gonglv_cols:
        now_count = df.query("(" + col + " < -200) | (" + col + " > 4800)").shape[0]
        gonglv_count = gonglv_count + now_count
        # Fix: record the per-column count (previously the running total).
        gonglv_str = gonglv_str + ',' + col + ':' + str(now_count)
    result_df['功率异常'] = [gonglv_str]

    # Fix: the numerator previously added fengsu_count twice, dropping the
    # power anomalies even though the denominator counts the power columns.
    result_df['平均异常率'] = [
        save_percent((fengsu_count + gonglv_count) / ((len(fengsu_cols) + len(gonglv_cols)) * count))]

    return result_df
+
+
if __name__ == '__main__':
    # Full set of 1 Hz columns to audit: timestamp, measurements and
    # operating-state flag columns.
    read_cols = ['Time', '设备主要状态', '功率曲线风速', '湍流强度', '实际风速', '有功功率', '桨叶角度A', '桨叶角度B',
                 '桨叶角度C', '机舱内温度', '机舱外温度', '绝对风向', '机舱绝对位置', '叶轮转速', '发电机转速',
                 '瞬时风速',
                 '有功设定反馈', '当前理论可发最大功率', '空气密度', '偏航误差', '发电机扭矩', '瞬时功率', '风向1s',
                 '偏航压力', '桨叶1速度', '桨叶2速度', '桨叶3速度', '桨叶1角度给定', '桨叶2角度给定', '桨叶3角度给定',
                 '轴1电机电流', '轴2电机电流', '轴3电机电流', '轴1电机温度', '轴2电机温度', '轴3电机温度', '待机',
                 '启动',
                 '偏航', '并网', '限功率', '正常发电', '故障', '计入功率曲线', '运行发电机冷却风扇1',
                 '运行发电机冷却风扇2',
                 '激活偏航解缆阀', '激活偏航刹车阀', '激活风轮刹车阀', '激活顺时针偏航', '激活逆时针偏航', '电缆扭角']

    read_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/清理数据/点检表以外测点儿-20241210'

    files = os.listdir(read_dir)

    # One summary row per file, computed by 4 parallel workers.
    with multiprocessing.Pool(4) as pool:
        dfs = pool.starmap(read_and_select, [(os.path.join(read_dir, i), read_cols) for i in files])

    # Merge per-turbine rows, order by turbine id, export the report.
    df = pd.concat(dfs, ignore_index=True)
    df.sort_values(by=['风机号'], inplace=True)

    df.to_csv("张崾先统计-秒.csv", encoding='utf8', index=False)

+ 90 - 0
tmp_file/张崾先震动_参数获取.py

@@ -0,0 +1,90 @@
+import datetime
+import multiprocessing
+import os.path
+
+import pandas as pd
+
+
def __build_directory_dict(directory_dict, path, filter_types=None):
    """Recursively walk `path`, mapping each directory to the files it contains.

    When filter_types is given, only files with a matching extension are kept,
    and paths containing "~$" (Office temp/lock files) are skipped.
    """
    for entry in os.listdir(path):
        entry_path = os.path.join(path, entry)
        if os.path.isdir(entry_path):
            __build_directory_dict(directory_dict, entry_path, filter_types=filter_types)
            continue
        if not os.path.isfile(entry_path):
            continue
        directory_dict.setdefault(path, [])
        if not filter_types:
            directory_dict[path].append(entry_path)
        elif str(entry_path).split(".")[-1] in filter_types:
            if "~$" not in str(entry_path):
                directory_dict[path].append(entry_path)


# Collect spreadsheet-like files (xls/xlsx/csv/gz by default) under read_path.
def read_excel_files(read_path, filter_types=None):
    if filter_types is None:
        filter_types = ['xls', 'xlsx', 'csv', 'gz']
    if os.path.isfile(read_path):
        return [read_path]

    tree = {}
    __build_directory_dict(tree, read_path, filter_types=filter_types)

    return [file_path for file_list in tree.values() for file_path in file_list if file_path]


# Collect data/archive files (adds zip/rar to the spreadsheet types) under read_path.
def read_files(read_path, filter_types=None):
    if filter_types is None:
        filter_types = ['xls', 'xlsx', 'csv', 'gz', 'zip', 'rar']
    if os.path.isfile(read_path):
        return [read_path]

    tree = {}
    __build_directory_dict(tree, read_path, filter_types=filter_types)

    return [file_path for file_list in tree.values() for file_path in file_list if file_path]
+
+
+all_files = read_files(r'G:\CMS', ['txt'])
+
+
def get_line_count(file_path):
    """Return the number of text lines in a UTF-8 file."""
    line_total = 0
    with open(file_path, 'r', encoding='utf-8') as handle:
        for _ in handle:
            line_total += 1
    return line_total


def read_file_and_read_count(index, file_path, datas):
    """Split the file stem on '_' into fields, append the file's line count,
    and push the resulting row onto the shared `datas` list."""
    # Progress heartbeat once every 10000 files.
    if index % 10000 == 0:
        print(datetime.datetime.now(), index)
    stem = os.path.basename(file_path).split('.')[0]
    row = stem.split('_')

    row.append(get_line_count(file_path))
    datas.append(row)
+
+
def get_name(x):
    """Assemble the full measuring-point name from the parsed name fields.

    col3 and col4 are included only when they are not the placeholder '无'
    ("none"); col2 and col7 are always part of the name.
    """
    parts = []
    if x['col3'] != '无':
        parts.append(x['col3'])
    parts.append(x['col2'])
    if x['col4'] != '无':
        parts.append(x['col4'])
    parts.append(x['col7'])
    return ''.join(parts)
+
+
if __name__ == '__main__':
    # Shared list so pool workers can append their rows.
    datas = multiprocessing.Manager().list()

    with multiprocessing.Pool(20) as pool:
        pool.starmap(read_file_and_read_count, [(i, file_path, datas) for i, file_path in enumerate(all_files)])

    # Each row: fields parsed from the file name plus the line count.
    # NOTE(review): assumes every file stem splits into exactly 9 '_' fields
    # (10 columns with the count) — confirm against the CMS naming scheme.
    df = pd.DataFrame(datas, columns=[f'col{i}' for i in range(10)])

    # col8 carries a YYYYmmddHHMMSS timestamp embedded in the file name.
    df['col8'] = pd.to_datetime(df['col8'], format='%Y%m%d%H%M%S', errors='coerce')
    df.sort_values(by=['col1', 'col8'], inplace=True)
    df['测点完整名称'] = df.apply(get_name, axis=1)
    df.to_csv('d://cms_data.csv', index=False, encoding='utf8')

+ 90 - 0
tmp_file/王博提取数据完整风机数据.py

@@ -0,0 +1,90 @@
+import datetime
+import multiprocessing
+import os
+
+import chardet
+import pandas as pd
+
+
def detect_file_encoding(filename):
    """Sniff a file's text encoding from its first 1000 bytes.

    Returns 'utf-8' for ASCII/UTF-8 family results and falls back to
    'gb18030' for anything else or when detection fails.
    """
    # 1000 bytes is enough for chardet on typical text files.
    with open(filename, 'rb') as f:
        sample = f.read(1000)
    detected = chardet.detect(sample)['encoding']

    print("文件类型:", filename, detected)

    if detected is None:
        detected = 'gb18030'

    if detected.lower() in ['utf-8', 'ascii', 'utf8', 'utf-8-sig']:
        return 'utf-8'

    return 'gb18030'
+
+
def __build_directory_dict(directory_dict, path, filter_types=None):
    """Recursively index files: directory_dict maps each directory to its file paths.

    When filter_types is given, only files with a matching extension are kept,
    and paths containing "~$" (Office temp/lock files) are skipped.
    """
    for entry in os.listdir(path):
        entry_path = os.path.join(path, entry)
        if os.path.isdir(entry_path):
            __build_directory_dict(directory_dict, entry_path, filter_types=filter_types)
            continue
        if not os.path.isfile(entry_path):
            continue
        directory_dict.setdefault(path, [])
        if not filter_types:
            directory_dict[path].append(entry_path)
        elif str(entry_path).split(".")[-1] in filter_types:
            if "~$" not in str(entry_path):
                directory_dict[path].append(entry_path)


# Collect spreadsheet-like files (xls/xlsx/csv/gz by default) under read_path.
def read_excel_files(read_path, filter_types=None):
    if filter_types is None:
        filter_types = ['xls', 'xlsx', 'csv', 'gz']
    if os.path.isfile(read_path):
        return [read_path]

    collected = {}
    __build_directory_dict(collected, read_path, filter_types=filter_types)

    return [file_path for file_list in collected.values() for file_path in file_list if file_path]
+
+
def read_file_to_df(file_path):
    """Read one CSV and prefix its 'Time' values with the date taken from the file name."""
    encoding = detect_file_encoding(file_path)
    df = pd.read_csv(file_path, encoding=encoding)
    # Characters [14:24] of the base name hold the date portion — presumably a
    # 10-char date string; confirm against the export naming scheme.
    file_date = os.path.basename(file_path)[14:24]
    df['Time'] = df['Time'].apply(lambda value: file_date + ' ' + value)
    return df
+
+
def read_files_and_save_csv(file_dir, month, save_dir):
    """Merge every file for one (directory, month) into a single time-sorted CSV.

    Output goes to save_dir/<basename(file_dir)>/<month>.csv; progress and
    elapsed time are printed around the work.
    """
    start_time = datetime.datetime.now()
    base_dir = os.path.basename(file_dir)
    print(f"{datetime.datetime.now()}: 开始执行{base_dir}-{month}")
    month_files = read_excel_files(os.path.join(file_dir, month))
    merged = pd.concat([read_file_to_df(f) for f in month_files], ignore_index=True)
    target = os.path.join(save_dir, base_dir, f'{month}.csv')
    os.makedirs(os.path.dirname(target), exist_ok=True)
    merged.sort_values(by=['Time'], inplace=True)
    merged.to_csv(target, encoding='utf8', index=False)
    print(f"{datetime.datetime.now()}: 执行{base_dir}-{month}结束,耗时{datetime.datetime.now() - start_time}")
+
+
if __name__ == '__main__':
    # Overall timer for the whole run; per-(dir, month) timing is printed by
    # read_files_and_save_csv itself.
    begin = datetime.datetime.now()
    read_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/tmp/second/excel_tmp/'
    save_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/清理数据/20241217完整字段'
    read_dirs = list()
    for i in range(26, 42):
        read_dirs.append(os.path.join(read_dir, str(i)))

    for read_dir in read_dirs:
        # Fix: `begin` was reassigned here on every iteration, so the final
        # "total elapsed" line only measured the last directory.
        with multiprocessing.Pool(6) as pool:
            pool.starmap(read_files_and_save_csv, [(read_dir, i, save_dir) for i in os.listdir(read_dir)])

    print(f"{datetime.datetime.now()}: 执行结束,总耗时{datetime.datetime.now() - begin}")

+ 29 - 0
tmp_file/筛选字段.py

@@ -0,0 +1,29 @@
+import multiprocessing
+import os
+import sys
+
+sys.path.insert(0, os.path.abspath(__file__).split("tmp_file")[0])
+
+from utils.file.trans_methods import read_file_to_df, create_file_path
+
+
def read_and_save(file_path, select_cols, save_path):
    """Read the selected columns from one file and write them to save_path as CSV,
    keeping the source file's stem as the output name."""
    stem = os.path.basename(file_path).split('.')[0]
    df = read_file_to_df(file_path, read_cols=select_cols)

    out_file = os.path.join(save_path, stem + '.csv')
    create_file_path(out_file, True)
    df.to_csv(out_file, index=False, encoding='utf-8')
+
+
if __name__ == '__main__':
    # Timestamp, measurement and status/flag columns to extract from the
    # full-field export.
    select_cols_str = 'Time,瞬时风速,风机号,瞬时功率,扭矩给定,扭矩反馈,高风切出,风机允许功率管理,功率管理使能反馈,不可利用,功率曲线可用,主控初始化完成,待机,启动,偏航,并网,限功率,正常发电,故障,紧急停机,快速停机,正常停机,告警,停机完成,允许功率管理,处于功率管理,检修,维护'

    select_cols = [i for i in select_cols_str.split(',') if i]

    read_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/清理数据/20241213(26,38)完整字段/26'

    save_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/清理数据/20241213(26,38)完整字段/20241216113130'

    # One worker per source file; each writes its trimmed CSV independently.
    with multiprocessing.Pool(6) as pool:
        pool.starmap(read_and_save, [(os.path.join(read_dir, i), select_cols, save_dir) for i in os.listdir(read_dir)])