Procházet zdrojové kódy

合并表,读取不到数据,直接跳过

wzl před 8 měsíci
rodič
revize
f1fb2f2e4d

+ 1 - 1
conf/etl_config_datang.yaml

@@ -1,5 +1,5 @@
 plt:
-  database: energy_prod
+  database: energy
   host: 172.16.37.22
   password: admin123456
   port: 3306

+ 1 - 2
etl/wind_power/min_sec/ReadAndSaveTmp.py

@@ -111,7 +111,6 @@ class ReadAndSaveTmp(object):
                 exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
                 df['wind_turbine_number'] = eval(exec_str)
 
-
         self.save_to_tmp_csv(df)
 
     def save_to_tmp_csv(self, df):
@@ -228,7 +227,7 @@ class ReadAndSaveTmp(object):
                     trans_cols.append(v)
             trans_cols = list(set(trans_cols))
             if self.trans_param.merge_columns:
-                df = read_file_to_df(file_path, trans_cols=trans_cols)
+                df = read_file_to_df(file_path, trans_cols=trans_cols, not_find_header='ignore')
             else:
                 if self.trans_param.need_valid_cols:
                     df = read_file_to_df(file_path, read_cols, trans_cols=trans_cols)

+ 2 - 0
ge_requirement.sh

@@ -0,0 +1,2 @@
+#!/bin/bash
+pip freeze > requirements.txt

+ 2 - 0
requirements.txt

@@ -2,12 +2,14 @@ chardet==5.2.0
 contourpy==1.3.0
 cycler==0.12.1
 DBUtils==3.1.0
+et-xmlfile==1.1.0
 fonttools==4.53.1
 greenlet==3.0.3
 importlib_resources==6.4.5
 kiwisolver==1.4.7
 matplotlib==3.9.2
 numpy==2.0.0
+openpyxl==3.1.5
 packaging==24.1
 pandas==2.2.2
 pillow==10.4.0

+ 0 - 0
tmp_file/__init__.py


+ 197 - 0
tmp_file/baiyushan_20240906.py

