Axis0DataImpl.py

import multiprocessing
import os

from trans.ExecParam import ExecParam
from utils.file.trans_methods import read_excel_files, split_array, find_header, read_file_to_df
from utils.log.import_data_log import log_print
from utils.systeminfo.sysinfo import use_files_get_max_cpu_count


class Axis0DataImpl(object):

    def __init__(self, id, process_count, now_count, exec_param: ExecParam, save_db=True):
        self.id = id
        self.process_count = process_count
        self.now_count = now_count
        self.exec_param = exec_param
        self.save_db = save_db
        # self.lock_map = dict()
        # for i in range(1000):
        #     self.lock_map[i] = multiprocessing.Manager().Lock()
        # One shared Manager provides both the cross-process lock and the
        # shared dict; creating a separate Manager for each would spawn an
        # extra manager process per object.
        manager = multiprocessing.Manager()
        self.lock = manager.Lock()
        self.field_dict = manager.dict()

    def get_lock(self, split_col_value, col_name):
        """Record the (split value, column) pair in the shared dict and return
        whether it is being seen for the first time, plus the lock that guards
        writes to its CSV file."""
        boolean_first_time = False
        if split_col_value:
            field_name = f'{split_col_value}_{col_name}'
        else:
            field_name = col_name
        exists_count = len(self.field_dict.keys())
        # if exists_count >= 4:
        #     return self.lock
        if field_name not in self.field_dict:
            boolean_first_time = True
            self.field_dict[field_name] = len(self.field_dict.keys()) + 1
        # return boolean_first_time, self.lock_map[self.field_dict[field_name]]
        return boolean_first_time, self.lock
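
    # Design note (added for clarity): the per-field lock_map above is
    # disabled, so every (split value, column) writer shares self.lock and
    # writes are fully serialized; boolean_first_time is what switches each
    # split CSV between write and append mode in read_and_save_file below.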

    def read_and_save_file(self, file_path):
        if self.exec_param.has_header:
            header = find_header(file_path, self.exec_param.use_cols)
            if header is None:
                raise Exception(f"Column header not found in file {os.path.basename(file_path)}")
        else:
            header = None
        df = read_file_to_df(file_path, header=header, use_cols=self.exec_param.use_cols)
        # Note: col_map is built here but not used within this method.
        col_map = {'file_name': 'file_name'}
        if 'sheet_name' in df.columns:
            col_map['sheet_name'] = 'sheet_name'
        # Rename columns, not index labels, so pass the mapping via `columns=`.
        df.rename(columns=self.exec_param.mapping_cols, errors='ignore', inplace=True)
        if self.exec_param.split_cols:
            # Build the split key from the configured columns, replacing
            # characters that are unsafe in file names.
            df['split_col'] = df[self.exec_param.split_cols].apply(
                lambda x: '_'.join([str(i).replace(' ', '_').replace(':', '_') for i in x.values]),
                axis=1)
        else:
            df['split_col'] = 'All'
        split_col = df['split_col'].unique()
        if len(split_col) >= 1000:
            log_print(f"{file_path} yields too many split files: 1000 or more")
            raise Exception(f"{file_path} yields too many split files: 1000 or more")
        general_fields = list(df.columns)
        general_fields.remove('split_col')
        general_fields.remove('file_name')
        if 'sheet_name' in general_fields:
            general_fields.remove('sheet_name')
        for col in self.exec_param.index_cols:
            general_fields.remove(col)
        for split_col_value in split_col:
            for col in general_fields:
                # Copy index_cols: appending to the shared list directly
                # would grow it on every iteration.
                now_cols = list(self.exec_param.index_cols)
                now_cols.append(col)
                now_df = df[df['split_col'] == split_col_value][now_cols]
                boolean_first_time, lock = self.get_lock(split_col_value, col)
                with lock:
                    path = self.exec_param.path_param.get_process_tmp_path()
                    os.makedirs(path, exist_ok=True)
                    if boolean_first_time:
                        now_df.to_csv(os.path.join(path, f'{split_col_value}.csv'),
                                      index=False, encoding='utf-8')
                    else:
                        now_df.to_csv(os.path.join(path, f'{split_col_value}.csv'),
                                      index=False, mode='a', header=False, encoding='utf-8')

    def read_and_save_files(self, file_paths):
        # Worker entry point: process one chunk of files sequentially.
        for file_path in file_paths:
            self.read_and_save_file(file_path)

    def run(self):
        all_files = read_excel_files(self.exec_param.path_param.get_unzip_tmp_path())
        split_count = use_files_get_max_cpu_count(all_files)
        all_arrays = split_array(all_files, split_count)
        log_print("Starting vertical-merge read; total files:", len(all_files), ", file chunks:", split_count)
        with multiprocessing.Pool(split_count) as pool:
            # split_array yields lists of paths, so map the chunk-level
            # wrapper rather than the single-file method.
            pool.map(self.read_and_save_files, all_arrays)
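

# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal sketch of how Axis0DataImpl might be driven, assuming ExecParam
# carries the attributes referenced above (has_header, use_cols, mapping_cols,
# split_cols, index_cols, path_param). The constructor arguments shown here
# are hypothetical placeholders.
#
# if __name__ == '__main__':
#     exec_param = ExecParam(...)  # configured elsewhere in the pipeline
#     impl = Axis0DataImpl(id=1, process_count=1, now_count=0,
#                          exec_param=exec_param, save_db=False)
#     impl.run()  # split each input file by split_col and append per-split CSVs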