import zipfile
import pandas as pd
from io import BytesIO
import os
# ThreadPoolExecutor kept for backward compatibility (previously used by
# process_file_concurrently; see its docstring).
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed


def process_compressed_file(file_path, file_name=None):
    """Recursively walk an archive, directory, or file and yield DataFrames.

    Handles nested ``.zip``/``.rar`` archives, plain directories, and
    GBK-encoded ``.csv`` / ``.csv.gz`` files. Anything else is reported
    and skipped.

    Parameters
    ----------
    file_path : str | BytesIO
        Filesystem path, or an in-memory buffer for nested archive members.
    file_name : str | None
        Logical name used to pick the handler; defaults to ``file_path``
        (only meaningful when ``file_path`` is a string).

    Yields
    ------
    pandas.DataFrame
        One DataFrame per CSV found, in traversal order.
    """
    if file_name is None:
        file_name = file_path

    if file_name.endswith('.zip'):
        with zipfile.ZipFile(file_path, 'r') as z:
            for inner in z.namelist():
                # Directory entries ('folder/') are not readable files —
                # skip them instead of sending them to the fallback branch.
                if inner.endswith('/'):
                    continue
                with z.open(inner) as f:
                    yield from process_compressed_file(BytesIO(f.read()), inner)
    elif file_name.endswith('.rar'):
        # Lazy import: rarfile is a third-party optional dependency, needed
        # only when a .rar archive is actually encountered.
        import rarfile
        with rarfile.RarFile(file_path) as rf:
            for inner in rf.namelist():
                if inner.endswith('/'):
                    continue
                with rf.open(inner) as f:
                    yield from process_compressed_file(BytesIO(f.read()), inner)
    elif file_name.endswith('.csv.gz'):
        # pandas accepts both paths and file-like objects, so the former
        # BytesIO special case (a redundant seek(0) on a fresh buffer) is gone.
        yield pd.read_csv(file_path, compression='gzip', encoding='gbk')
    elif file_name.endswith('.csv'):
        yield pd.read_csv(file_path, encoding='gbk')
    elif isinstance(file_path, str) and os.path.isdir(file_path):
        # Plain directory: recurse into every file beneath it.
        for root, _, files in os.walk(file_path):
            for name in files:
                yield from process_compressed_file(os.path.join(root, name), name)
    else:
        print(f"不支持的文件格式: {file_name}")


def process_file_concurrently(file_path):
    """Collect every DataFrame produced by ``process_compressed_file``.

    The previous implementation submitted an identity lambda per DataFrame
    to a ThreadPoolExecutor — pure overhead with no parallelism, since all
    the work happens inside the generator itself. Draining the generator
    directly yields the same list of DataFrames.

    Runs inside a worker process, so ``file_path`` must be picklable
    (i.e. a filesystem path, not a BytesIO).

    Returns
    -------
    list[pandas.DataFrame]
    """
    return list(process_compressed_file(file_path))


def main(compressed_file_path=r'D:\data\data.zip'):
    """Process an archive in a worker process and print a preview.

    Parameters
    ----------
    compressed_file_path : str
        Path of the archive/directory to process. Previously ``main`` read
        a module-level global defined only under the ``__main__`` guard,
        which raised NameError when the module was imported and ``main()``
        called; it is now an explicit, defaulted parameter.
    """
    dfs = []
    # A process pool sidesteps the GIL for the CPU-heavy CSV parsing. With a
    # single archive there is only one task, but the fan-out structure scales
    # to a list of archives unchanged.
    with ProcessPoolExecutor() as process_pool:
        futures = [process_pool.submit(process_file_concurrently, compressed_file_path)]
        for future in as_completed(futures):
            dfs.extend(future.result())

    if dfs:
        combined_df = pd.concat(dfs, ignore_index=True)
        print(combined_df.head())
    else:
        print("未找到 CSV 文件")


if __name__ == '__main__':
    main(r'D:\data\data.zip')