@@ -0,0 +1,197 @@
+import datetime
+import os
+from multiprocessing import Pool
+
+import chardet
+import pandas as pd
+
+
+# 获取文件编码
+def detect_file_encoding(filename):
+    # 读取文件的前1000个字节(足够用于大多数编码检测)
+    with open(filename, 'rb') as f:
+        rawdata = f.read(1000)
+    result = chardet.detect(rawdata)
+    encoding = result['encoding']
+
+    if encoding is None:
+        encoding = 'gb18030'
+
+    if encoding and encoding.lower() == 'gb2312' or encoding.lower().startswith("windows"):
+        encoding = 'gb18030'
+    return encoding
+
+
+# 读取数据到df
+def read_file_to_df(file_path, read_cols=list(), header=0):
+    df = pd.DataFrame()
+    if str(file_path).lower().endswith("csv") or str(file_path).lower().endswith("gz"):
+        encoding = detect_file_encoding(file_path)
+        end_with_gz = str(file_path).lower().endswith("gz")
+        if read_cols:
+            if end_with_gz:
+                df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip', header=header)
+            else:
+                df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, header=header, on_bad_lines='warn')
+        else:
+
+            if end_with_gz:
+                df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header)
+            else:
+                df = pd.read_csv(file_path, encoding=encoding, header=header, on_bad_lines='warn')
+
+    else:
+        xls = pd.ExcelFile(file_path)
+        # 获取所有的sheet名称
+        sheet_names = xls.sheet_names
+        for sheet in sheet_names:
+            if read_cols:
+                df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet, header=header, usecols=read_cols)])
+            else:
+                df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet, header=header)])
+
+    return df
+
+
+def __build_directory_dict(directory_dict, path, filter_types=None):
+    # 遍历目录下的所有项
+    for item in os.listdir(path):
+        item_path = os.path.join(path, item)
+        if os.path.isdir(item_path):
+            __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
+        elif os.path.isfile(item_path):
+            if path not in directory_dict:
+                directory_dict[path] = []
+
+            if filter_types is None or len(filter_types) == 0:
+                directory_dict[path].append(item_path)
+            elif str(item_path).split(".")[-1] in filter_types:
+                if str(item_path).count("~$") == 0:
+                    directory_dict[path].append(item_path)
+
+    # 读取所有文件
+
+
+# 读取路径下所有的excel文件
+def read_excel_files(read_path):
+    directory_dict = {}
+    __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])
+
+    return [path for paths in directory_dict.values() for path in paths if path]
+
+
+# 创建路径
+def create_file_path(path, is_file_path=False):
+    if is_file_path:
+        path = os.path.dirname(path)
+
+    if not os.path.exists(path):
+        os.makedirs(path, exist_ok=True)
+
+
+def read_status(status_path):
+    all_files = read_excel_files(status_path)
+
+    with Pool(20) as pool:
+        dfs = pool.starmap(read_file_to_df, [(file, ['设备名称', '状态码', '开始时间'], 2) for file in all_files])
+
+    df = pd.concat(dfs)
+    df = df[df['状态码'].isin([3, 5])]
+    df['开始时间'] = pd.to_datetime(df['开始时间'])
+
+    df['处理后时间'] = (df['开始时间'] + pd.Timedelta(minutes=10)).apply(
+        lambda x: f"{x.year}-{str(x.month).zfill(2)}-{str(x.day).zfill(2)} {str(x.hour).zfill(2)}:{x.minute // 10}0:00")
+
+    df['处理后时间'] = pd.to_datetime(df['处理后时间'])
+    df = df[(df['处理后时间'] >= '2023-09-01 00:00:00')]
+    df[df['处理后时间'] >= '2024-09-01 00:00:00'] = '2024-09-01 00:00:00'
+    df.sort_values(by=['设备名称', '处理后时间'], inplace=True)
+
+    return df
+
+
+def read_fault_data(fault_path):
+    all_files = read_excel_files(fault_path)
+
+    with Pool(20) as pool:
+        dfs = pool.starmap(read_file_to_df, [(file, ['设备名称', '故障开始时间'], 2) for file in all_files])
+
+    df = pd.concat(dfs)
+    df = df[df['设备名称'].str.startswith("#")]
+    df['故障开始时间'] = pd.to_datetime(df['故障开始时间'])
+
+    df['处理后故障开始时间'] = (df['故障开始时间'] + pd.Timedelta(minutes=10)).apply(
+        lambda x: f"{x.year}-{str(x.month).zfill(2)}-{str(x.day).zfill(2)} {str(x.hour).zfill(2)}:{x.minute // 10}0:00")
+
+    df['处理后故障开始时间'] = pd.to_datetime(df['处理后故障开始时间'])
+    df = df[(df['处理后故障开始时间'] >= '2023-09-01 00:00:00') & (df['处理后故障开始时间'] < '2024-09-01 00:00:00')]
+    df.sort_values(by=['设备名称', '处理后故障开始时间'], inplace=True)
+
+    return df
+
+
+def read_10min_data(data_path):
+    all_files = read_excel_files(data_path)
+
+    with Pool(20) as pool:
+        dfs = pool.starmap(read_file_to_df,
+                           [(file, ['设备名称', '时间', '平均风速(m/s)', '平均网侧有功功率(kW)'], 1) for file in all_files])
+
+    df = pd.concat(dfs)
+    df['时间'] = pd.to_datetime(df['时间'])
+
+    df = df[(df['时间'] >= '2023-09-01 00:00:00') & (df['时间'] < '2024-09-01 00:00:00')]
+    df.sort_values(by=['设备名称', '时间'], inplace=True)
+    return df
+
+
+def select_data_and_save(name, fault_df, origin_df):
+    df = pd.DataFrame()
+    for i in range(fault_df.shape[0]):
+        fault = fault_df.iloc[i]
+        con1 = origin_df['时间'] >= fault['处理后故障开始时间']
+        con2 = origin_df['时间'] <= fault['结束时间']
+        df = pd.concat([df, origin_df[con1 & con2]])
+
+    name = name.replace('#', 'F')
+    df.drop_duplicates(inplace=True)
+    df.to_csv(save_path + os.sep + name + '.csv', index=False, encoding='utf8')
+
+
+if __name__ == '__main__':
+    base_path = r'/data/download/白玉山/需要整理的数据'
+    save_path = base_path + os.sep + 'sele_data_202409261135'
+    create_file_path(save_path)
+    status_df = read_status(base_path + os.sep + '设备状态')
+    fault_df = read_fault_data(base_path + os.sep + '故障')
+    data_df = read_10min_data(base_path + os.sep + '十分钟')
+
+    status_df.to_csv(base_path + os.sep + '设备状态' + '.csv', index=False, encoding='utf8')
+    fault_df.to_csv(base_path + os.sep + '故障' + '.csv', index=False, encoding='utf8')
+    data_df.to_csv(base_path + os.sep + '十分钟' + '.csv', index=False, encoding='utf8')
+
+    print(status_df.shape)
+    print(fault_df.shape)
+    print(data_df.shape)
+
+    fault_list = list()
+    for i in range(fault_df.shape[0]):
+        data = fault_df.iloc[i]
+        con1 = status_df['设备名称'] == data['设备名称']
+        con2 = status_df['处理后时间'] >= data['处理后故障开始时间']
+        fault_list.append(status_df[con1 & con2]['处理后时间'].min())
+    fault_df['结束时间'] = fault_list
+
+    status_df.to_csv(base_path + os.sep + '设备状态' + '.csv', index=False, encoding='utf8')
+    fault_df.to_csv(base_path + os.sep + '故障' + '.csv', index=False, encoding='utf8')
+    data_df.to_csv(base_path + os.sep + '十分钟' + '.csv', index=False, encoding='utf8')
+
+    names = set(fault_df['设备名称'])
+    fault_map = dict()
+    data_map = dict()
+    for name in names:
+        fault_map[name] = fault_df[fault_df['设备名称'] == name]
+        data_map[name] = data_df[data_df['设备名称'] == name]
+
+    with Pool(20) as pool:
+        pool.starmap(select_data_and_save, [(name, fault_map[name], data_map[name]) for name in names])

