张崾先振动.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. import datetime
  2. import multiprocessing
  3. import os
  4. from concurrent.futures.thread import ThreadPoolExecutor
  5. import pandas as pd
  6. def __build_directory_dict(directory_dict, path, filter_types=None):
  7. # 遍历目录下的所有项
  8. for item in os.listdir(path):
  9. item_path = os.path.join(path, item)
  10. if os.path.isdir(item_path):
  11. __build_directory_dict(directory_dict, item_path, filter_types=filter_types)
  12. elif os.path.isfile(item_path):
  13. if path not in directory_dict:
  14. directory_dict[path] = []
  15. if filter_types is None or len(filter_types) == 0:
  16. directory_dict[path].append(item_path)
  17. elif str(item_path).split(".")[-1] in filter_types:
  18. if str(item_path).count("~$") == 0:
  19. directory_dict[path].append(item_path)
  20. # 读取路径下所有的excel文件
  21. def read_excel_files(read_path, filter_types=None):
  22. if filter_types is None:
  23. filter_types = ['xls', 'xlsx', 'csv', 'gz']
  24. if os.path.isfile(read_path):
  25. return [read_path]
  26. directory_dict = {}
  27. __build_directory_dict(directory_dict, read_path, filter_types=filter_types)
  28. return [path for paths in directory_dict.values() for path in paths if path]
  29. # 读取路径下所有的文件
  30. def read_files(read_path, filter_types=None):
  31. if filter_types is None:
  32. filter_types = ['xls', 'xlsx', 'csv', 'gz', 'zip', 'rar']
  33. if os.path.isfile(read_path):
  34. return [read_path]
  35. directory_dict = {}
  36. __build_directory_dict(directory_dict, read_path, filter_types=filter_types)
  37. return [path1 for paths in directory_dict.values() for path1 in paths if path1]
  38. def get_line_count(file_path):
  39. with open(file_path, 'r', encoding='utf-8') as file:
  40. return sum(1 for _ in file)
  41. def read_file_and_read_count_exec(file_path):
  42. base_name = os.path.basename(file_path).split('.')[0]
  43. cols = base_name.split('_')
  44. cols.append(get_line_count(file_path))
  45. return cols
  46. def read_file_and_read_count(index, file_paths, datas):
  47. pretty_print(f'开始执行:{index + 1}')
  48. with ThreadPoolExecutor(max_workers=10) as executor:
  49. colses = list(executor.map(read_file_and_read_count_exec, file_paths))
  50. datas.extend(colses)
  51. pretty_print(f'结束执行:{index + 1}],数据长度:{len(datas)}')
  52. def get_name(x):
  53. result_str = ''
  54. if x['col3'] != '无':
  55. result_str += x['col3']
  56. result_str += x['col2']
  57. if x['col4'] != '无':
  58. result_str += x['col4']
  59. result_str += x['col6']
  60. return result_str
  61. def split_array(array, num):
  62. return [array[i:i + num] for i in range(0, len(array), num)]
  63. def pretty_print(*args):
  64. print(datetime.datetime.now(), ",".join([str(arg) for arg in args]))
  65. if __name__ == '__main__':
  66. datas = multiprocessing.Manager().list()
  67. all_files = read_files(r'D:\cms数据\张崾先风电场2期-导出\CMSFTPServer\ZYXFDC2', ['txt'])
  68. # all_files = read_files(r'D:\cms数据\测试\result\CMSFTPServer\ZYXFDC2', ['txt'])
  69. pretty_print(f"文件长度{len(all_files)}")
  70. arrays = split_array(all_files, 5000)
  71. pretty_print(f"切分个数{len(arrays)}")
  72. with multiprocessing.Pool(10) as pool:
  73. pool.starmap(read_file_and_read_count, [(index, file_paths, datas) for index, file_paths in enumerate(arrays)])
  74. df = pd.DataFrame(data=list(datas), columns=[f'col{i}' for i in range(10)])
  75. df['col8'] = pd.to_datetime(df['col8'], format='%Y%m%d%H%M%S', errors='coerce')
  76. df.sort_values(by=['col1', 'col8'], inplace=True)
  77. df['测点完整名称'] = df.apply(get_name, axis=1)
  78. df.to_csv('d://cms数据//cms_data.csv', index=False, encoding='utf8')