import multiprocessing
import os

import matplotlib

# The backend must be selected BEFORE pyplot is imported, otherwise the
# default (possibly GUI) backend may already be initialized.
matplotlib.use('Agg')

import numpy as np
from matplotlib import pyplot as plt

matplotlib.rcParams['font.family'] = 'SimHei'  # or 'Microsoft YaHei'
matplotlib.rcParams['font.sans-serif'] = ['SimHei']  # or ['Microsoft YaHei']

import chardet
import warnings

warnings.filterwarnings("ignore")

import datetime

import pandas as pd


def get_time_space(df, time_str):
    """
    Estimate the sampling interval of a time column, in seconds.

    Sorts the parsed timestamps, takes the difference between consecutive
    rows, and returns the most frequent difference within a ~1% random
    sample of the rows.

    :param df: source DataFrame
    :param time_str: name of the time column
    :return: estimated interval in whole seconds (non-negative)
    """
    begin = datetime.datetime.now()
    df1 = pd.DataFrame(df[time_str])
    df1[time_str] = pd.to_datetime(df1[time_str], errors='coerce')
    df1.sort_values(by=time_str, inplace=True)
    df1['chazhi'] = df1[time_str].shift(-1) - df1[time_str]
    # Sample at least one row: the original int(shape[0] / 100) was 0 for
    # DataFrames with fewer than 100 rows, making value_counts() empty and
    # idxmax() raise.
    sample_size = max(1, df1.shape[0] // 100)
    result = df1.sample(sample_size)['chazhi'].value_counts().idxmax().seconds
    del df1
    print(datetime.datetime.now() - begin)
    return abs(result)


def get_time_space_count(start_time: datetime.datetime, end_time: datetime.datetime, time_space=1):
    """
    Count the number of sampling points between two instants, inclusive
    of both endpoints.

    :param start_time: first timestamp
    :param end_time: last timestamp
    :param time_space: sampling interval in seconds
    :return: number of expected samples (always >= 1)
    """
    delta = end_time - start_time
    # days*86400 + seconds deliberately drops sub-second precision,
    # matching the whole-second interval returned by get_time_space().
    total_seconds = delta.days * 24 * 60 * 60 + delta.seconds
    return abs(int(total_seconds / time_space)) + 1


def detect_file_encoding(filename):
    """
    Detect a file's text encoding from its first 1000 bytes.

    Narrow or legacy detections (gb2312, windows-*) are widened to
    gb18030, a superset that decodes more of the data safely.
    """
    # 1000 bytes is enough for chardet in most practical cases.
    with open(filename, 'rb') as f:
        rawdata = f.read(1000)
    encoding = chardet.detect(rawdata)['encoding']
    if encoding is None:
        encoding = 'gb18030'
    # Parenthesized explicitly: the original mixed `and`/`or` without
    # parentheses, relying on the None check above to avoid an
    # AttributeError on the right-hand side.
    if encoding.lower() == 'gb2312' or encoding.lower().startswith("windows"):
        encoding = 'gb18030'
    return encoding


def del_blank(df=None, cols=None):
    """
    Strip leading/trailing whitespace from the given object-dtype columns.

    None sentinels replace the original mutable default arguments
    (pd.DataFrame() / list()), which were shared across calls.

    :param df: DataFrame to clean in place (a fresh empty one if None)
    :param cols: column names to strip (none if None)
    :return: the (possibly modified) DataFrame
    """
    if df is None:
        df = pd.DataFrame()
    for col in (cols or []):
        if df[col].dtype == object:
            df[col] = df[col].str.strip()
    return df


def split_array(array, num):
    """Split a sequence into consecutive chunks of at most `num` items."""
    return [array[i:i + num] for i in range(0, len(array), num)]


def read_file_to_df(file_path, read_cols=None, header=0):
    """
    Read a CSV / gzipped-CSV / Excel file into a single DataFrame.

    For Excel files, all sheets are concatenated. CSV encoding is
    auto-detected. Raises ValueError (with the file name in the message)
    on any read failure.

    :param read_cols: optional subset of columns to load
    :param header: row number to use as the header
    """
    read_cols = read_cols or []  # avoid a shared mutable default
    try:
        df = pd.DataFrame()
        lower_path = str(file_path).lower()
        if lower_path.endswith("csv") or lower_path.endswith("gz"):
            encoding = detect_file_encoding(file_path)
            end_with_gz = lower_path.endswith("gz")
            if read_cols:
                if end_with_gz:
                    df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip',
                                     header=header)
                else:
                    df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, header=header,
                                     on_bad_lines='warn')
            else:
                if end_with_gz:
                    df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header)
                else:
                    df = pd.read_csv(file_path, encoding=encoding, header=header, on_bad_lines='warn')
        else:
            # Context manager closes the underlying file handle; the
            # original leaked it.
            with pd.ExcelFile(file_path) as xls:
                for sheet in xls.sheet_names:
                    if read_cols:
                        now_df = pd.read_excel(xls, sheet_name=sheet, header=header, usecols=read_cols)
                    else:
                        now_df = pd.read_excel(xls, sheet_name=sheet, header=header)
                    df = pd.concat([df, now_df])
        print('文件读取成功', file_path, '文件数量', df.shape)
    except Exception as e:
        print('读取文件出错', file_path, str(e))
        message = '文件:' + os.path.basename(file_path) + ',' + str(e)
        raise ValueError(message)
    return df


