import zipfile
import rarfile  # needs an external unrar-compatible backend (e.g. the `unrar` tool) installed
import pandas as pd
from io import BytesIO
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed

# Recursively process a compressed file or a directory, yielding one
# DataFrame per CSV found (nested archives are unpacked in memory).
def process_compressed_file(file_path, file_name=None):
    if file_name is None:
        file_name = file_path
    if file_name.endswith('.zip'):
        # ZIP archive: recurse into every member
        with zipfile.ZipFile(file_path, 'r') as z:
            for inner_file in z.namelist():
                if inner_file.endswith('/'):
                    continue  # skip directory entries inside the archive
                with z.open(inner_file) as f:
                    inner_file_content = BytesIO(f.read())
                    yield from process_compressed_file(inner_file_content, inner_file)
    elif file_name.endswith('.rar'):
        # RAR archive: recurse into every member
        with rarfile.RarFile(file_path) as rf:
            for inner_file in rf.namelist():
                with rf.open(inner_file) as f:
                    inner_file_content = BytesIO(f.read())
                    yield from process_compressed_file(inner_file_content, inner_file)
    elif file_name.endswith('.csv'):
        # Plain CSV (GBK-encoded)
        if isinstance(file_path, BytesIO):
            file_path.seek(0)  # rewind the in-memory buffer before parsing
        yield pd.read_csv(file_path, encoding='gbk')
    elif file_name.endswith('.csv.gz'):
        # GZIP-compressed CSV (GBK-encoded)
        if isinstance(file_path, BytesIO):
            file_path.seek(0)  # rewind the in-memory buffer before parsing
        yield pd.read_csv(file_path, compression='gzip', encoding='gbk')
    elif isinstance(file_path, str) and os.path.isdir(file_path):
        # Directory: walk it and recurse into every file
        for root, _, files in os.walk(file_path):
            for file in files:
                full_path = os.path.join(root, file)
                yield from process_compressed_file(full_path, file)
    else:
        print(f"Unsupported file format: {file_name}")
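
# Small usage sketch: iterate the generator directly, without any pools.
# Handy for sanity-checking a single archive before wiring up the executors;
# the function name is illustrative, not part of the original flow.
def preview_archive(path, n=3):
    for df in process_compressed_file(path):
        print(df.head(n))  # show the first rows of each CSV found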

# Process one compressed file with multiprocessing plus multithreading.
# Note: the generator above already does the decompression and CSV parsing,
# so the identity tasks below only collect results on the thread pool.
def process_file_concurrently(file_path):
    dfs = []
    with ThreadPoolExecutor() as thread_pool:
        futures = []
        for df in process_compressed_file(file_path):
            futures.append(thread_pool.submit(lambda x: x, df))  # submit an identity task per DataFrame
        for future in as_completed(futures):
            dfs.append(future.result())  # collect each DataFrame
    return dfs
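
# A sketch of how the thread pool could do real work (the names below are
# illustrative, not from the original): read raw member bytes first, then let
# the pool run pd.read_csv, whose C engine can release the GIL while parsing,
# so threads may genuinely overlap. Assumes a flat ZIP archive whose members
# are all GBK-encoded CSVs.
def read_zip_members_concurrently(zip_path):
    def parse_member(raw_bytes):
        return pd.read_csv(BytesIO(raw_bytes), encoding='gbk')
    with zipfile.ZipFile(zip_path, 'r') as z:
        # read member bytes sequentially; parse them in parallel below
        payloads = [z.read(n) for n in z.namelist() if n.endswith('.csv')]
    with ThreadPoolExecutor() as pool:
        return list(pool.map(parse_member, payloads))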

# Main function: hand the compressed file to a process pool, then merge results
def main(compressed_file_path):
    dfs = []
    with ProcessPoolExecutor() as process_pool:
        futures = [process_pool.submit(process_file_concurrently, compressed_file_path)]  # submit the task to the process pool
        for future in as_completed(futures):
            dfs.extend(future.result())  # fetch the DataFrames from the worker
    # Concatenate all DataFrames
    if dfs:
        combined_df = pd.concat(dfs, ignore_index=True)
        print(combined_df.head())
    else:
        print("No CSV files found")

if __name__ == '__main__':
    compressed_file_path = r'D:\data\data.zip'  # path to the compressed file
    main(compressed_file_path)