# -*- coding: utf-8 -*- """ Created on Tue Jul 9 16:28:48 2024 @author: Administrator """ import multiprocessing from datetime import datetime from os import * import chardet import numpy as np import pandas as pd pd.options.mode.copy_on_write = True # 获取文件编码 def detect_file_encoding(filename): # 读取文件的前1000个字节(足够用于大多数编码检测) with open(filename, 'rb') as f: rawdata = f.read(1000) result = chardet.detect(rawdata) encoding = result['encoding'] if encoding is None: encoding = 'gb18030' if encoding and encoding.lower() == 'gb2312' or encoding.lower().startswith("windows"): encoding = 'gb18030' return encoding # 读取数据到df def read_file_to_df(file_path, read_cols=list(), header=0): df = pd.DataFrame() if str(file_path).lower().endswith("csv") or str(file_path).lower().endswith("gz"): encoding = detect_file_encoding(file_path) end_with_gz = str(file_path).lower().endswith("gz") if read_cols: if end_with_gz: df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, compression='gzip', header=header) else: df = pd.read_csv(file_path, encoding=encoding, usecols=read_cols, header=header, on_bad_lines='warn') else: if end_with_gz: df = pd.read_csv(file_path, encoding=encoding, compression='gzip', header=header) else: df = pd.read_csv(file_path, encoding=encoding, header=header, on_bad_lines='warn') else: xls = pd.ExcelFile(file_path) # 获取所有的sheet名称 sheet_names = xls.sheet_names for sheet in sheet_names: if read_cols: df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet, header=header, usecols=read_cols)]) else: df = pd.concat([df, pd.read_excel(xls, sheet_name=sheet, header=header)]) return df def __build_directory_dict(directory_dict, path, filter_types=None): # 遍历目录下的所有项 for item in listdir(path): item_path = path.join(path, item) if path.isdir(item_path): __build_directory_dict(directory_dict, item_path, filter_types=filter_types) elif path.isfile(item_path): if path not in directory_dict: directory_dict[path] = [] if filter_types is None or len(filter_types) == 0: directory_dict[path].append(item_path) elif str(item_path).split(".")[-1] in filter_types: if str(item_path).count("~$") == 0: directory_dict[path].append(item_path) # 读取所有文件 # 读取路径下所有的excel文件 def read_excel_files(read_path): directory_dict = {} __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz']) return [path for paths in directory_dict.values() for path in paths if path] # 创建路径 def create_file_path(path, is_file_path=False): if is_file_path: path = path.dirname(path) if not path.exists(path): makedirs(path, exist_ok=True) def split_index(split_data: str, split_str: str): count = split_data.find(split_str) if count > -1: return split_data[count + len(split_str):] else: return split_str def replece_col_to_biaozhun(col): for k, v in dianjian_dict.items(): if col.find(k) > -1: col = col.replace(k, v) return col return col def row_to_datas(row, pv_dict, inverter_cols, df_cols): row_datas = list(list()) for xiangbian in pv_dict.keys(): for nibianqi in pv_dict[xiangbian].keys(): for pv in pv_dict[xiangbian][nibianqi]: datas = [np.nan] * 14 datas[0] = row['时间'] datas[1] = xiangbian datas[2] = nibianqi datas[3] = pv datas_4_col = "_".join([str(xiangbian), str(nibianqi), str(pv), '支路输出电压']) if datas_4_col in df_cols: datas[4] = row[datas_4_col] else: datas[4] = np.nan datas_5_col = "_".join([str(xiangbian), str(nibianqi), str(pv), '支路输出电流']) if datas_5_col in df_cols: datas[5] = row[datas_5_col] else: datas[5] = np.nan row_datas.append(datas) for xiangbian in pv_dict.keys(): for nibianqi in pv_dict[xiangbian].keys(): datas = [np.nan] * 14 datas[0] = row['时间'] datas[1] = xiangbian datas[2] = nibianqi datas[3] = 0 for index, col_name in enumerate(inverter_cols): col = '_'.join([str(xiangbian), str(nibianqi), col_name]) if col in df_cols: datas[index + 6] = row[col] else: datas[index + 6] = np.nan row_datas.append(datas) return row_datas def df_to_biaozhun(df): pv_cols = ['支路输出电压', '支路输出电流'] inverter_cols = ['逆变器总发电量', '逆变器输出有功功率', '逆变器温度', '交流输出电压', '逆变器输出电流A相', '逆变器输出电流B相', '逆变器输出电流C相', '逆变器转换效率'] # 从列名获取箱变->逆变器->PV等的字典 pv_dict = dict(dict()) for col in df.columns: for pv_col in pv_cols: if str(col).endswith(pv_col): datas = col.split("_") xiangbian = datas[0] nibiangqi = datas[1] pv = datas[2] if xiangbian in pv_dict.keys(): if nibiangqi in pv_dict[xiangbian]: pv_dict[xiangbian][nibiangqi].add(pv) else: pv_dict[xiangbian][nibiangqi] = set([pv]) else: pv_dict[xiangbian] = {nibiangqi: set([pv])} results = df.apply(row_to_datas, args=(pv_dict, inverter_cols, df.columns), axis=1) df_datas = results.to_list() df_datas = [da for data in df_datas for da in data] df_cols = ["时间", "箱变", "逆变器", "支路"] df_cols.extend(pv_cols) df_cols.extend(inverter_cols) df = pd.DataFrame(df_datas, columns=df_cols) type_conver_list = [] type_conver_list.extend(pv_cols) type_conver_list.extend(inverter_cols) for type_conver in type_conver_list: df[type_conver] = pd.to_numeric(df[type_conver], errors='coerce') return df def read_and_save_csv(file_path, save_path): begin = datetime.now() base_name = path.basename(file_path) print('开始', base_name) df = read_file_to_df(file_path) for col in df.columns: for del_col in del_cols: if col.find(del_col) > -1: del df[col] df['时间'] = pd.to_datetime(df['时间']) xiangbian = [col for col in df.columns if str(col).startswith('#') and str(col).find('逆变器') > -1][0].split("逆变器")[ 0].replace("#", "") df.columns = [xiangbian + "_" + split_index(df_col, "逆变器").replace('PV', "").replace("()", "").replace("-", "_") if df_col.startswith( "#") else df_col for df_col in df.columns] df.columns = [col.replace("输入", "_输入") for col in df.columns] df.columns = [replece_col_to_biaozhun(col) for col in df.columns] df = df_to_biaozhun(df) # df.sort_values(by=['时间', "箱变", "逆变器", "支路"], inplace=True) # save_file = path.join(save_path, path.basename(file_path)) # create_file_path(save_file, True) # df.to_csv(save_file, encoding='utf-8', index=False) print('结束', base_name, '耗时:' + str(datetime.now() - begin)) return df dianjian_data_str = """ 输入电压 支路输出电压 输入电流 支路输出电流 功率因数 总发电量 逆变器总发电量 无功功率 有功功率 逆变器输出有功功率 机内温度 逆变器温度 电网AB线电压 交流输出电压 电网A相电流 逆变器输出电流A相 电网BC线电压 电网B相电流 逆变器输出电流B相 电网CA线电压 电网C相电流 逆变器输出电流C相 逆变器效率 逆变器转换效率 """ dianjian_dict = {} del_cols = [] for data in dianjian_data_str.split("\n"): if data: datas = data.split("\t") if len(datas) == 2 and datas[1]: dianjian_dict[datas[0]] = datas[1] else: del_cols.append(datas[0]) if __name__ == '__main__': path = r'/data/download/大唐玉湖性能分析离线分析/05整理数据/逆变器数据' save_path = r'/data/download/大唐玉湖性能分析离线分析/06整理数据/逆变器数据' # path = r'D:\trans_data\大唐玉湖性能分析离线分析\test\yuanshi' # save_path = r'D:\trans_data\大唐玉湖性能分析离线分析\test\zhengli' all_files = read_excel_files(path) all_datas = list(all_files) all_datas.sort() print(all_datas) # for file in all_datas: # read_and_save_csv(file, save_path) with multiprocessing.Pool(40) as pool: dfs = pool.starmap(read_and_save_csv, [(i, save_path) for i in all_datas]) saved_pv_df = pd.concat(dfs) saved_pv_df.sort_values(by=['时间', "箱变", "逆变器", "支路"], inplace=True) save_file = path.join(save_path, "合并.csv") create_file_path(save_file, True) saved_pv_df.sort_values(by=['时间', "箱变", "逆变器", "支路"], inplace=True) saved_pv_df.to_csv(save_file, encoding='utf-8', index=False)