def __build_directory_dict(directory_dict, path, filter_types=None):
    """
    Recursively collect files under `path` into directory_dict
    (directory -> list of file paths), keeping only the extensions in
    `filter_types` (all files if None/empty) and skipping Office lock
    files ("~$...").
    """
    for item in os.listdir(path):
        item_path = os.path.join(path, item)
        if os.path.isdir(item_path):
            __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
        elif os.path.isfile(item_path):
            if path not in directory_dict:
                directory_dict[path] = []
            if filter_types is None or len(filter_types) == 0:
                directory_dict[path].append(item_path)
            elif str(item_path).split(".")[-1] in filter_types:
                if str(item_path).count("~$") == 0:
                    directory_dict[path].append(item_path)


def read_excel_files(read_path):
    """Return every Excel/CSV/gz file path found under `read_path`, recursively."""
    directory_dict = {}
    __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])
    return [path for paths in directory_dict.values() for path in paths if path]


def create_file_path(path, is_file_path=False):
    """
    Ensure a directory exists.

    :param is_file_path: if True, `path` is a file path and its parent
        directory is created instead.
    """
    if is_file_path:
        path = os.path.dirname(path)
    if not os.path.exists(path):
        os.makedirs(path, exist_ok=True)


def time_biaozhun(df):
    """
    Evaluate the time column against the expected sampling grid.

    :return: (total expected samples,
              missing-rate % of parseable timestamps,
              missing-rate % of raw rows)
    """
    time_space = get_time_space(df, '时间')
    # .copy() detaches from df so the assignment below cannot write
    # through a view (SettingWithCopy was previously only masked by the
    # global warning filter).
    query_df = df[['时间']].copy()
    query_df['时间'] = pd.to_datetime(df['时间'], errors="coerce")
    query_df = query_df.dropna(subset=['时间'])
    total = get_time_space_count(query_df['时间'].min(), query_df['时间'].max(), time_space)
    return total, save_percent(1 - query_df.shape[0] / total), save_percent(1 - df.shape[0] / total)


def save_percent(value, save_decimal=7):
    """Round a ratio to `save_decimal` places and express it as a percentage."""
    return round(value, save_decimal) * 100


