Browse Source

Add some temporary processing files

wzl 5 months ago
Parent
Commit
426ed5a17e

+ 54 - 0
etl/wind_power/laser/LaserTrans.py

@@ -0,0 +1,54 @@
+import datetime
+import multiprocessing
+import os.path
+
+import pandas as pd
+
+from service.plt_service import get_all_wind
+from service.trans_service import save_df_to_db
+from utils.file.trans_methods import read_files, read_file_to_df
+from utils.log.trans_log import set_trance_id
+
+
+class LaserTrans:
+    """
+    Laser rangefinder data transformation
+    """
+
+    def __init__(self, field_code, read_path, save_path: str):
+        self.field_code = field_code
+        self.read_path = read_path
+        self.save_path = save_path
+        self.begin = datetime.datetime.now()
+        self.wind_col_trans, _ = get_all_wind(self.field_code, need_rated_param=False)
+
+    def get_file_data(self, file_path):
+        file_name = os.path.basename(file_path)
+        wind_farm, wind_turbine_number, acquisition_time, sampling_frequency = file_name.split("_")
+        # the last name segment still carries the file extension
+        sampling_frequency = os.path.splitext(sampling_frequency)[0]
+        # load the raw data
+        df = read_file_to_df(file_path)
+        result_df = pd.DataFrame()
+        # assign the array-valued columns first: they give the frame its index;
+        # scalars assigned to a still-empty frame would leave it with zero rows
+        result_df['echo1_dist'] = df['Echo1Dist'].values
+        result_df['echo1_grey'] = df['Echo1Grey'].values
+        result_df['echo2_dist'] = df['Echo2Dist'].values
+        result_df['echo2_grey'] = df['Echo2Grey'].values
+        result_df['echo3_dist'] = df['Echo3Dist'].values
+        result_df['echo3_grey'] = df['Echo3Grey'].values
+        result_df['pk_no'] = df['PkNo'].values[0]
+        result_df['echo_type'] = df['EchoType'].values[0]
+        result_df['wind_turbine_number'] = wind_turbine_number
+        result_df['acquisition_time'] = pd.to_datetime(acquisition_time, format='%Y%m%d%H%M%S')
+        result_df['sampling_frequency'] = sampling_frequency
+        result_df['wind_turbine_number'] = result_df['wind_turbine_number'].map(self.wind_col_trans).fillna(
+            result_df['wind_turbine_number'])
+
+        save_df_to_db(self.field_code + "_laser", result_df)
+
+    def run(self):
+        trance_id = '-'.join([self.field_code, 'laser'])
+        set_trance_id(trance_id)
+        all_files = read_files(self.read_path, ['csv'])
+        if not all_files:
+            return
+        pool_count = min(8, len(all_files))
+
+        with multiprocessing.Pool(pool_count) as pool:
+            pool.map(self.get_file_data, all_files)
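
A minimal usage sketch for the new transformer (the field code and directories are hypothetical; `get_all_wind` and `save_df_to_db` come from the existing service layer):

    from etl.wind_power.laser.LaserTrans import LaserTrans

    # hypothetical field code and paths, for illustration only
    trans = LaserTrans('WOF001', r'/data/laser/raw', r'/data/laser/out')
    trans.run()  # one worker per CSV file (capped at 8); each result is saved to <field_code>_laser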

+ 0 - 0
etl/wind_power/laser/__init__.py


+ 5 - 0
service/common_connect.py

@@ -0,0 +1,5 @@
+from utils.db.ConnectMysql import ConnectMysql
+
+plt = ConnectMysql("plt")
+
+trans = ConnectMysql("trans")
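
Both handles are shared across the service layer; a sketch of the calling convention, as inferred from the call sites in `service/wave_service.py` (`execute` takes an SQL string plus optional parameters and returns the result rows for queries):

    from service.common_connect import plt

    # parameterized update, mirroring wave_service usage
    plt.execute("update wave_data_transfer set transfer_progress = %s where id = %s", (50, 1))
    rows = plt.execute("select count(1) as count from wave_data_transfer where trans_sys_status = 0")
    print(rows[0]['count'])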

+ 90 - 0
service/wave_service.py

@@ -0,0 +1,90 @@
+from service.common_connect import plt
+
+
+def update_timeout_wave_trans_data():
+    sql = """
+    UPDATE wave_data_transfer  
+    SET trans_sys_status = 2,err_info='运行超时失败',transfer_state=2
+    WHERE  TIMESTAMPDIFF(HOUR, transfer_start_time, NOW()) > 6  
+        AND trans_sys_status = 0
+    """
+    plt.execute(sql)
+
+
+def update_wave_trans_status_running(id, schedule_exec=True):
+    if schedule_exec:
+        exec_sql = """
+        update wave_data_transfer set transfer_state = 0,trans_sys_status = 0 ,transfer_start_time = now(),err_info='',
+        engine_count =0,time_granularity=0,transfer_finish_time=null,
+        data_min_time= null,data_max_time= null,transfer_data_count=null
+        where id = %s 
+        """
+        plt.execute(exec_sql, id)
+
+
+def update_wave_trans_status_error(id, message="", save_db=True):
+    if save_db:
+        exec_sql = """
+        update wave_data_transfer set transfer_state = 2,trans_sys_status=2 ,err_info= %s,transfer_finish_time=now() 
+        where id = %s  
+        """
+
+        message = message if len(message) <= 200 else message[0:200]
+        plt.execute(exec_sql, (message, id))
+
+
+# pass explicit min_date/max_date to record the data time range; None skips those columns
+def update_wave_trans_status_success(id, wind_count=0, time_granularity=0,
+                                     min_date=None, max_date=None,
+                                     total_count=0, save_db=True):
+    if save_db:
+        if min_date is not None:
+            exec_sql = """
+            update wave_data_transfer set transfer_state = 1,trans_sys_status = 1,transfer_progress=100,err_info = '',engine_count =%s,time_granularity=%s,transfer_finish_time=now(),
+            data_min_time= %s,data_max_time= %s,transfer_data_count=%s
+            where id = %s  
+            """
+            plt.execute(exec_sql, (wind_count, time_granularity, min_date, max_date, total_count, id))
+        else:
+            exec_sql = """
+            update wave_data_transfer set transfer_state = 1,trans_sys_status = 1,transfer_progress = 100,err_info = '',engine_count =%s,time_granularity=%s,transfer_finish_time=now()
+            where id = %s  
+            """
+            plt.execute(exec_sql, (wind_count, time_granularity, id))
+
+
+def update_wave_trans_transfer_progress(id, transfer_progress=0, save_db=True):
+    if save_db:
+        exec_sql = """
+        update wave_data_transfer set transfer_progress = %s where id = %s
+        """
+        plt.execute(exec_sql, (int(transfer_progress), id))
+
+
+# fetch the next wave transfer task to execute
+def get_wave_exec_data(run_count: int = 1) -> dict:
+    query_running_sql = "select count(1) as count from wave_data_transfer where trans_sys_status = 0"
+    query_next_exec_sql = """
+    SELECT
+        t.*,a.field_name,b.batch_name
+    FROM
+        wave_data_transfer t INNER JOIN wind_field a on t.field_code = a.field_code
+        inner join wind_field_batch b on t.batch_code = b.batch_code
+    WHERE
+         t.trans_sys_status in (-1,1,2) and t.transfer_state = 0
+    AND t.transfer_addr != ''
+    ORDER BY
+        t.update_time
+    LIMIT 1
+    """
+    data = plt.execute(query_running_sql)
+    now_count = int(data[0]['count'])
+    if now_count >= run_count:
+        return None
+    else:
+        data = plt.execute(query_next_exec_sql)
+        if isinstance(data, tuple) or not data:
+            return {}
+        return data[0]
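
A sketch of how these helpers compose into a polling scheduler (the loop and sleep interval are illustrative, not part of this commit; the selected row is assumed to carry its `id` via `t.*`):

    import time
    from service.wave_service import (get_wave_exec_data, update_timeout_wave_trans_data,
                                      update_wave_trans_status_running)

    while True:
        update_timeout_wave_trans_data()        # fail jobs that have run for more than 6 hours
        task = get_wave_exec_data(run_count=1)  # None: the run slot is busy; {}: queue empty
        if task:
            update_wave_trans_status_running(task['id'])
            # ... run the actual transfer, then report success or error ...
        time.sleep(60)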

+ 47 - 0
tmp_file/curge_read.py

@@ -0,0 +1,47 @@
+import os
+
+import chardet
+import pandas as pd
+
+
+# detect the file encoding
+def detect_file_encoding(filename):
+    # read the first 1000 bytes of the file (enough for most encoding detection)
+    with open(filename, 'rb') as f:
+        rawdata = f.read(1000)
+    result = chardet.detect(rawdata)
+    encoding = result['encoding']
+
+    if encoding is None:
+        encoding = 'gb18030'
+
+    if encoding.lower() in ['utf-8', 'ascii', 'utf8', 'utf-8-sig']:
+        return 'utf-8'
+
+    return 'gb18030'
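+# note: any encoding other than utf-8/ascii is coerced to gb18030, a superset of
+# gbk/gb2312 that covers the Chinese-encoded CSV exports this helper is aimed at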
+
+
+def read_file_to_df(file_path, nrows=None):
+    df = pd.DataFrame()
+    try:
+        if str(file_path).lower().endswith("csv"):
+            encoding = detect_file_encoding(file_path)
+            df = pd.read_csv(file_path, encoding=encoding, on_bad_lines='warn', nrows=nrows)
+        else:
+            xls = pd.ExcelFile(file_path)
+            sheet_names = xls.sheet_names
+            for sheet_name in sheet_names:
+                now_df = pd.read_excel(xls, sheet_name=sheet_name, nrows=nrows)
+                now_df['sheet_name'] = sheet_name
+                df = pd.concat([df, now_df])
+            xls.close()
+    except Exception as e:
+        message = '文件:' + os.path.basename(file_path) + ',' + str(e)
+        raise ValueError(message)
+
+    return df
+
+
+if __name__ == '__main__':
+    df = read_file_to_df(r"D:\data\11-12月.xls")
+    print(df)

+ 31 - 0
tmp_file/张崾先26故障.py

@@ -0,0 +1,31 @@
+import pandas as pd
+
+df = pd.read_csv(r'C:\Users\Administrator\Documents\WeChat Files\anmox-\FileStorage\File\2024-12\26故障.csv',
+                 encoding='gbk')
+df['开始时间'] = pd.to_datetime(df['开始时间'], errors='coerce')
+df['结束时间'] = pd.to_datetime(df['结束时间'], errors='coerce')
+time_df = pd.DataFrame(df.groupby(['开始时间'])['结束时间'].max())
+time_df.reset_index(inplace=True)
+time_df.sort_values(by='开始时间', inplace=True)
+
+# keep only the fault intervals whose end time extends the running maximum,
+# i.e. drop intervals that are fully covered by an earlier one
+datas = set()
+max_row = None
+for index, row in time_df.iterrows():
+    if max_row is None:
+        datas.add((row['开始时间'], row['结束时间']))
+        max_row = row
+        continue
+
+    if row['结束时间'] > max_row['结束时间']:
+        datas.add((row['开始时间'], row['结束时间']))
+        max_row = row
+
+result_df = pd.DataFrame()
+for begin, end in datas:
+    print(begin, end)
+    now_df = df[(df['开始时间'] == begin) & (df['结束时间'] == end)]
+    now_df = now_df.tail(1)
+    result_df = pd.concat([result_df, now_df])
+
+result_df.sort_values(by='开始时间', inplace=True)
+result_df.to_csv(r'd:\data\26故障_new.csv', encoding='utf8', index=False)

+ 46 - 0
tmp_file/张崾先筛选20241210.py

@@ -0,0 +1,46 @@
+import datetime
+import multiprocessing
+import os
+import sys
+
+import pandas as pd
+
+sys.path.insert(0, os.path.abspath(__file__).split("tmp_file")[0])
+
+from utils.file.trans_methods import read_file_to_df, read_excel_files
+
+# read_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/清理数据/点检表以外测点儿-20241209'
+
+# save_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/清理数据/变桨-20241210'
+
+# user_cols = ['Time', '机舱外温度', '桨叶角度A', '桨叶角度B', '桨叶角度C',
+#              '轴1电机电流', '轴2电机电流', '轴3电机电流',
+#              '轴1电机温度', '轴2电机温度', '轴3电机温度']
+
+
+read_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/清理数据/点检表以外测点儿-20241210'
+
+save_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/清理数据/偏航-20241210'
+
+user_cols = ['Time', '实际风速', '偏航误差', '电缆扭角', '激活偏航解缆阀', '激活顺时针偏航', '激活逆时针偏航']
+
+os.makedirs(save_dir, exist_ok=True)
+
+
+def read_and_save(file_path, save_dir):
+    begin = datetime.datetime.now()
+    df = read_file_to_df(file_path, read_cols=user_cols)
+    df['Time'] = pd.to_datetime(df['Time'], errors='coerce')
+    df.sort_values(by=['Time'], inplace=True)
+    df.to_csv(os.path.join(save_dir, os.path.basename(file_path)), index=False, encoding='utf8')
+    print(os.path.basename(file_path), '耗时:', (datetime.datetime.now() - begin))
+
+
+if __name__ == '__main__':
+    begin = datetime.datetime.now()
+    all_files = read_excel_files(read_dir)
+
+    with multiprocessing.Pool(16) as pool:
+        pool.starmap(read_and_save, [(file, save_dir) for file in all_files])
+
+    print('总耗时:', (datetime.datetime.now() - begin))

+ 32 - 0
tmp_file/张崾先风电场-故障整理.py

@@ -0,0 +1,32 @@
+import multiprocessing
+import os
+
+import pandas as pd
+
+read_dir = r'D:\data\张崾先风电场\故障事件数据'
+save_dir = r'D:\data\崾先风电场\故障事件数据整理'
+
+print(os.listdir(read_dir))
+
+
+def read_solve_data(file_dir):
+    base_dir = os.path.basename(file_dir)
+    df = pd.DataFrame()
+    for file in os.listdir(file_dir):
+        df = pd.concat([df, pd.read_csv(file_dir + '/' + file, encoding='gbk')])
+
+    df['开始时间'] = pd.to_datetime(df['开始时间'], errors='coerce')
+    df = df.query("(开始时间 >= '2024-01-01 00:00:00') & (开始时间 < '2024-12-01 00:00:00')")
+    df['month'] = df['开始时间'].dt.month
+    months = df['month'].unique()
+    for month in months:
+        df_month = df[df['month'] == month]
+        os.makedirs(save_dir + os.sep + base_dir, exist_ok=True)
+        df_month.to_csv(save_dir + os.sep + base_dir + os.sep + str(month) + '.csv', index=False)
+
+
+if __name__ == '__main__':
+    dirs = os.listdir(read_dir)
+
+    with multiprocessing.Pool(4) as pool:
+        pool.map(read_solve_data, [read_dir + os.sep + i for i in dirs])

+ 108 - 0
tmp_file/张崾先风电场-非点检字段获取.py

@@ -0,0 +1,108 @@
+import datetime
+import multiprocessing
+import os
+import sys
+
+sys.path.insert(0, os.path.abspath(__file__).split("tmp_file")[0])
+
+from utils.file.trans_methods import read_excel_files, copy_to_new, read_file_to_df
+from utils.zip.unzip import unzip, get_desc_path, unrar
+import pandas as pd
+
+read_cols = ['Time', '设备主要状态', '功率曲线风速', '湍流强度', '实际风速', '有功功率', '桨叶角度A', '桨叶角度B',
+             '桨叶角度C', '机舱内温度', '机舱外温度', '绝对风向', '机舱绝对位置', '叶轮转速', '发电机转速', '瞬时风速',
+             '有功设定反馈', '当前理论可发最大功率', '空气密度', '偏航误差', '发电机扭矩', '瞬时功率', '风向1s',
+             '偏航压力', '桨叶1速度', '桨叶2速度', '桨叶3速度', '桨叶1角度给定', '桨叶2角度给定', '桨叶3角度给定',
+             '轴1电机电流', '轴2电机电流', '轴3电机电流', '轴1电机温度', '轴2电机温度', '轴3电机温度', '待机', '启动',
+             '偏航', '并网', '限功率', '正常发电', '故障', '计入功率曲线', '运行发电机冷却风扇1', '运行发电机冷却风扇2',
+             '激活偏航解缆阀', '激活偏航刹车阀', '激活风轮刹车阀', '激活顺时针偏航', '激活逆时针偏航', '电缆扭角']
+
+read_path = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/收资数据/sec'
+save_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/清理数据/点检表以外测点儿-20241210'
+tmp_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/tmp/second/excel_tmp'
+
+# read_path = r'D:\data\张崾先风电场\6'
+# save_dir = r'D:\data\张崾先风电场\点检表以外测点儿-20241209'
+# tmp_dir = r'D:\data\张崾先风电场\tmp'
+
+os.makedirs(tmp_dir, exist_ok=True)
+os.makedirs(save_dir, exist_ok=True)
+
+
+def get_and_remove(file):
+    to_path = tmp_dir
+    if str(file).endswith("zip"):
+        if str(file).endswith("csv.zip"):
+            copy_to_new(file, file.replace(read_path, to_path).replace("csv.zip", 'csv.gz'))
+        else:
+            desc_path = file.replace(read_path, to_path)
+            is_success, e = unzip(file, get_desc_path(desc_path))
+            if not is_success:
+                # raise e
+                pass
+    elif str(file).endswith("rar"):
+        desc_path = file.replace(read_path, to_path)
+        unrar(file, get_desc_path(desc_path))
+    else:
+        copy_to_new(file, file.replace(read_path, to_path))
+
+
+def get_resolve(file_path, exist_wind_names, map_lock):
+    begin = datetime.datetime.now()
+    df = read_file_to_df(file_path, read_cols=read_cols)
+    # turbine id comes from file_name[0:2], the record date from file_name[14:24]
+    wind_name = str(os.path.basename(file_path)[0:2])
+    date = os.path.basename(file_path)[14:24]
+    df['Time'] = df['Time'].apply(lambda x: date + ' ' + x)
+    df = df[read_cols]
+    with map_lock[str(wind_name)]:
+        if wind_name in exist_wind_names:
+            df.to_csv(save_dir + '/' + wind_name + '.csv', mode='a', index=False, header=False, encoding='utf8')
+        else:
+            df.to_csv(save_dir + '/' + wind_name + '.csv', index=False, encoding='utf8')
+            exist_wind_names.append(wind_name)
+
+    print(os.path.basename(file_path), '执行完成,耗时:', get_haoshi(begin))
+
+
+def sort_data(file_path):
+    df = pd.read_csv(file_path, encoding='utf8')
+    df['Time'] = pd.to_datetime(df['Time'], errors='coerce')
+    df.sort_values(by=['Time'], inplace=True)
+    df.to_csv(file_path, index=False, encoding='utf8')
+
+
+def get_haoshi(begin):
+    return datetime.datetime.now() - begin
+
+
+if __name__ == '__main__':
+    begin = datetime.datetime.now()
+    # all_files = read_files(read_path)
+    # split_count = get_available_cpu_count_with_percent(1 / 2)
+    # all_arrays = split_array(all_files, split_count)
+    #
+    # for index, arr in enumerate(all_arrays):
+    #     with multiprocessing.Pool(10) as pool:
+    #         pool.starmap(get_and_remove, [(i,) for i in arr])
+    #
+    # print("移动完成,耗时:", get_haoshi(begin))
+
+    # exist_wind_names = multiprocessing.Manager().list()
+    #
+    # map_lock = dict()
+    # for i in range(26, 42):
+    #     map_lock[str(i)] = multiprocessing.Manager().Lock()
+    #
+    # all_files = read_excel_files(tmp_dir)
+    # with multiprocessing.Pool(16) as pool:
+    #     pool.starmap(get_resolve, [(i, exist_wind_names, map_lock) for i in all_files])
+    #
+    # print("整理完成,耗时:", get_haoshi(begin))
+
+    all_files = read_excel_files(save_dir)
+    with multiprocessing.Pool(4) as pool:
+        pool.map(sort_data, all_files)
+    print("排序完成,耗时:", get_haoshi(begin))
+
+    # shutil.rmtree(tmp_dir)
+    # print("移除临时文件完成,耗时:", get_haoshi(begin))

+ 83 - 0
wind_farm/中广核/minute_data.py

@@ -0,0 +1,83 @@
+import datetime
+import logging
+import os
+
+import pandas as pd
+import sys
+from sqlalchemy import create_engine
+
+# switched to the third-party data source
+engine = create_engine('mysql+pymysql://root:admin123456@192.168.50.235:30306/appoint')
+
+base_dir = r'/data/logs/104'
+save_dir = base_dir + os.sep + 'minute'
+log_dir = base_dir + os.sep + 'logs' + os.sep + 'minute'
+
+wind_farm_code_dict = {
+    '风场编号1': '山西风场',
+    '风场编号2': '桂林风场'
+}
+
+
+def create_dir(save_dir, is_file=False):
+    if is_file:
+        save_dir = os.path.dirname(save_dir)
+    os.makedirs(save_dir, exist_ok=True)
+
+
+def init_log():
+    logger = logging.getLogger("104data")
+    logger.setLevel(logging.INFO)
+    stout_handle = logging.StreamHandler(sys.stdout)
+    stout_handle.setFormatter(
+        logging.Formatter("%(asctime)s: %(message)s"))
+    stout_handle.setLevel(logging.INFO)
+    logger.addHandler(stout_handle)
+    create_dir(log_dir)
+    file_name = log_dir + os.sep + datetime.datetime.now().strftime('%Y%m') + '-info.log'
+    file_handler = logging.FileHandler(file_name, encoding='utf-8')
+    file_handler.setFormatter(
+        logging.Formatter("%(asctime)s: %(message)s"))
+    file_handler.setLevel(logging.INFO)
+    logger.addHandler(file_handler)
+
+    file_name = log_dir + os.sep + datetime.datetime.now().strftime('%Y%m') + '-error.log'
+    file_handler = logging.FileHandler(file_name, encoding='utf-8')
+    file_handler.setFormatter(
+        logging.Formatter("%(asctime)s: %(message)s"))
+    file_handler.setLevel(logging.ERROR)
+    logger.addHandler(file_handler)
+
+    return logger
+
+
+logger = init_log()
+
+
+def info_print(*kwargs):
+    message = " ".join([str(i) for i in kwargs])
+    logger.info(message)
+
+
+def error_print(*kwargs):
+    message = " ".join([str(i) for i in kwargs])
+    logger.error(message)
+
+
+def get_data_and_save_file(df_sql, save_path):
+    info_print(df_sql)
+    df = pd.read_sql_query(df_sql, engine)
+    info_print(df.shape)
+    # persist to the dated target; pandas infers gzip compression from the .gz suffix
+    create_dir(save_path, is_file=True)
+    df.to_csv(save_path, index=False)
+
+
+if __name__ == '__main__':
+    info_print("开始执行")
+    begin = datetime.datetime.now()
+    yestoday = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y%m%d')
+    yestoday_sql = f"select * from information_schema.TABLES where TABLE_NAME = '{yestoday}'"
+
+    get_data_and_save_file(yestoday_sql,
+                           os.path.join(save_dir, wind_farm_code_dict['风场编号1'], yestoday[0:4], yestoday[0:6],
+                                        f'{yestoday}.csv.gz'))
+
+    info_print("执行结束,总耗时:", datetime.datetime.now() - begin)

+ 83 - 0
wind_farm/中广核/purge_history_data.py

@@ -0,0 +1,83 @@
+import datetime
+import logging
+import os
+import sys
+
+import pandas as pd
+from sqlalchemy import create_engine, text
+
+engine = create_engine('mysql+pymysql://root:admin123456@192.168.50.235:30306/appoint')
+
+base_dir = r'/data/logs/104'
+log_dir = base_dir + os.sep + 'logs' + os.sep + 'delete'
+
+
+def create_dir(save_dir, is_file=False):
+    if is_file:
+        save_dir = os.path.dirname(save_dir)
+    os.makedirs(save_dir, exist_ok=True)
+
+
+def init_log():
+    logger = logging.getLogger("104data")
+    logger.setLevel(logging.INFO)
+    stout_handle = logging.StreamHandler(sys.stdout)
+    stout_handle.setFormatter(
+        logging.Formatter("%(asctime)s: %(message)s"))
+    stout_handle.setLevel(logging.INFO)
+    logger.addHandler(stout_handle)
+    create_dir(log_dir)
+    file_name = log_dir + os.sep + datetime.datetime.now().strftime('%Y%m') + '-info.log'
+    file_handler = logging.FileHandler(file_name, encoding='utf-8')
+    file_handler.setFormatter(
+        logging.Formatter("%(asctime)s: %(message)s"))
+    file_handler.setLevel(logging.INFO)
+    logger.addHandler(file_handler)
+
+    file_name = log_dir + os.sep + datetime.datetime.now().strftime('%Y%m') + '-error.log'
+    file_handler = logging.FileHandler(file_name, encoding='utf-8')
+    file_handler.setFormatter(
+        logging.Formatter("%(asctime)s: %(message)s"))
+    file_handler.setLevel(logging.ERROR)
+    logger.addHandler(file_handler)
+
+    return logger
+
+
+logger = init_log()
+
+
+def info_print(*kwargs):
+    message = " ".join([str(i) for i in kwargs])
+    logger.info(message)
+
+
+def error_print(*kwargs):
+    message = " ".join([str(i) for i in kwargs])
+    logger.error(message)
+
+
+def drop_table(lastdays):
+    # query the metadata for tables whose names start with the target date
+    query = text(
+        f"SELECT TABLE_NAME FROM information_schema.TABLES WHERE TABLE_SCHEMA='appoint' AND TABLE_NAME like '{lastdays}%'")
+    table_df = pd.read_sql(query, engine)
+
+    info_print('查询到表', table_df['TABLE_NAME'].values)
+    for table_name in table_df['TABLE_NAME'].values:
+        # build the DROP TABLE statement
+        drop_query = text(f"DROP TABLE {table_name}")
+        # execute the drop
+        with engine.connect() as connection:
+            connection.execute(drop_query)
+
+        info_print(f"Table {table_name} deleted")
+
+
+if __name__ == '__main__':
+    info_print("开始执行")
+    begin = datetime.datetime.now()
+    lastdays = (datetime.datetime.now() - datetime.timedelta(days=8)).strftime('%Y%m%d')
+    info_print(lastdays)
+    drop_table(lastdays)
+    info_print("执行结束,总耗时:", datetime.datetime.now() - begin)