Axis1DataImpl.py

import multiprocessing
import os
import traceback

import pandas as pd

from service.import_data_service import update_transfer_progress
from trans.ExecParam import ExecParam
from utils.file.trans_methods import split_array, read_excel_files, read_file_to_df, find_header
from utils.log.import_data_log import log_print
from utils.systeminfo.sysinfo import use_files_get_max_cpu_count, get_dir_size, max_file_size_get_max_cpu_count

class Axis1DataImpl(object):

    def __init__(self, id, process_count, now_count, exec_param: ExecParam, save_db=True):
        self.id = id
        self.process_count = process_count
        self.now_count = now_count
        self.exec_param = exec_param
        self.save_db = save_db
        # One shared Manager serves all locks and the shared dict; creating a new
        # Manager per lock would spawn a separate server process for each of them.
        manager = multiprocessing.Manager()
        self.lock_map = dict()
        for i in range(1000):
            self.lock_map[i] = manager.Lock()
        self.lock = manager.Lock()
        self.field_dict = manager.dict()
    def get_lock(self, split_col_value, col_name):
        boolean_first_time = False
        if split_col_value:
            filed_name = f'{split_col_value}_{col_name}'
        else:
            filed_name = col_name
        exists_count = len(self.field_dict.keys())
        if exists_count >= 1000:
            # The per-field lock pool is exhausted; fall back to the single shared lock.
            # Return a tuple so callers can still unpack (first_time, lock).
            return boolean_first_time, self.lock
        if filed_name not in self.field_dict:
            boolean_first_time = True
            # Assign the next free slot 0-based so the index always stays within lock_map's 0..999 keys.
            self.field_dict[filed_name] = len(self.field_dict.keys())
        return boolean_first_time, self.lock_map[self.field_dict[filed_name]]
    def read_and_save_file(self, file_path):
        if self.exec_param.has_header:
            header = find_header(file_path, self.exec_param.use_cols)
            if header is None:
                raise Exception(f"No header row with the expected columns was found in file {os.path.basename(file_path)}")
        else:
            header = None
        df = read_file_to_df(file_path, header=header)
        # Rename the used columns to their mapped names; file_name/sheet_name are kept as-is.
        col_map = {'file_name': 'file_name'}
        if 'sheet_name' in df.columns:
            col_map['sheet_name'] = 'sheet_name'
        for col in df.columns:
            if col in self.exec_param.use_cols and col not in ['file_name', 'sheet_name']:
                col_map[col] = self.exec_param.mapping_cols[col]
        df = df.rename(columns=col_map)
        # Raise an error if the data is missing any of the index columns.
        for col in self.exec_param.index_cols:
            if col not in df.columns:
                log_print(f"{file_path} is missing index column {col}")
                raise Exception(f"{file_path} is missing index column {col}")
        # Build the split key from the configured split columns, or use a single 'All' bucket.
        if self.exec_param.split_cols:
            df['split_col'] = df[self.exec_param.split_cols].apply(
                lambda x: '_'.join([str(i).replace(' ', '_').replace(':', '_') for i in x.values]),
                axis=1)
        else:
            df['split_col'] = 'All'
        split_col = df['split_col'].unique()
        if len(split_col) >= 1000:
            log_print(f"{file_path} produces too many split groups: 1000 or more")
            raise Exception(f"{file_path} produces too many split groups: 1000 or more")
        general_fields = list(df.columns)
        general_fields.remove('split_col')
        for col in self.exec_param.index_cols:
            general_fields.remove(col)
        for split_col_value in split_col:
            for col in general_fields:
                # Copy the index columns so appending col does not mutate exec_param.index_cols
                # across iterations.
                now_cols = list(self.exec_param.index_cols)
                now_cols.append(col)
                now_df = df[df['split_col'] == split_col_value][now_cols]
                boolean_first_time, lock = self.get_lock(split_col_value, col)
                with lock:
                    path = os.path.join(self.exec_param.path_param.get_merge_tmp_path(), split_col_value)
                    os.makedirs(path, exist_ok=True)
                    if boolean_first_time:
                        now_df.to_csv(os.path.join(path, f'{col}.csv'), index=False, encoding='utf-8')
                    else:
                        now_df.to_csv(os.path.join(path, f'{col}.csv'), index=False, mode='a', header=False,
                                      encoding='utf-8')
    def read_merge_df_to_process(self, base_name):
        merge_dir = os.path.join(self.exec_param.path_param.get_merge_tmp_path(), base_name)
        # os.listdir returns bare file names, so join them back onto the directory before reading.
        all_files = os.listdir(merge_dir)
        dfs = [pd.read_csv(os.path.join(merge_dir, file), encoding='utf-8', index_col=self.exec_param.index_cols)
               for file in all_files]
        df = pd.concat(dfs, axis=1, ignore_index=True)
        df.reset_index(inplace=True)
        df.to_csv(os.path.join(self.exec_param.path_param.get_process_tmp_path(), base_name + '.csv'), index=False,
                  encoding='utf-8')
    def run(self):
        if len(self.exec_param.index_cols) == 0:
            log_print("Merging tables requires index columns to be configured")
            log_print(traceback.format_exc())
            raise Exception("Merging tables requires index columns to be configured")
        all_files = read_excel_files(self.exec_param.path_param.get_unzip_tmp_path())
        split_count = use_files_get_max_cpu_count(all_files)
        all_arrays = split_array(all_files, split_count)
        log_print("Start reading files, total files:", len(all_files), ", number of chunks:", split_count)
        for index, now_array in enumerate(all_arrays):
            with multiprocessing.Pool(split_count) as pool:
                # Each chunk item is a single file path, so map (not starmap) is the right call.
                pool.map(self.read_and_save_file, now_array)
            update_transfer_progress(self.id, round(20 + 50 * (index + 1) / len(all_arrays)), self.process_count,
                                     self.now_count, self.save_db)
        all_dirs = os.listdir(self.exec_param.path_param.get_merge_tmp_path())
        dir_size = get_dir_size(os.path.join(self.exec_param.path_param.get_merge_tmp_path(), all_dirs[0]))
        pool_count = max_file_size_get_max_cpu_count(dir_size)
        pool_count = pool_count if pool_count <= len(all_files) else len(all_files)
        with multiprocessing.Pool(pool_count) as pool:
            pool.map(self.read_merge_df_to_process, all_dirs)
        update_transfer_progress(self.id, 80, self.process_count, self.now_count, self.save_db)
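
# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal sketch of how this class appears to be driven, inferred only from the
# attributes referenced above. How ExecParam is actually constructed is defined in
# trans.ExecParam and is not shown here, so the placeholder below is an assumption,
# and the id/process_count/now_count values are made up for illustration.
if __name__ == '__main__':
    # assumption: an ExecParam carrying has_header, use_cols, mapping_cols,
    # index_cols, split_cols and path_param for this import job
    exec_param = ...
    impl = Axis1DataImpl(id=1, process_count=1, now_count=1, exec_param=exec_param, save_db=False)
    impl.run()  # the __main__ guard matters: multiprocessing re-imports this module in worker processes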