# organize_hongyang.py
# Reorganize Hongyang wind-farm per-second measurement CSVs into one
# output CSV per turbine.
import copy
import multiprocessing
import os
import warnings
import chardet
import pandas as pd

# pandas/chardet emit noisy warnings on these data files; silence them.
warnings.filterwarnings("ignore")

# Input directory (per-second Hongyang data, split by measurement point)
# and output directory for the reorganized per-turbine CSVs.
# read_path = r'/home/wzl/test_data/红阳'
# save_dir = r'/home/wzl/test_data/整理'
read_path = r'D:\data\红阳\红阳秒级分测点\红阳'
save_dir = r'D:\data\红阳\红阳秒级分测点\整理'
  12. def __build_directory_dict(directory_dict, path, filter_types=None):
  13. # 遍历目录下的所有项
  14. for item in os.listdir(path):
  15. item_path = os.path.join(path, item)
  16. if os.path.isdir(item_path):
  17. __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
  18. elif os.path.isfile(item_path):
  19. if path not in directory_dict:
  20. directory_dict[path] = []
  21. if filter_types is None or len(filter_types) == 0:
  22. directory_dict[path].append(item_path)
  23. elif str(item_path).split(".")[-1] in filter_types:
  24. if str(item_path).count("~$") == 0:
  25. directory_dict[path].append(item_path)
  26. # 读取路径下所有的excel文件
  27. def read_excel_files(read_path):
  28. if os.path.isfile(read_path):
  29. return [read_path]
  30. directory_dict = {}
  31. __build_directory_dict(directory_dict, read_path, filter_types=['xls', 'xlsx', 'csv', 'gz'])
  32. return [path for paths in directory_dict.values() for path in paths if path]
# Scan for input files at import time so the list exists before the
# worker pool is created.
# NOTE(review): with the Windows "spawn" start method each worker
# re-imports this module and presumably rescans the directory —
# wasteful but harmless; confirm.
all_files = read_excel_files(read_path)
  34. # 获取文件编码
  35. def detect_file_encoding(filename):
  36. # 读取文件的前1000个字节(足够用于大多数编码检测)
  37. with open(filename, 'rb') as f:
  38. rawdata = f.read(1000)
  39. result = chardet.detect(rawdata)
  40. encoding = result['encoding']
  41. if encoding is None:
  42. encoding = 'gb18030'
  43. if encoding.lower() in ['utf-8', 'ascii', 'utf8']:
  44. return 'utf-8'
  45. return 'gb18030'
  46. def read_and_organize(file):
  47. df = pd.read_csv(file, encoding=detect_file_encoding(file))
  48. return file, df
  49. if __name__ == '__main__':
  50. with multiprocessing.Pool(10) as pool:
  51. bak_datas = pool.starmap(read_and_organize, [(i,) for i in all_files])
  52. datas = copy.deepcopy(bak_datas)
  53. wind_name_df = dict()
  54. for file, df in datas:
  55. all_cols = [i for i in df.columns if i.find('#') > -1]
  56. col = all_cols[0]
  57. cedian = str(col).split("_")[-1]
  58. wind_names = set([str(i).split("#")[0].replace("红阳风电场_", "") for i in all_cols])
  59. print(file, df.columns)
  60. for wind_name in wind_names:
  61. cols = [i for i in all_cols if i.find('_' + wind_name) > -1]
  62. cols.insert(0, '统计时间')
  63. query_df = df[cols]
  64. query_df.columns = [str(i).split('_')[-1] for i in query_df.columns]
  65. query_df['风机编号'] = wind_name
  66. if wind_name in wind_name_df.keys():
  67. now_df = wind_name_df[wind_name]
  68. wind_name_df[wind_name] = pd.merge(now_df, query_df, on=['统计时间', '风机编号'], how='outer')
  69. else:
  70. wind_name_df[wind_name] = query_df
  71. for wind_name, df in wind_name_df.items():
  72. df.to_csv(os.path.join(save_dir, wind_name + '#.csv'), index=False, encoding='utf8')