def calc(df, file_name):
    """
    Compute per-column error and loss percentages for one data file.

    Error rules by column-name keyword: 电压 < 0, 电流 < -0.1,
    逆变器效率 outside (0, 100), 温度 outside [0, 100],
    功率因数 outside [0, 1].

    :param df: file contents (must contain a '时间' column)
    :param file_name: base file name; its stem becomes the '箱变' label
    :return: (error_df, lose_df) single-row DataFrames
    """
    stem = "".join(file_name.split(".")[:-1])
    error_dict = {'箱变': stem}
    lose_dict = {'箱变': stem}

    total, lose_time, error_time = time_biaozhun(df)
    error_dict['时间'] = error_time
    lose_dict['时间'] = lose_time

    error_df = pd.DataFrame()
    lose_df = pd.DataFrame()
    try:
        # Normalize column names to start at the '逆变器' keyword.
        df.columns = ["逆变器" + "".join(col.split("逆变器")[1:]) if col.find("逆变器") > -1 else col
                      for col in df.columns]
        for col in df.columns:
            if col == '时间':
                continue
            query_df = df[[col]]
            query_df[col] = pd.to_numeric(query_df[col], errors="coerce")
            query_df = query_df.dropna(subset=[col])
            lose_dict[col] = save_percent(1 - query_df.shape[0] / total)
            if col.find('电压') > -1:
                error_dict[col] = save_percent(query_df[query_df[col] < 0].shape[0] / total)
            if col.find('电流') > -1:
                error_dict[col] = save_percent(query_df[query_df[col] < -0.1].shape[0] / total)
            if col.find('逆变器效率') > -1:
                error_dict[col] = save_percent(
                    query_df[(query_df[col] <= 0) | (query_df[col] >= 100)].shape[0] / total)
            if col.find('温度') > -1:
                error_dict[col] = save_percent(
                    query_df[(query_df[col] < 0) | (query_df[col] > 100)].shape[0] / total)
            if col.find('功率因数') > -1:
                error_dict[col] = save_percent(
                    query_df[(query_df[col] < 0) | (query_df[col] > 1)].shape[0] / total)

        # Averages over all per-column rates ('箱变' is the label, not a rate).
        # Dedicated names avoid the original shadowing of `total`.
        error_values = [v for k, v in error_dict.items() if k != '箱变']
        error_dict['平均异常率'] = save_percent(sum(error_values) / len(error_values) / 100)

        lose_values = [v for k, v in lose_dict.items() if k != '箱变']
        lose_dict['平均缺失率'] = save_percent(sum(lose_values) / len(lose_values) / 100)

        error_df = pd.concat([error_df, pd.DataFrame(error_dict, index=[0])])
        lose_df = pd.concat([lose_df, pd.DataFrame(lose_dict, index=[0])])

        # Put the label and average columns first, keeping original order after.
        error_df_cols = ['箱变', '平均异常率']
        for col in error_df.columns:
            if col not in error_df_cols:
                error_df_cols.append(col)
        lose_df_cols = ['箱变', '平均缺失率']
        for col in lose_df.columns:
            if col not in lose_df_cols:
                lose_df_cols.append(col)
        error_df = error_df[error_df_cols]
        lose_df = lose_df[lose_df_cols]
    except Exception as e:
        print("异常文件", os.path.basename(file_name))
        raise e
    return error_df, lose_df


def run(file_path):
    """Worker entry: read one file and return its (error_df, lose_df)."""
    df = read_file_to_df(file_path)
    return calc(df, os.path.basename(file_path))


if __name__ == '__main__':
    # read_path = r'/data/download/大唐玉湖性能分析离线分析/05整理数据/逆变器数据'
    # save_path = r'/data/download/大唐玉湖性能分析离线分析/06整理数据/逆变器数据'
    read_path = r'D:\trans_data\大唐玉湖性能分析离线分析\test\yuanshi'
    save_path = r'D:\trans_data\大唐玉湖性能分析离线分析\test\zhengli'

    all_files = read_excel_files(read_path)
    with multiprocessing.Pool(2) as pool:
        df_arrys = pool.starmap(run, [(file,) for file in all_files])

    error_df = pd.concat([df[0] for df in df_arrys])
    lose_df = pd.concat([df[1] for df in df_arrys])

    with pd.ExcelWriter(os.path.join(save_path, "玉湖光伏数据统计.xlsx")) as writer:
        error_df.to_excel(writer, sheet_name='error_percent', index=False)
        lose_df.to_excel(writer, sheet_name='lose_percent', index=False)