+ 48 - 0
tmp_file/filter_lose_data.py

@@ -0,0 +1,48 @@
+import datetime
+
+import pandas as pd
+
+df = pd.read_csv("D:\data\白玉山后评估数据资料\十分钟.csv", encoding='utf8')
+
+df['时间'] = pd.to_datetime(df['时间'])
+df['plus_10min'] = df['时间'] + pd.Timedelta(minutes=10)
+
+names = set(df['设备名称'])
+
+
+def get_time_space_count(start_time: datetime.datetime, end_time: datetime.datetime, time_space=1):
+    """
+    获取俩个时间之间的个数
+    :return: 查询时间间隔
+    """
+    delta = end_time - start_time
+    total_seconds = delta.days * 24 * 60 * 60 + delta.seconds
+
+    return abs(int(total_seconds / time_space))
+
+
+# Per device: number of missing ten-minute samples, written to a CSV at the end
+result_dict = dict()
+for name in names:
+    # NOTE(review): q_df is a filtered selection of df; adding a column to it
+    # may emit pandas' SettingWithCopyWarning - confirm this is acceptable
+    q_df = df[df['设备名称'] == name]
+    # 'unshift' holds the time of the following row
+    # (presumably rows are already in time order per device - verify against the input file)
+    q_df['unshift'] = q_df['时间'].shift(-1)
+    # Fills every missing value in every column with this string, including
+    # the 'unshift' of the last row, which has no following row
+    q_df.fillna('2024-09-01 00:00:00', inplace=True)
+    # Keep the rows whose following row is not exactly 10 minutes later
+    result_df = q_df[~(q_df['plus_10min'] == q_df['unshift'])]
+    result_df.reset_index(inplace=True)
+    q_list = list()
+    count = 0
+    # Written on every iteration, so the file ends up holding the last device only
+    result_df.to_csv('test.csv', encoding='utf8')
+    for i in range(result_df.shape[0]):
+        data = result_df.iloc[i]
+        begin = data['时间']
+        end = data['unshift']
+        # Number of 600-second steps between the two rows, minus one
+        count = count + get_time_space_count(begin, end, 600) - 1
+        # if end is not None and end != np.nan:
+        #     q_list.append(f"{begin} ~ {end}")
+
+    result_dict[name] = count
+
+# One "device,count" line per device
+with open("缺失_数量.csv", 'w', encoding='utf8') as f:
+    for k, v in result_dict.items():
+        # v.insert(0, k)
+        # f.write(",".join(v) + "\n")
+        f.write(f"{k},{v}\n")

+ 27 - 0
tmp_file/hebing_matlib_result.py

