import datetime
import multiprocessing
import os

import chardet
import pandas as pd


def detect_file_encoding(filename):
    """Pick the codec used to read *filename*: ``'utf-8'`` or ``'gb18030'``.

    The first 1000 bytes of the file are passed to ``chardet.detect``. When
    the reported encoding is one of utf-8 / ascii / utf8 / utf-8-sig
    (compared case-insensitively) the function returns ``'utf-8'``; for any
    other result, including no result at all, it returns ``'gb18030'``.

    Args:
        filename: Path of the file to inspect.

    Returns:
        Either ``'utf-8'`` or ``'gb18030'``.
    """
    # Read only the first 1000 bytes (enough for most encoding detection).
    with open(filename, 'rb') as f:
        rawdata = f.read(1000)
    encoding = chardet.detect(rawdata)['encoding']
    print("文件类型:", filename, encoding)

    # chardet could not decide: use the same fallback as for unknown codecs.
    if encoding is None:
        return 'gb18030'

    if encoding.lower() in ['utf-8', 'ascii', 'utf8', 'utf-8-sig']:
        return 'utf-8'

    return 'gb18030'


def __build_directory_dict(directory_dict, path, filter_types=None):
    """Recursively collect file paths under *path* into *directory_dict*.

    Args:
        directory_dict: Dict mutated in place; maps a directory path to the
            list of accepted file paths found directly inside it.
        path: Directory to scan.
        filter_types: Accepted extensions (text after the last ``.``,
            case-sensitive). ``None`` or an empty collection accepts every
            file.
    """
    # Walk every entry of the directory.
    for item in os.listdir(path):
        item_path = os.path.join(path, item)
        if os.path.isdir(item_path):
            __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
        elif os.path.isfile(item_path):
            # The key is created as soon as a directory holds any file, even
            # when none of its files pass the filter below.
            files = directory_dict.setdefault(path, [])
            if not filter_types:
                files.append(item_path)
            elif item_path.split(".")[-1] in filter_types:
                # Paths containing "~$" are skipped (presumably Office lock
                # files); this check only applies when a filter is given.
                if "~$" not in item_path:
                    files.append(item_path)


def read_excel_files(read_path, filter_types=None):
    """List the data files found at *read_path*.

    Args:
        read_path: A file or a directory. A file is returned as a
            single-element list without any extension check.
        filter_types: Accepted extensions; defaults to
            ``['xls', 'xlsx', 'csv', 'gz']``.

    Returns:
        A flat list of file paths found recursively under *read_path*.
    """
    if filter_types is None:
        filter_types = ['xls', 'xlsx', 'csv', 'gz']

    if os.path.isfile(read_path):
        return [read_path]

    directory_dict = {}
    __build_directory_dict(directory_dict, read_path, filter_types=filter_types)

    return [path for paths in directory_dict.values() for path in paths if path]


def read_file_to_df(file_path):
    """Load one file with ``pd.read_csv`` and prefix its ``Time`` column.

    Args:
        file_path: Path of the file to load.

    Returns:
        The loaded DataFrame, with every ``Time`` value replaced by
        ``"<date> <original value>"``.
    """
    # NOTE(review): every file goes through read_csv, although the default
    # filter in read_excel_files also accepts xls/xlsx - confirm that only
    # CSV-like files are expected here.
    df = pd.read_csv(file_path, encoding=detect_file_encoding(file_path))
    # Characters 14..23 of the file name are used as the date prefix
    # (presumably a YYYY-MM-DD stamp - TODO confirm the naming scheme).
    date = os.path.basename(file_path)[14:24]
    df['Time'] = df['Time'].apply(lambda x: date + ' ' + x)
    return df


def read_files_and_save_csv(file_dir, month, save_dir):
    """Merge every file under ``file_dir/month`` into one CSV sorted by Time.

    The output is written to ``save_dir/<basename of file_dir>/<month>.csv``
    encoded as UTF-8, without the index column.

    Args:
        file_dir: Directory that contains the *month* entry.
        month: Name of an entry inside *file_dir* (presumably a per-month
            sub-directory - verify against the data layout).
        save_dir: Root directory for the merged output.
    """
    begin = datetime.datetime.now()
    base_dir = os.path.basename(file_dir)
    print(f"{datetime.datetime.now()}: 开始执行{base_dir}-{month}")

    all_files = read_excel_files(os.path.join(file_dir, month))
    if not all_files:
        # pd.concat raises ValueError on an empty list, and inside
        # Pool.starmap that would abort every other task of the batch.
        print(f"{datetime.datetime.now()}: {base_dir}-{month}没有可读取的文件,跳过")
        return

    df = pd.concat([read_file_to_df(file) for file in all_files], ignore_index=True)

    save_path = os.path.join(save_dir, base_dir, f'{month}.csv')
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    # Time holds strings at this point, so the ordering is lexicographic.
    df.sort_values(by=['Time'], inplace=True)
    df.to_csv(save_path, encoding='utf8', index=False)
    print(f"{datetime.datetime.now()}: 执行{base_dir}-{month}结束,耗时{datetime.datetime.now() - begin}")


if __name__ == '__main__':
    # Start of the whole run; it must not be reassigned below, otherwise the
    # final "total elapsed" line would only cover the last directory.
    begin = datetime.datetime.now()
    read_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/tmp/second/excel_tmp/'
    save_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/清理数据/20241217完整字段'
    read_dirs = [os.path.join(read_dir, str(i)) for i in range(26, 42)]

    # One pool per numbered directory; each entry of that directory is
    # processed as an independent task.
    for sub_dir in read_dirs:
        with multiprocessing.Pool(6) as pool:
            pool.starmap(read_files_and_save_csv,
                         [(sub_dir, entry, save_dir) for entry in os.listdir(sub_dir)])

    print(f"{datetime.datetime.now()}: 执行结束,总耗时{datetime.datetime.now() - begin}")