import os import matplotlib import numpy as np from matplotlib import pyplot as plt matplotlib.use('Agg') matplotlib.rcParams['font.family'] = 'SimHei' # 或者 'Microsoft YaHei' matplotlib.rcParams['font.sans-serif'] = ['SimHei'] # 或者 ['Microsoft YaHei'] import pandas as pd import chardet import warnings warnings.filterwarnings("ignore") # 获取文件编码 def detect_file_encoding(filename): # 读取文件的前1000个字节(足够用于大多数编码检测) with open(filename, 'rb') as f: rawdata = f.read(1000) result = chardet.detect(rawdata) encoding = result['encoding'] if encoding is None: encoding = 'gb18030' if encoding and encoding.lower() == 'gb2312' or encoding.lower().startswith("windows"): encoding = 'gb18030' return encoding def del_blank(df=pd.DataFrame(), cols=list()): for col in cols: if df[col].dtype == object: df[col] = df[col].str.strip() return df # 切割数组到多个数组 def split_array(array, num): return [array[i:i + num] for i in range(0, len(array), num)] # 读取数据到df def read_file_to_df(file_path, read_cols=list(), header=0): try: df = pd.DataFrame() if str(file_path).lower().endswith("csv") or str(file_path).lower().endswith("gz"): encoding = detect_file_encoding(file_path) end_with_gz = str(file_path).lower().endswith("gz") if read_cols: if end_with_gz: df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip', header=header) else: df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, header=header, on_bad_lines='warn') else: if end_with_gz: df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header) else: df = pd.read_csv(file_path, encoding=encoding, header=header, on_bad_lines='warn') else: xls = pd.ExcelFile(file_path) # 获取所有的sheet名称 sheet_names = xls.sheet_names for sheet in sheet_names: if read_cols: now_df = pd.read_excel(xls, sheet_name=sheet, header=header, usecols=read_cols) else: now_df = pd.read_excel(xls, sheet_name=sheet, header=header) df = pd.concat([df, now_df]) print('文件读取成功', file_path, '文件数量', df.shape) except Exception as e: print('读取文件出错', file_path, str(e)) message = '文件:' + os.path.basename(file_path) + ',' + str(e) raise ValueError(message) return df def __build_directory_dict(directory_dict, path, filter_types=None): # 遍历目录下的所有项 for item in os.listdir(path): item_path = os.path.join(path, item) if os.path.isdir(item_path): __build_directory_dict(directory_dict, item_path, filter_types=filter_types) elif os.path.isfile(item_path): if path not in directory_dict: directory_dict[path] = [] if filter_types is None or len(filter_types) == 0: directory_dict[path].append(item_path) elif str(item_path).split(".")[-1] in filter_types: if str(item_path).count("~$") == 0: directory_dict[path].append(item_path) # 读取所有文件 # 读取路径下所有的excel文件 def read_excel_files(read_path): directory_dict = {} __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz']) return [path for paths in directory_dict.values() for path in paths if path] class ContractPowerCurve(object): def __init__(self, df: pd.DataFrame, wind_velocity='风速', active_power='功率'): self.df = df self.wind_velocity = wind_velocity self.active_power = active_power # 创建路径 def create_file_path(path, is_file_path=False): if is_file_path: path = os.path.dirname(path) if not os.path.exists(path): os.makedirs(path, exist_ok=True) def scatter(title, x_label, y_label, x_values, y_values, color='blue', size=10, save_file_path=''): if save_file_path: create_file_path(save_file_path, True) else: save_file_path = title + '.png' plt.figure(figsize=(8, 6)) plt.title(title, fontsize=16) plt.xlabel(x_label, fontsize=14) plt.ylabel(y_label, fontsize=14) plt.scatter(x_values, y_values, s=size, c=color) plt.savefig(save_file_path) plt.close() def marker_active_power(contract_power_curve_class: ContractPowerCurve, df: pd.DataFrame, active_power='有功功率 kW均值', wind_velocity='风速 m/s均值'): """ 标记有功功率为正的记录 :param contract_power_curve_class: 合同功率曲线 :param df: 原始数据 :return: 标记有功功率为正的原始数据 """ contract_power_curve_df = contract_power_curve_class.df curve_wv = contract_power_curve_df[contract_power_curve_class.wind_velocity].values curve_ap = contract_power_curve_df[contract_power_curve_class.active_power].values df.dropna(subset=[active_power, wind_velocity], inplace=True) ap_gt_0_df = df[df[active_power] > 0] ap_le_0_df = df[df[active_power] <= 0] ap_le_0_df["marker"] = -1 active_power_values = ap_gt_0_df[active_power].values wind_speed_values = ap_gt_0_df[wind_velocity].values ap_gt_0_in = [0] * ap_gt_0_df.shape[0] for i in range(len(ap_gt_0_in)): wind_speed = wind_speed_values[i] active_power = active_power_values[i] # if active_power >= 2200 - 200: # ap_gt_0_in[i] = 1 # else: diffs = np.abs(curve_wv - wind_speed) # 找到差值最小的索引和对应的差值 minDiff, idx = np.min(diffs), np.argmin(diffs) # 使用找到的索引获取对应的值 closestValue = curve_ap[idx] if active_power - closestValue >= -100: ap_gt_0_in[i] = 1 ap_gt_0_df['marker'] = ap_gt_0_in return pd.concat([ap_gt_0_df, ap_le_0_df]) if __name__ == '__main__': wind_power_df = read_file_to_df(r"D:\中能智能\matlib计算相关\标记derating\PV_Curve.csv") all_files = read_excel_files(r"Z:\collection_data\1进行中\诺木洪风电场-甘肃-华电\清理数据\min-666") save_path = r"D:\trans_data\诺木洪\清理数据\min-666-derating" wind_power_df_class = ContractPowerCurve(wind_power_df) for file in all_files: name = os.path.basename(file).split("@")[0] try: df = read_file_to_df(file) df = marker_active_power(wind_power_df_class, df) df = df[df['marker'] == 1] # 保存筛选后数据 name = name.replace('HD', 'HD2') df.to_csv(os.path.join(save_path, name + '.csv'), index=False, encoding='utf-8') # 使用scatter函数绘制散点图 if not df.empty: scatter(name, x_label='风速均值', y_label='有功功率均值', x_values=df['风速 m/s均值'].values, y_values=df['有功功率 kW均值'].values, color='green', save_file_path=os.path.join(save_path, name + '均值.png')) except Exception as e: print(os.path.basename(file), "出错", str(e)) raise e