压缩内读取.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. import zipfile
  2. import rarfile
  3. import pandas as pd
  4. from io import BytesIO
  5. import os
  6. from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
  7. # 递归处理压缩文件或文件夹
  8. def process_compressed_file(file_path, file_name=None):
  9. if file_name is None:
  10. file_name = file_path
  11. if file_name.endswith('.zip'):
  12. # 处理 ZIP 文件
  13. with zipfile.ZipFile(file_path, 'r') as z:
  14. for inner_file in z.namelist():
  15. with z.open(inner_file) as f:
  16. inner_file_content = BytesIO(f.read())
  17. yield from process_compressed_file(inner_file_content, inner_file)
  18. elif file_name.endswith('.rar'):
  19. # 处理 RAR 文件
  20. with rarfile.RarFile(file_path) as rf:
  21. for inner_file in rf.namelist():
  22. with rf.open(inner_file) as f:
  23. inner_file_content = BytesIO(f.read())
  24. yield from process_compressed_file(inner_file_content, inner_file)
  25. elif file_name.endswith('.csv'):
  26. # 处理 CSV 文件(支持 GBK 编码)
  27. if isinstance(file_path, BytesIO):
  28. file_path.seek(0) # 重置指针
  29. df = pd.read_csv(file_path, encoding='gbk')
  30. else:
  31. df = pd.read_csv(file_path, encoding='gbk')
  32. yield df
  33. elif file_name.endswith('.csv.gz'):
  34. # 处理 GZIP 压缩的 CSV 文件(支持 GBK 编码)
  35. if isinstance(file_path, BytesIO):
  36. file_path.seek(0) # 重置指针
  37. df = pd.read_csv(file_path, compression='gzip', encoding='gbk')
  38. else:
  39. df = pd.read_csv(file_path, compression='gzip', encoding='gbk')
  40. yield df
  41. elif isinstance(file_path, str) and os.path.isdir(file_path):
  42. # 处理文件夹
  43. for root, _, files in os.walk(file_path):
  44. for file in files:
  45. full_path = os.path.join(root, file)
  46. yield from process_compressed_file(full_path, file)
  47. else:
  48. print(f"不支持的文件格式: {file_name}")
  49. # 多进程加多线程处理压缩文件
  50. def process_file_concurrently(file_path):
  51. dfs = []
  52. with ThreadPoolExecutor() as thread_pool:
  53. futures = []
  54. for df in process_compressed_file(file_path):
  55. futures.append(thread_pool.submit(lambda x: x, df)) # 提交任务到线程池
  56. for future in as_completed(futures):
  57. dfs.append(future.result()) # 获取结果
  58. return dfs
  59. # 主函数
  60. def main():
  61. # 压缩文件路径
  62. # 使用多进程处理压缩文件
  63. dfs = []
  64. with ProcessPoolExecutor() as process_pool:
  65. futures = []
  66. futures.append(process_pool.submit(process_file_concurrently, compressed_file_path)) # 提交任务到进程池
  67. for future in as_completed(futures):
  68. dfs.extend(future.result()) # 获取结果
  69. # 合并所有 DataFrame
  70. if dfs:
  71. combined_df = pd.concat(dfs, ignore_index=True)
  72. print(combined_df.head())
  73. else:
  74. print("未找到 CSV 文件")
  75. if __name__ == '__main__':
  76. compressed_file_path = r'D:\data\data.zip'
  77. main()