# -*- coding: utf-8 -*-
"""
Created on Tue Jul  9 16:28:48 2024

@author: Administrator

Helpers for locating box-transformer export files (xls / xlsx / csv / gz),
loading them into pandas DataFrames and merging the four per-station exports
into a single CSV file.
"""
import gzip
import os
from datetime import datetime

import pandas as pd

# NOTE: the original ``from os import *`` replaced the builtin ``open`` with
# ``os.open`` (which expects integer flags), and several parameters named
# ``path`` shadowed ``os.path``.  Everything below therefore goes through the
# explicit ``os`` / ``os.path`` namespace.

# Directory the merged CSV files are written to unless the caller overrides it.
DEFAULT_SAVE_DIR = os.path.join(r'D:\trans_data\大唐玉湖性能分析离线分析', '05整理数据')

# File-name suffixes (appended to the common prefix) of the four exports that
# are merged by ``read_and_save_csv``.
BOX_FILE_SUFFIXES = (
    "箱变(1-8号逆变器)数据1.xls",
    "箱变(9-16号逆变器)数据1.xls",
    "箱变(1-8号逆变器)数据2.xls",
    "箱变(9-16号逆变器)数据2.xls",
)


def detect_file_encoding(filename):
    """Guess the text encoding of *filename* from its first 1000 bytes.

    Files whose name ends in ``gz`` are sampled from the decompressed stream,
    because the compressed bytes say nothing about the text encoding.

    Returns:
        The encoding name reported by chardet, except that an undetected
        encoding, ``gb2312`` and any ``windows*`` code page are all mapped to
        ``'gb18030'``.
    """
    # Imported lazily so the Excel and directory helpers stay usable when
    # chardet is not installed.
    import chardet

    opener = gzip.open if str(filename).lower().endswith("gz") else open
    # The first 1000 bytes are enough for most encoding detection.
    with opener(filename, 'rb') as f:
        rawdata = f.read(1000)

    encoding = chardet.detect(rawdata)['encoding']
    if encoding is None:
        return 'gb18030'

    lowered = encoding.lower()
    if lowered == 'gb2312' or lowered.startswith("windows"):
        return 'gb18030'
    return encoding


def read_file_to_df(file_path, read_cols=None, header=0):
    """Load a csv / gzip-compressed csv / Excel file into a DataFrame.

    Args:
        file_path: Path of the file. Names ending in ``csv`` or ``gz`` are
            read with ``pd.read_csv``; anything else is opened as an Excel
            workbook.
        read_cols: Optional column selection forwarded as ``usecols``. An
            empty or ``None`` value means "all columns".
        header: Header row index forwarded to pandas.

    Returns:
        The loaded DataFrame. For Excel workbooks all sheets are concatenated
        in sheet order.
    """
    usecols = read_cols or None
    lower_path = str(file_path).lower()

    if lower_path.endswith(("csv", "gz")):
        encoding = detect_file_encoding(file_path)
        if lower_path.endswith("gz"):
            return pd.read_csv(file_path, encoding=encoding, usecols=usecols,
                               compression='gzip', header=header)
        return pd.read_csv(file_path, encoding=encoding, usecols=usecols,
                           header=header, on_bad_lines='warn')

    # The context manager closes the workbook handle once all sheets are read.
    with pd.ExcelFile(file_path) as xls:
        frames = [
            pd.read_excel(xls, sheet_name=sheet, header=header, usecols=usecols)
            for sheet in xls.sheet_names
        ]
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames)


def __build_directory_dict(directory_dict, path, filter_types=None):
    """Recursively record the files found below *path*.

    Args:
        directory_dict: Mapping that is filled in place; each directory that
            contains at least one regular file maps to a list of file paths.
        path: Directory to scan.
        filter_types: Optional iterable of extensions (without the dot). When
            given, only files with one of these extensions (compared
            case-insensitively) are kept, and paths containing ``~$`` are
            skipped. When empty or ``None`` every file is kept.
    """
    allowed = {str(ext).lower() for ext in filter_types} if filter_types else None

    # Walk every entry of the directory.
    for item in os.listdir(path):
        item_path = os.path.join(path, item)
        if os.path.isdir(item_path):
            __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
        elif os.path.isfile(item_path):
            files = directory_dict.setdefault(path, [])
            if allowed is None:
                files.append(item_path)
            elif item.rsplit(".", 1)[-1].lower() in allowed and "~$" not in item_path:
                files.append(item_path)


def read_excel_files(read_path):
    """Return all xls / xlsx / csv / gz files found recursively below *read_path*."""
    directory_dict = {}
    __build_directory_dict(directory_dict, read_path,
                           filter_types=['xls', 'xlsx', 'csv', 'gz'])
    return [
        file_path
        for file_paths in directory_dict.values()
        for file_path in file_paths
        if file_path
    ]


def create_file_path(path, is_file_path=False):
    """Create the directory *path* (and missing parents) if it does not exist.

    Args:
        path: Directory to create, or a file path when *is_file_path* is true.
        is_file_path: When true, the parent directory of *path* is created
            instead of *path* itself.
    """
    if is_file_path:
        path = os.path.dirname(path)
    # A bare file name has an empty dirname; there is nothing to create then.
    if path:
        os.makedirs(path, exist_ok=True)


def _load_box_frame(file_name):
    """Load one box-transformer export and index it by the '时间' column."""
    df = read_file_to_df(file_name)
    # 'Unnamed: 0' is the name pandas gives an unlabeled first column; drop it
    # when present instead of failing when it is absent.
    df = df.drop(columns='Unnamed: 0', errors='ignore')
    df['时间'] = pd.to_datetime(df['时间'])
    return df.set_index(keys='时间')


def read_and_save_csv(file_path, save_dir=None):
    """Merge the four box-transformer exports of one group into a single CSV.

    Args:
        file_path: Common path prefix of the group; each entry of
            ``BOX_FILE_SUFFIXES`` is appended to it to obtain a file name.
        save_dir: Output directory. Defaults to ``DEFAULT_SAVE_DIR``. It is
            created when missing.

    The four frames are joined column-wise on the timestamp, columns are
    ordered alphabetically, rows are sorted by time, and the result is written
    to ``<save_dir>/<basename of file_path>_箱变.csv`` encoded as UTF-8.
    """
    begin = datetime.now()
    base_name = os.path.basename(file_path)
    print('开始', base_name)

    frames = [_load_box_frame(file_path + suffix) for suffix in BOX_FILE_SUFFIXES]
    df = pd.concat(frames, axis=1).reset_index()

    print(df.columns)
    df = df[sorted(df.columns)]
    df = df.sort_values(by='时间')

    if save_dir is None:
        save_dir = DEFAULT_SAVE_DIR
    create_file_path(save_dir)
    df.to_csv(os.path.join(save_dir, base_name + '_箱变.csv'),
              encoding='utf-8', index=False)

    print('结束', base_name, '耗时:' + str(datetime.now() - begin))


if __name__ == '__main__':
    root_path = r'D:\trans_data\大唐玉湖性能分析离线分析\test'

    all_files = read_excel_files(root_path)

    # Each group is identified by its directory plus the part of the file name
    # that precedes "箱变"; the set removes duplicates.
    all_paths = {
        os.path.join(os.path.dirname(file), os.path.basename(file).split("箱变")[0])
        for file in all_files
    }

    all_datas = sorted(all_paths)
    print(all_datas)

    # Processing is currently disabled (needs ``from multiprocessing import Pool``):
    # with Pool(1) as pool:
    #     pool.starmap(read_and_save_csv, [(i,) for i in all_datas])