@@ -0,0 +1,27 @@
+import os
+import pandas as pd
+
+read_path = r"D:\data\电量损失及散点图"
+df = pd.DataFrame()
+
+cols = ['风机', '应发电量', '实发电量', '停机损失电量', '坏点+限电损失电量', '性能损失电量', '坏点损失电量', '限电损失电量', '超发电量', '应发电量百分比', '实发电量百分比',
+        '停机损失电量百分比', '坏点+限电损失电量百分比', '性能损失电量百分比', '坏点损失电量百分比', '限电损失电量百分比', '超发电量百分比', '平均风速', '可利用率']
+
+for root, dir, files in os.walk(read_path):
+    if files:
+        base_name = os.path.basename(root)
+        wind_df = pd.DataFrame()
+        print(root)
+        df1 = pd.read_excel(os.path.join(root, "EPPer.xls"), usecols=['应发电量百分比', '实发电量百分比',
+                                                                     '停机损失电量百分比', '坏点+限电损失电量百分比', '性能损失电量百分比',
+                                                                     '坏点损失电量百分比',
+                                                                     '限电损失电量百分比', '超发电量百分比', '平均风速', '可利用率'])
+        df2 = pd.read_excel(os.path.join(root, "EPKW.xls"),
+                            usecols=['应发电量', '实发电量', '停机损失电量', '坏点+限电损失电量', '性能损失电量', '坏点损失电量', '限电损失电量', '超发电量'])
+        wind_df = pd.concat([df1, df2], axis=1)
+        wind_df['风机'] = base_name
+        wind_df.reset_index(inplace=True)
+        print(wind_df.columns)
+        df = pd.concat([df, wind_df], ignore_index=True)
+
+df.to_csv("合并结果.csv", index=False, encoding='utf8', columns=cols)

+ 38 - 0
tmp_file/queshi_bili.py

@@ -0,0 +1,38 @@
+import datetime
+
+import pandas as pd
+
+
+def get_time_space_count(start_time: datetime.datetime, end_time: datetime.datetime, time_space=1):
+    """
+    获取俩个时间之间的个数
+    :return: 查询时间间隔
+    """
+    delta = end_time - start_time
+    total_seconds = delta.days * 24 * 60 * 60 + delta.seconds
+
+    return abs(int(total_seconds / time_space))
+
+
+df = pd.read_csv("D:\data\白玉山后评估数据资料\十分钟.csv", encoding='utf8')
+
+df['时间'] = pd.to_datetime(df['时间'])
+df['plus_10min'] = df['时间'] + pd.Timedelta(minutes=10)
+
+names = list(set(df['设备名称']))
+names.sort()
+
+count = get_time_space_count(datetime.datetime.strptime('2023-09-01 00:00:00', '%Y-%m-%d %H:%M:%S'),
+                             datetime.datetime.strptime('2024-09-01 00:00:00', '%Y-%m-%d %H:%M:%S'), 600)
+
+result_df = pd.DataFrame(df['设备名称'].value_counts())
+result_df.reset_index(inplace=True)
+result_df.columns = ['风机', '数量']
+
+result_df['总数'] = count
+
+result_df['完整度'] = result_df['数量'].apply(lambda x: round(x * 100 / count, 2))
+
+result_df.sort_values(by=['风机'], inplace=True)
+
+print(result_df)

+ 35 - 0
tmp_file/白玉山每月限电损失.py

@@ -0,0 +1,35 @@
+import os
+
+import pandas as pd
+
+read_path = r'D:\data\白玉山后评估数据资料\需要整理的数据\每月发电量和限电量、限电率'
+
+all_paths = list()
+for root, dirs, files in os.walk(read_path):
+    if files:
+        for file in files:
+            year_mont = int(file.split("(")[1].split("_")[0])
+            if year_mont >= 20230901 and year_mont < 20240901:
+                all_paths.append(os.path.join(root, file))
+
+df = pd.DataFrame()
+
+for path in all_paths:
+    now_df = pd.read_excel(path, usecols=['设备名称', '统计时间', '限电损失电量(kWh)'], header=2)
+    now_df = now_df[now_df['设备名称'].str.startswith("#")]
+    df = pd.concat([df, now_df])
+
+## 人工验证 看一看
+print(df[df['设备名称'] == '#34'])
+
+df = df[['设备名称', '限电损失电量(kWh)']]
+group_df = df.groupby('设备名称').sum()
+
+result_df = pd.DataFrame(group_df)
+result_df.reset_index(inplace=True)
+result_df.columns = ['设备名称', '总限电损失电量(kWh)']
+result_df.sort_values(by=['设备名称'], inplace=True)
+
+print(result_df)
+
+result_df.to_csv("设备总限电损失.csv", encoding='utf-8', index=False)

