# ReadAndSaveTmp.py
  1. import datetime
  2. import multiprocessing
  3. import os
  4. import traceback
  5. import pandas as pd
  6. from etl.base import TransParam
  7. from etl.base.PathsAndTable import PathsAndTable
  8. from service.plt_service import update_trans_transfer_progress
  9. from utils.file.trans_methods import read_excel_files, split_array, del_blank, \
  10. create_file_path, read_file_to_df
  11. from utils.log.trans_log import trans_print
  12. from utils.systeminfo.sysinfo import use_files_get_max_cpu_count
  13. class ReadAndSaveTmp(object):
  14. def __init__(self, pathsAndTable: PathsAndTable, trans_param: TransParam):
  15. self.pathsAndTable = pathsAndTable
  16. self.trans_param = trans_param
  17. self.exist_wind_names = multiprocessing.Manager().list()
  18. self.lock = multiprocessing.Manager().Lock()
  19. def _save_to_tmp_csv_by_name(self, df, name):
  20. save_name = str(name) + '.csv'
  21. save_path = os.path.join(self.pathsAndTable.get_read_tmp_path(), save_name)
  22. create_file_path(save_path, is_file_path=True)
  23. with self.lock:
  24. if name in self.exist_wind_names:
  25. contains_name = True
  26. else:
  27. contains_name = False
  28. self.exist_wind_names.append(name)
  29. if contains_name:
  30. df.to_csv(save_path, index=False, encoding='utf8', mode='a',
  31. header=False)
  32. else:
  33. df.to_csv(save_path, index=False, encoding='utf8')
  34. def df_save_to_tmp_file(self, df=pd.DataFrame()):
  35. if self.trans_param.is_vertical_table:
  36. pass
  37. else:
  38. # 转换字段
  39. same_col = {}
  40. if self.trans_param.cols_tran:
  41. cols_tran = self.trans_param.cols_tran
  42. real_cols_trans = dict()
  43. for k, v in cols_tran.items():
  44. if v and not v.startswith("$"):
  45. if v not in real_cols_trans.keys():
  46. real_cols_trans[v] = k
  47. else:
  48. value = real_cols_trans[v]
  49. if value in same_col.keys():
  50. same_col[value].append(k)
  51. else:
  52. same_col[value] = [k]
  53. trans_print("包含转换字段,开始处理转换字段")
  54. df.rename(columns=real_cols_trans, inplace=True)
  55. # 添加使用同一个excel字段的值
  56. for key in same_col.keys():
  57. for col in same_col[key]:
  58. df[col] = df[key]
  59. del_keys = set(df.columns) - set(cols_tran.keys())
  60. for key in del_keys:
  61. df.drop(key, axis=1, inplace=True)
  62. df = del_blank(df, ['wind_turbine_number'])
  63. df = df[df['time_stamp'].isna() == False]
  64. if self.trans_param.wind_name_exec:
  65. exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
  66. df['wind_turbine_number'] = eval(exec_str)
  67. self.save_to_tmp_csv(df)
  68. def save_to_tmp_csv(self, df):
  69. names = set(df['wind_turbine_number'].values)
  70. if names:
  71. trans_print("开始保存", str(names), "到临时文件")
  72. with multiprocessing.Pool(self.pathsAndTable.multi_pool_count) as pool:
  73. pool.starmap(self._save_to_tmp_csv_by_name,
  74. [(df[df['wind_turbine_number'] == name], name) for name in names])
  75. del df
  76. trans_print("保存", str(names), "到临时文件成功, 风机数量", len(names))
  77. def read_file_and_save_tmp(self):
  78. all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())
  79. if self.trans_param.merge_columns:
  80. dfs_list = list()
  81. index_keys = [self.trans_param.cols_tran['time_stamp']]
  82. wind_col = self.trans_param.cols_tran['wind_turbine_number']
  83. if str(wind_col).startswith("$"):
  84. wind_col = 'wind_turbine_number'
  85. index_keys.append(wind_col)
  86. df_map = dict()
  87. # todo 需要优化
  88. with multiprocessing.Pool(self.pathsAndTable.multi_pool_count) as pool:
  89. dfs = pool.starmap(self.read_excel_to_df, [(file,) for file in all_files])
  90. for df in dfs:
  91. key = '-'.join(df.columns)
  92. if key in df_map.keys():
  93. df_map[key] = pd.concat([df_map[key], df])
  94. else:
  95. df_map[key] = df
  96. for k, df in df_map.items():
  97. df.drop_duplicates(inplace=True)
  98. df.set_index(keys=index_keys, inplace=True)
  99. df = df[~df.index.duplicated(keep='first')]
  100. dfs_list.append(df)
  101. df = pd.concat(dfs_list, axis=1)
  102. df.reset_index(inplace=True)
  103. try:
  104. self.df_save_to_tmp_file(df)
  105. except Exception as e:
  106. trans_print(traceback.format_exc())
  107. message = "合并列出现错误:" + str(e)
  108. raise ValueError(message)
  109. else:
  110. split_count = use_files_get_max_cpu_count(all_files)
  111. all_arrays = split_array(all_files, split_count)
  112. for index, arr in enumerate(all_arrays):
  113. try:
  114. with multiprocessing.Pool(split_count) as pool:
  115. dfs = pool.starmap(self.read_excel_to_df, [(ar,) for ar in arr])
  116. for df in dfs:
  117. self.df_save_to_tmp_file(df)
  118. except Exception as e:
  119. trans_print(traceback.format_exc())
  120. message = "整理临时文件,系统返回错误:" + str(e)
  121. raise ValueError(message)
  122. update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
  123. round(20 + 30 * (index + 1) / len(all_arrays), 2),
  124. self.pathsAndTable.save_db)
  125. def read_excel_to_df(self, file_path):
  126. read_cols = [v.split(",")[0] for k, v in self.trans_param.cols_tran.items() if v and not v.startswith("$")]
  127. trans_dict = {}
  128. for k, v in self.trans_param.cols_tran.items():
  129. if v and not str(v).startswith("$"):
  130. trans_dict[v] = k
  131. if self.trans_param.is_vertical_table:
  132. vertical_cols = self.trans_param.vertical_cols
  133. df = read_file_to_df(file_path, vertical_cols, header=self.trans_param.header)
  134. df = df[df[self.trans_param.vertical_key].isin(read_cols)]
  135. df.rename(columns={self.trans_param.cols_tran['wind_turbine_number']: 'wind_turbine_number',
  136. self.trans_param.cols_tran['time_stamp']: 'time_stamp'}, inplace=True)
  137. df[self.trans_param.vertical_key] = df[self.trans_param.vertical_key].map(trans_dict).fillna(
  138. df[self.trans_param.vertical_key])
  139. return df
  140. else:
  141. trans_dict = dict()
  142. for k, v in self.trans_param.cols_tran.items():
  143. if v and v.startswith("$") or v.find(",") > 0:
  144. trans_dict[v] = k
  145. if self.trans_param.merge_columns:
  146. df = read_file_to_df(file_path, header=self.trans_param.header)
  147. else:
  148. if self.trans_param.need_valid_cols:
  149. df = read_file_to_df(file_path, read_cols, header=self.trans_param.header)
  150. else:
  151. df = read_file_to_df(file_path, header=self.trans_param.header)
  152. # 处理列名前缀问题
  153. if self.trans_param.resolve_col_prefix:
  154. columns_dict = dict()
  155. for column in df.columns:
  156. columns_dict[column] = eval(self.trans_param.resolve_col_prefix)
  157. df.rename(columns=columns_dict, inplace=True)
  158. for k, v in trans_dict.items():
  159. if k.startswith("$file"):
  160. file = ".".join(os.path.basename(file_path).split(".")[0:-1])
  161. if k == "$file":
  162. df[v] = str(file)
  163. elif k.startswith("$file["):
  164. datas = str(k.replace("$file", "").replace("[", "").replace("]", "")).split(":")
  165. if len(datas) != 2:
  166. raise Exception("字段映射出现错误 :" + str(trans_dict))
  167. df[v] = str(file[int(datas[0]):int(datas[1])]).strip()
  168. elif k.find("$file_date") > 0:
  169. datas = str(k.split(",")[1].replace("$file_date", "").replace("[", "").replace("]", "")).split(":")
  170. if len(datas) != 2:
  171. raise Exception("字段映射出现错误 :" + str(trans_dict))
  172. file = ".".join(os.path.basename(file_path).split(".")[0:-1])
  173. date_str = str(file[int(datas[0]):int(datas[1])]).strip()
  174. df[v] = df[k.split(",")[0]].apply(lambda x: date_str + " " + str(x))
  175. elif k.startswith("$folder"):
  176. folder = file_path
  177. cengshu = int(str(k.replace("$folder", "").replace("[", "").replace("]", "")))
  178. for i in range(cengshu):
  179. folder = os.path.dirname(folder)
  180. df[v] = str(str(folder).split(os.sep)[-1]).strip()
  181. return df
  182. def run(self):
  183. trans_print("开始保存数据到临时文件")
  184. begin = datetime.datetime.now()
  185. self.read_file_and_save_tmp()
  186. update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 50,
  187. self.pathsAndTable.save_db)
  188. trans_print("保存数据到临时文件结束,耗时:", datetime.datetime.now() - begin)