"""Count the lines of every ``.txt`` file under a data root and export a CSV summary.

Each file name (without extension) is split on ``_`` into columns, the file's
line count is appended as the last column, and the resulting table is sorted
and written to disk.
"""
import datetime
import multiprocessing
import os.path

import pandas as pd


def __build_directory_dict(directory_dict, path, filter_types=None):
    """Recursively fill ``directory_dict`` with ``{directory: [file paths]}``.

    Args:
        directory_dict: Dict mutated in place; every directory that directly
            contains at least one regular file gets a key (possibly mapping to
            an empty list when no file passes the filter).
        path: Directory to walk.
        filter_types: Extensions (without the dot) to keep. ``None`` or an
            empty collection keeps every file.
    """
    # Walk every entry in the directory.
    for item in os.listdir(path):
        item_path = os.path.join(path, item)
        if os.path.isdir(item_path):
            __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
        elif os.path.isfile(item_path):
            matches = directory_dict.setdefault(path, [])
            if not filter_types:
                matches.append(item_path)
            elif item_path.rsplit(".", 1)[-1] in filter_types and "~$" not in item_path:
                # "~$" marks Office lock/temp files; they are skipped only
                # when an extension filter is active (same as before).
                matches.append(item_path)


def _collect_files(read_path, filter_types):
    """Return ``[read_path]`` for a file, else every matching file below it."""
    if os.path.isfile(read_path):
        return [read_path]

    directory_dict = {}
    __build_directory_dict(directory_dict, read_path, filter_types=filter_types)
    return [found for paths in directory_dict.values() for found in paths if found]


def read_excel_files(read_path, filter_types=None):
    """List all spreadsheet-like files under ``read_path``.

    Args:
        read_path: A file or a directory. A file is returned as-is.
        filter_types: Extensions to keep; defaults to
            ``['xls', 'xlsx', 'csv', 'gz']``.

    Returns:
        A flat list of file paths.
    """
    if filter_types is None:
        filter_types = ['xls', 'xlsx', 'csv', 'gz']
    return _collect_files(read_path, filter_types)


def read_files(read_path, filter_types=None):
    """List all files under ``read_path``, including archives by default.

    Args:
        read_path: A file or a directory. A file is returned as-is.
        filter_types: Extensions to keep; defaults to
            ``['xls', 'xlsx', 'csv', 'gz', 'zip', 'rar']``.

    Returns:
        A flat list of file paths.
    """
    if filter_types is None:
        filter_types = ['xls', 'xlsx', 'csv', 'gz', 'zip', 'rar']
    return _collect_files(read_path, filter_types)


def get_line_count(file_path):
    """Return the number of lines in ``file_path``, read as UTF-8 text."""
    with open(file_path, 'r', encoding='utf-8') as file:
        return sum(1 for _ in file)


def read_file_and_read_count(index, file_path, datas):
    """Append ``[<name parts...>, <line count>]`` for one file to ``datas``.

    Args:
        index: Position of the file in the work list; used only to print a
            progress line every 10000 files.
        file_path: File whose lines are counted.
        datas: List-like collector (a manager list proxy when run in a pool).
    """
    if index % 10000 == 0:
        print(datetime.datetime.now(), index)

    base_name = os.path.basename(file_path).split('.')[0]
    cols = base_name.split('_')
    cols.append(get_line_count(file_path))
    datas.append(cols)


def get_name(x):
    """Build the full name from ``col3 + col2 + col4 + col7`` of a row.

    ``col3`` and ``col4`` are left out when they hold the placeholder '无'.
    """
    result_str = ''
    if x['col3'] != '无':
        result_str += x['col3']
    result_str += x['col2']
    if x['col4'] != '无':
        result_str += x['col4']
    result_str += x['col7']
    return result_str


def main():
    """Scan the data root, count lines in parallel and write the CSV summary."""
    # Scan here rather than at import time: with the "spawn" start method every
    # pool worker re-imports this module, and a module-level scan would be
    # repeated once per worker.
    all_files = read_files(r'G:\CMS', ['txt'])

    with multiprocessing.Manager() as manager:
        datas = manager.list()
        with multiprocessing.Pool(20) as pool:
            pool.starmap(read_file_and_read_count,
                         [(i, file_path, datas) for i, file_path in enumerate(all_files)])
        # Copy out of the proxy before the manager shuts down so pandas
        # receives a plain list.
        rows = datas[:]

    # Each row must have exactly 10 values: the "_"-separated name parts plus
    # the line count.
    df = pd.DataFrame(rows, columns=[f'col{i}' for i in range(10)])
    df['col8'] = pd.to_datetime(df['col8'], format='%Y%m%d%H%M%S', errors='coerce')
    df.sort_values(by=['col1', 'col8'], inplace=True)
    df['测点完整名称'] = df.apply(get_name, axis=1)
    df.to_csv('d://cms_data.csv', index=False, encoding='utf8')


if __name__ == '__main__':
    main()