+ 44 - 39
utils/file/trans_methods.py

@@ -78,7 +78,7 @@ def find_read_header(file_path, trans_cols):
 
 
 # 读取数据到df
-def read_file_to_df(file_path, read_cols=list(), trans_cols=None, nrows=None):
+def read_file_to_df(file_path, read_cols=list(), trans_cols=None, nrows=None, not_find_header='raise'):
     begin = datetime.datetime.now()
     trans_print('开始读取文件', file_path)
     header = 0
@@ -87,49 +87,54 @@ def read_file_to_df(file_path, read_cols=list(), trans_cols=None, nrows=None):
         header, find_cols = find_read_header(file_path, trans_cols)
         trans_print(os.path.basename(file_path), "读取第", header, "行")
         if header is None:
-            message = '未匹配到开始行,请检查并重新指定'
-            trans_print(message)
-            raise Exception(message)
+            if not_find_header == 'raise':
+                message = '未匹配到开始行,请检查并重新指定'
+                trans_print(message)
+                raise Exception(message)
+            elif not_find_header == 'ignore':
+                pass
 
     read_cols.extend(find_cols)
-
-    try:
-        df = pd.DataFrame()
-        if str(file_path).lower().endswith("csv") or str(file_path).lower().endswith("gz"):
-            encoding = detect_file_encoding(file_path)
-            end_with_gz = str(file_path).lower().endswith("gz")
-            if read_cols:
-                if end_with_gz:
-                    df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip', header=header,
-                                     nrows=nrows)
+    df = pd.DataFrame()
+    if header is not None:
+        try:
+            if str(file_path).lower().endswith("csv") or str(file_path).lower().endswith("gz"):
+                encoding = detect_file_encoding(file_path)
+                end_with_gz = str(file_path).lower().endswith("gz")
+                if read_cols:
+                    if end_with_gz:
+                        df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip',
+                                         header=header,
+                                         nrows=nrows)
+                    else:
+                        df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, header=header,
+                                         on_bad_lines='warn', nrows=nrows)
                 else:
-                    df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, header=header,
-                                     on_bad_lines='warn', nrows=nrows)
-            else:
 
-                if end_with_gz:
-                    df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header, nrows=nrows)
-                else:
-                    df = pd.read_csv(file_path, encoding=encoding, header=header, on_bad_lines='warn', nrows=nrows)
+                    if end_with_gz:
+                        df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header, nrows=nrows)
+                    else:
+                        df = pd.read_csv(file_path, encoding=encoding, header=header, on_bad_lines='warn', nrows=nrows)
 
-        else:
-            xls = pd.ExcelFile(file_path)
-            # 获取所有的sheet名称
-            sheet_names = xls.sheet_names
-            for sheet_name in sheet_names:
-                if read_cols:
-                    now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, usecols=read_cols, nrows=nrows)
-                else:
-                    now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, nrows=nrows)
-
-                now_df['sheet_name'] = sheet_name
-                df = pd.concat([df, now_df])
-            xls.close()
-        trans_print('文件读取成功:', file_path, '数据数量:', df.shape, '耗时:', datetime.datetime.now() - begin)
-    except Exception as e:
-        trans_print('读取文件出错', file_path, str(e))
-        message = '文件:' + os.path.basename(file_path) + ',' + str(e)
-        raise ValueError(message)
+            else:
+                xls = pd.ExcelFile(file_path)
+                # 获取所有的sheet名称
+                sheet_names = xls.sheet_names
+                for sheet_name in sheet_names:
+                    if read_cols:
+                        now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, usecols=read_cols,
+                                               nrows=nrows)
+                    else:
+                        now_df = pd.read_excel(xls, sheet_name=sheet_name, header=header, nrows=nrows)
+
+                    now_df['sheet_name'] = sheet_name
+                    df = pd.concat([df, now_df])
+                xls.close()
+            trans_print('文件读取成功:', file_path, '数据数量:', df.shape, '耗时:', datetime.datetime.now() - begin)
+        except Exception as e:
+            trans_print('读取文件出错', file_path, str(e))
+            message = '文件:' + os.path.basename(file_path) + ',' + str(e)
+            raise ValueError(message)
 
     return df