import os
from multiprocessing import Pool

import chardet
import pandas as pd


# Detect the encoding of a file
def detect_file_encoding(filename):
    # Read the first 1000 bytes, which is enough for most encoding detection
    with open(filename, 'rb') as f:
        rawdata = f.read(1000)
    result = chardet.detect(rawdata)
    encoding = result['encoding']

    if encoding is None:
        encoding = 'gb18030'

    # Normalize legacy Chinese / Windows code pages to the gb18030 superset
    if encoding.lower() == 'gb2312' or encoding.lower().startswith('windows'):
        encoding = 'gb18030'

    return encoding


# Read a single data file into a DataFrame
def read_file_to_df(file_path, read_cols=None, header=0):
    df = pd.DataFrame()
    lower_path = str(file_path).lower()
    if lower_path.endswith('csv') or lower_path.endswith('gz'):
        encoding = detect_file_encoding(file_path)
        end_with_gz = lower_path.endswith('gz')
        if read_cols:
            if end_with_gz:
                df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols,
                                 compression='gzip', header=header)
            else:
                df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols,
                                 header=header, on_bad_lines='warn')
        else:
            if end_with_gz:
                df = pd.read_csv(file_path, encoding=encoding,
                                 compression='gzip', header=header)
            else:
                df = pd.read_csv(file_path, encoding=encoding,
                                 header=header, on_bad_lines='warn')
    else:
        xls = pd.ExcelFile(file_path)
        # Read and concatenate every sheet in the workbook
        for sheet in xls.sheet_names:
            if read_cols:
                df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet,
                                                  header=header, usecols=read_cols)])
            else:
                df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet, header=header)])

    return df


def __build_directory_dict(directory_dict, dir_path, filter_types=None):
    # Walk every entry under the directory, recursing into sub-directories
    for item in os.listdir(dir_path):
        item_path = os.path.join(dir_path, item)
        if os.path.isdir(item_path):
            __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
        elif os.path.isfile(item_path):
            if dir_path not in directory_dict:
                directory_dict[dir_path] = []
            if filter_types is None or len(filter_types) == 0:
                directory_dict[dir_path].append(item_path)
            elif str(item_path).split('.')[-1] in filter_types:
                # Skip Office lock files such as "~$foo.xlsx"
                if str(item_path).count('~$') == 0:
                    directory_dict[dir_path].append(item_path)


# Recursively collect all Excel/CSV files under a directory
def read_excel_files(read_path):
    directory_dict = {}
    __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])

    return [path for paths in directory_dict.values() for path in paths if path]


# Create a directory; for a file path, create its parent directory
def create_file_path(path, is_file_path=False):
    if is_file_path:
        path = os.path.dirname(path)

    if not os.path.exists(path):
        os.makedirs(path, exist_ok=True)


def read_status(status_path):
    all_files = read_excel_files(status_path)
    with Pool(20) as pool:
        dfs = pool.starmap(read_file_to_df,
                           [(file, ['设备名称', '状态码', '开始时间'], 2) for file in all_files])
    df = pd.concat(dfs)
    df = df[df['状态码'].isin([3, 5])]
    df['开始时间'] = pd.to_datetime(df['开始时间'])
    # Push 开始时间 to the next 10-minute boundary
    df['处理后时间'] = (df['开始时间'] + pd.Timedelta(minutes=10)).dt.floor('10min')

    df = df[df['处理后时间'] >= '2023-09-01 00:00:00']
    # Cap 处理后时间 at the end of the study window
    df.loc[df['处理后时间'] >= '2024-09-01 00:00:00', '处理后时间'] = pd.Timestamp('2024-09-01 00:00:00')
    df.sort_values(by=['设备名称', '处理后时间'], inplace=True)
    return df
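# A quick sanity check of the 10-minute bucketing used above and below (an
# illustrative sketch, not part of the pipeline; the timestamps are made up):
#
#   >>> (pd.Timestamp('2023-09-01 00:03:21') + pd.Timedelta(minutes=10)).floor('10min')
#   Timestamp('2023-09-01 00:10:00')
#
# A timestamp already sitting on a boundary is pushed to the *next* bucket:
#
#   >>> (pd.Timestamp('2023-09-01 00:10:00') + pd.Timedelta(minutes=10)).floor('10min')
#   Timestamp('2023-09-01 00:20:00')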
def read_fault_data(fault_path):
    all_files = read_excel_files(fault_path)
    with Pool(20) as pool:
        dfs = pool.starmap(read_file_to_df,
                           [(file, ['设备名称', '故障开始时间'], 2) for file in all_files])
    df = pd.concat(dfs)
    df = df[df['设备名称'].str.startswith('#')]
    df['故障开始时间'] = pd.to_datetime(df['故障开始时间'])
    # Push 故障开始时间 to the next 10-minute boundary
    df['处理后故障开始时间'] = (df['故障开始时间'] + pd.Timedelta(minutes=10)).dt.floor('10min')
    df = df[(df['处理后故障开始时间'] >= '2023-09-01 00:00:00')
            & (df['处理后故障开始时间'] < '2024-09-01 00:00:00')]
    df.sort_values(by=['设备名称', '处理后故障开始时间'], inplace=True)
    return df


def read_10min_data(data_path):
    all_files = read_excel_files(data_path)
    with Pool(20) as pool:
        dfs = pool.starmap(read_file_to_df,
                           [(file, ['设备名称', '时间', '平均风速(m/s)', '平均网侧有功功率(kW)'], 1)
                            for file in all_files])
    df = pd.concat(dfs)
    df['时间'] = pd.to_datetime(df['时间'])
    df = df[(df['时间'] >= '2023-09-01 00:00:00') & (df['时间'] < '2024-09-01 00:00:00')]
    df.sort_values(by=['设备名称', '时间'], inplace=True)
    return df


# Slice the 10-minute data of one device to its fault windows and save the result
def select_data_and_save(name, fault_df, origin_df):
    df = pd.DataFrame()
    for i in range(fault_df.shape[0]):
        fault = fault_df.iloc[i]
        con1 = origin_df['时间'] >= fault['处理后故障开始时间']
        con2 = origin_df['时间'] <= fault['结束时间']
        df = pd.concat([df, origin_df[con1 & con2]])

    name = name.replace('#', 'F')
    df.drop_duplicates(inplace=True)
    # save_path is set in __main__; worker processes inherit it through fork
    df.to_csv(save_path + os.sep + name + '.csv', index=False, encoding='utf8')


if __name__ == '__main__':
    base_path = r'/data/download/白玉山/需要整理的数据'
    save_path = base_path + os.sep + 'sele_data_202409261135'
    create_file_path(save_path)

    status_df = read_status(base_path + os.sep + '设备状态')
    fault_df = read_fault_data(base_path + os.sep + '故障')
    data_df = read_10min_data(base_path + os.sep + '十分钟')

    status_df.to_csv(base_path + os.sep + '设备状态' + '.csv', index=False, encoding='utf8')
    data_df.to_csv(base_path + os.sep + '十分钟' + '.csv', index=False, encoding='utf8')

    print(status_df.shape)
    print(fault_df.shape)
    print(data_df.shape)

    # The end of each fault window is the earliest status change (状态码 3 or 5)
    # of the same device at or after the fault start
    fault_list = list()
    for i in range(fault_df.shape[0]):
        data = fault_df.iloc[i]
        con1 = status_df['设备名称'] == data['设备名称']
        con2 = status_df['处理后时间'] >= data['处理后故障开始时间']
        fault_list.append(status_df[con1 & con2]['处理后时间'].min())
    fault_df['结束时间'] = fault_list

    # Save the fault records once 结束时间 has been filled in
    fault_df.to_csv(base_path + os.sep + '故障' + '.csv', index=False, encoding='utf8')

    names = set(fault_df['设备名称'])
    fault_map = dict()
    data_map = dict()
    for name in names:
        fault_map[name] = fault_df[fault_df['设备名称'] == name]
        data_map[name] = data_df[data_df['设备名称'] == name]

    with Pool(20) as pool:
        pool.starmap(select_data_and_save,
                     [(name, fault_map[name], data_map[name]) for name in names])
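# ----------------------------------------------------------------------
# Assumed on-disk layout (an illustration only; the directory names come
# from the paths built in __main__, everything else is hypothetical):
#
#   /data/download/白玉山/需要整理的数据/
#   ├── 设备状态/   status exports (.xls/.xlsx/.csv/.gz), header on the 3rd row
#   ├── 故障/       fault records, header on the 3rd row
#   └── 十分钟/     10-minute SCADA data, header on the 2nd row
#
# Per-device fault windows are written to
# <base_path>/sele_data_202409261135/<name>.csv, with '#' in the device
# name replaced by 'F'.
# ----------------------------------------------------------------------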