# 王博提取数据完整风机数据.py (3.3 KB)
# NOTE(review): a pasted line-number gutter (1..90) from the original HTML
# viewer was garbled here; converted to this comment so the file parses.
import datetime
import multiprocessing
import os

import chardet
import pandas as pd
  6. def detect_file_encoding(filename):
  7. # 读取文件的前1000个字节(足够用于大多数编码检测)
  8. with open(filename, 'rb') as f:
  9. rawdata = f.read(1000)
  10. result = chardet.detect(rawdata)
  11. encoding = result['encoding']
  12. print("文件类型:", filename, encoding)
  13. if encoding is None:
  14. encoding = 'gb18030'
  15. if encoding.lower() in ['utf-8', 'ascii', 'utf8', 'utf-8-sig']:
  16. return 'utf-8'
  17. return 'gb18030'
  18. def __build_directory_dict(directory_dict, path, filter_types=None):
  19. # 遍历目录下的所有项
  20. for item in os.listdir(path):
  21. item_path = os.path.join(path, item)
  22. if os.path.isdir(item_path):
  23. __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
  24. elif os.path.isfile(item_path):
  25. if path not in directory_dict:
  26. directory_dict[path] = []
  27. if filter_types is None or len(filter_types) == 0:
  28. directory_dict[path].append(item_path)
  29. elif str(item_path).split(".")[-1] in filter_types:
  30. if str(item_path).count("~$") == 0:
  31. directory_dict[path].append(item_path)
  32. # 读取路径下所有的excel文件
  33. def read_excel_files(read_path, filter_types=None):
  34. if filter_types is None:
  35. filter_types = ['xls', 'xlsx', 'csv', 'gz']
  36. if os.path.isfile(read_path):
  37. return [read_path]
  38. directory_dict = {}
  39. __build_directory_dict(directory_dict, read_path, filter_types=filter_types)
  40. return [path for paths in directory_dict.values() for path in paths if path]
  41. def read_file_to_df(file_path):
  42. df = pd.read_csv(file_path, encoding=detect_file_encoding(file_path))
  43. date = os.path.basename(file_path)[14:24]
  44. df['Time'] = df['Time'].apply(lambda x: date + ' ' + x)
  45. return df
  46. def read_files_and_save_csv(file_dir, month, save_dir):
  47. begin = datetime.datetime.now()
  48. base_dir = os.path.basename(file_dir)
  49. print(f"{datetime.datetime.now()}: 开始执行{base_dir}-{month}")
  50. all_files = read_excel_files(os.path.join(file_dir, month))
  51. df = pd.concat([read_file_to_df(file) for file in all_files], ignore_index=True)
  52. save_path = os.path.join(save_dir, base_dir, f'{month}.csv')
  53. os.makedirs(os.path.dirname(save_path), exist_ok=True)
  54. df.sort_values(by=['Time'], inplace=True)
  55. df.to_csv(save_path, encoding='utf8', index=False)
  56. print(f"{datetime.datetime.now()}: 执行{base_dir}-{month}结束,耗时{datetime.datetime.now() - begin}")
  57. if __name__ == '__main__':
  58. begin = datetime.datetime.now()
  59. read_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/tmp/second/excel_tmp/'
  60. save_dir = r'/data/download/collection_data/1进行中/张崾先风电场-陕西-华电/清理数据/20241217完整字段'
  61. read_dirs = list()
  62. for i in range(26, 42):
  63. read_dirs.append(os.path.join(read_dir, str(i)))
  64. for read_dir in read_dirs:
  65. begin = datetime.datetime.now()
  66. with multiprocessing.Pool(6) as pool:
  67. pool.starmap(read_files_and_save_csv, [(read_dir, i, save_dir) for i in os.listdir(read_dir)])
  68. print(f"{datetime.datetime.now()}: 执行结束,总耗时{datetime.datetime.now() - begin}")