# ReadAndSaveTmp.py
  1. import datetime
  2. import multiprocessing
  3. import os
  4. import traceback
  5. import pandas as pd
  6. from etl.base import TransParam
  7. from etl.base.PathsAndTable import PathsAndTable
  8. from service.plt_service import update_trans_transfer_progress
  9. from utils.file.trans_methods import read_excel_files, split_array, del_blank, \
  10. create_file_path, read_file_to_df
  11. from utils.log.trans_log import trans_print
  12. from utils.systeminfo.sysinfo import use_files_get_max_cpu_count, get_dir_size, max_file_size_get_max_cpu_count
  13. class ReadAndSaveTmp(object):
  14. def __init__(self, pathsAndTable: PathsAndTable, trans_param: TransParam):
  15. self.pathsAndTable = pathsAndTable
  16. self.trans_param = trans_param
  17. self.exist_wind_names = multiprocessing.Manager().list()
  18. self.lock = multiprocessing.Manager().Lock()
  19. self.file_lock = multiprocessing.Manager().dict()
  20. def _save_to_tmp_csv_by_name(self, df, name):
  21. save_name = str(name) + '.csv'
  22. save_path = os.path.join(self.pathsAndTable.get_read_tmp_path(), save_name)
  23. create_file_path(save_path, is_file_path=True)
  24. with self.lock:
  25. if name in self.exist_wind_names:
  26. contains_name = True
  27. else:
  28. contains_name = False
  29. self.exist_wind_names.append(name)
  30. if contains_name:
  31. df.to_csv(save_path, index=False, encoding='utf8', mode='a',
  32. header=False)
  33. else:
  34. df.to_csv(save_path, index=False, encoding='utf8')
  35. def save_merge_data(self, file_path):
  36. df = self.read_excel_to_df(file_path)
  37. if self.trans_param.wind_name_exec:
  38. exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
  39. df['wind_turbine_number'] = eval(exec_str)
  40. names = set(df['wind_turbine_number'].values)
  41. cols = list(df.columns)
  42. cols.sort()
  43. csv_name = "-".join(cols) + ".csv"
  44. for name in names:
  45. exist_name = name + '-' + csv_name
  46. merge_path = self.pathsAndTable.get_merge_tmp_path(name)
  47. create_file_path(merge_path)
  48. with self.lock:
  49. if exist_name in self.exist_wind_names:
  50. contains_name = True
  51. else:
  52. contains_name = False
  53. self.exist_wind_names.append(exist_name)
  54. save_path = os.path.join(merge_path, csv_name)
  55. if contains_name:
  56. df.to_csv(save_path, index=False, encoding='utf-8', mode='a',
  57. header=False)
  58. else:
  59. df.to_csv(save_path, index=False, encoding='utf-8')
  60. def df_save_to_tmp_file(self, df=pd.DataFrame()):
  61. if self.trans_param.is_vertical_table:
  62. pass
  63. else:
  64. # 转换字段
  65. same_col = {}
  66. if self.trans_param.cols_tran:
  67. cols_tran = self.trans_param.cols_tran
  68. real_cols_trans = dict()
  69. for k, v in cols_tran.items():
  70. if v and not v.startswith("$"):
  71. if v not in real_cols_trans.keys():
  72. real_cols_trans[v] = k
  73. else:
  74. value = real_cols_trans[v]
  75. if value in same_col.keys():
  76. same_col[value].append(k)
  77. else:
  78. same_col[value] = [k]
  79. trans_print("包含转换字段,开始处理转换字段")
  80. df.rename(columns=real_cols_trans, inplace=True)
  81. # 添加使用同一个excel字段的值
  82. for key in same_col.keys():
  83. for col in same_col[key]:
  84. df[col] = df[key]
  85. del_keys = set(df.columns) - set(cols_tran.keys())
  86. for key in del_keys:
  87. df.drop(key, axis=1, inplace=True)
  88. df = del_blank(df, ['wind_turbine_number'])
  89. df = df[df['time_stamp'].isna() == False]
  90. if self.trans_param.wind_name_exec and not self.trans_param.merge_columns:
  91. exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
  92. df['wind_turbine_number'] = eval(exec_str)
  93. # 删除 有功功率 和 风速均为空的情况
  94. df.dropna(subset=['active_power', 'wind_velocity'], how='all', inplace=True)
  95. self.save_to_tmp_csv(df)
  96. def save_to_tmp_csv(self, df):
  97. names = set(df['wind_turbine_number'].values)
  98. if names:
  99. trans_print("开始保存", str(names), "到临时文件")
  100. for name in names:
  101. self._save_to_tmp_csv_by_name(df[df['wind_turbine_number'] == name], name)
  102. del df
  103. trans_print("保存", str(names), "到临时文件成功, 风机数量", len(names))
  104. def merge_df(self, dir_path):
  105. all_files = read_excel_files(dir_path)
  106. df = pd.DataFrame()
  107. for file in all_files:
  108. now_df = read_file_to_df(file)
  109. now_df.dropna(subset=['time_stamp'], inplace=True)
  110. now_df.drop_duplicates(subset=['time_stamp'], inplace=True)
  111. now_df.set_index(keys=['time_stamp', 'wind_turbine_number'], inplace=True)
  112. df = pd.concat([df, now_df], axis=1)
  113. df.reset_index(inplace=True)
  114. self.df_save_to_tmp_file(df)
  115. return df
  116. def read_file_and_save_tmp(self):
  117. all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())
  118. split_count = use_files_get_max_cpu_count(all_files)
  119. all_arrays = split_array(all_files, split_count)
  120. if self.trans_param.merge_columns:
  121. for index, arr in enumerate(all_arrays):
  122. try:
  123. with multiprocessing.Pool(split_count) as pool:
  124. pool.starmap(self.save_merge_data, [(ar,) for ar in arr])
  125. except Exception as e:
  126. trans_print(traceback.format_exc())
  127. message = "整理临时文件,系统返回错误:" + str(e)
  128. raise ValueError(message)
  129. update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
  130. round(20 + 20 * (index + 1) / len(all_arrays), 2),
  131. self.pathsAndTable.save_db)
  132. dirs = [os.path.join(self.pathsAndTable.get_merge_tmp_path(), dir_name) for dir_name in
  133. os.listdir(self.pathsAndTable.get_merge_tmp_path())]
  134. dir_total_size = get_dir_size(dirs[0])
  135. split_count = max_file_size_get_max_cpu_count(dir_total_size)
  136. all_arrays = split_array(dirs, split_count)
  137. for index, arr in enumerate(all_arrays):
  138. try:
  139. with multiprocessing.Pool(split_count) as pool:
  140. pool.starmap(self.merge_df, [(ar,) for ar in arr])
  141. except Exception as e:
  142. trans_print(traceback.format_exc())
  143. message = "整理临时文件,系统返回错误:" + str(e)
  144. raise ValueError(message)
  145. update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
  146. round(20 + 30 * (index + 1) / len(all_arrays), 2),
  147. self.pathsAndTable.save_db)
  148. else:
  149. for index, arr in enumerate(all_arrays):
  150. try:
  151. with multiprocessing.Pool(split_count) as pool:
  152. dfs = pool.starmap(self.read_excel_to_df, [(ar,) for ar in arr])
  153. for df in dfs:
  154. self.df_save_to_tmp_file(df)
  155. except Exception as e:
  156. trans_print(traceback.format_exc())
  157. message = "整理临时文件,系统返回错误:" + str(e)
  158. raise ValueError(message)
  159. update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type,
  160. round(20 + 30 * (index + 1) / len(all_arrays), 2),
  161. self.pathsAndTable.save_db)
  162. def read_excel_to_df(self, file_path):
  163. read_cols = [v.split(",")[0] for k, v in self.trans_param.cols_tran.items() if v and not v.startswith("$")]
  164. trans_dict = {}
  165. for k, v in self.trans_param.cols_tran.items():
  166. if v and not str(v).startswith("$"):
  167. trans_dict[v] = k
  168. if self.trans_param.is_vertical_table:
  169. vertical_cols = self.trans_param.vertical_cols
  170. df = read_file_to_df(file_path, vertical_cols, header=self.trans_param.header,
  171. trans_cols=self.trans_param.vertical_cols)
  172. df = df[df[self.trans_param.vertical_key].isin(read_cols)]
  173. df.rename(columns={self.trans_param.cols_tran['wind_turbine_number']: 'wind_turbine_number',
  174. self.trans_param.cols_tran['time_stamp']: 'time_stamp'}, inplace=True)
  175. df[self.trans_param.vertical_key] = df[self.trans_param.vertical_key].map(trans_dict).fillna(
  176. df[self.trans_param.vertical_key])
  177. return df
  178. else:
  179. trans_dict = dict()
  180. trans_cols = []
  181. for k, v in self.trans_param.cols_tran.items():
  182. if v and v.startswith("$") or v.find(",") > 0:
  183. trans_dict[v] = k
  184. if v.find("|") > -1:
  185. vs = v.split("|")
  186. trans_cols.extend(vs)
  187. else:
  188. trans_cols.append(v)
  189. trans_cols = list(set(trans_cols))
  190. if self.trans_param.merge_columns:
  191. df = read_file_to_df(file_path, header=self.trans_param.header,
  192. trans_cols=trans_cols)
  193. else:
  194. if self.trans_param.need_valid_cols:
  195. df = read_file_to_df(file_path, read_cols, header=self.trans_param.header,
  196. trans_cols=trans_cols)
  197. else:
  198. df = read_file_to_df(file_path, header=self.trans_param.header,
  199. trans_cols=trans_cols)
  200. # 处理列名前缀问题
  201. if self.trans_param.resolve_col_prefix:
  202. columns_dict = dict()
  203. for column in df.columns:
  204. columns_dict[column] = eval(self.trans_param.resolve_col_prefix)
  205. df.rename(columns=columns_dict, inplace=True)
  206. if self.trans_param.merge_columns:
  207. select_cols = [self.trans_param.cols_tran['wind_turbine_number'],
  208. self.trans_param.cols_tran['time_stamp'],
  209. 'wind_turbine_number', 'time_stamp']
  210. select_cols.extend(trans_cols)
  211. rename_dict = dict()
  212. wind_turbine_number_col = self.trans_param.cols_tran['wind_turbine_number']
  213. if wind_turbine_number_col.find("|") > -1:
  214. cols = wind_turbine_number_col.split("|")
  215. for col in cols:
  216. rename_dict[col] = 'wind_turbine_number'
  217. time_stamp_col = self.trans_param.cols_tran['time_stamp']
  218. if time_stamp_col.find("|") > -1:
  219. cols = time_stamp_col.split("|")
  220. for col in cols:
  221. rename_dict[col] = 'time_stamp'
  222. df.rename(columns=rename_dict, inplace=True)
  223. for col in df.columns:
  224. if col not in select_cols:
  225. del df[col]
  226. for k, v in trans_dict.items():
  227. if k.startswith("$file"):
  228. file = ".".join(os.path.basename(file_path).split(".")[0:-1])
  229. if k == "$file":
  230. ks = k.split("|")
  231. bool_contains = False
  232. for k_data in ks:
  233. if k_data in df.columns or v in df.columns:
  234. bool_contains = True
  235. if not bool_contains:
  236. df[v] = str(file)
  237. elif k.startswith("$file["):
  238. ks = k.split("|")
  239. bool_contains = False
  240. for k_data in ks:
  241. if k_data in df.columns or v in df.columns:
  242. bool_contains = True
  243. if not bool_contains:
  244. datas = str(ks[0].replace("$file", "").replace("[", "").replace("]", "")).split(":")
  245. if len(datas) != 2:
  246. raise Exception("字段映射出现错误 :" + str(trans_dict))
  247. df[v] = str(file[int(datas[0]):int(datas[1])]).strip()
  248. elif k.startswith("$file.split"):
  249. ks = k.split("|")
  250. bool_contains = False
  251. for k_data in ks:
  252. if k_data in df.columns or v in df.columns:
  253. bool_contains = True
  254. if not bool_contains:
  255. datas = str(ks[0]).replace("$file.split(", "").replace(")", "").split(",")
  256. split_str = str(datas[0])
  257. split_index = int(datas[1])
  258. df[v] = str(file.split(split_str)[split_index])
  259. elif k.find("$file_date") > 0:
  260. datas = str(k.split(",")[1].replace("$file_date", "").replace("[", "").replace("]", "")).split(":")
  261. if len(datas) != 2:
  262. raise Exception("字段映射出现错误 :" + str(trans_dict))
  263. file = ".".join(os.path.basename(file_path).split(".")[0:-1])
  264. date_str = str(file[int(datas[0]):int(datas[1])]).strip()
  265. df[v] = df[k.split(",")[0]].apply(lambda x: date_str + " " + str(x))
  266. elif k.startswith("$folder"):
  267. folder = file_path
  268. ks = k.split("|")
  269. bool_contains = False
  270. for k_data in ks:
  271. if k_data in df.columns or v in df.columns:
  272. bool_contains = True
  273. if not bool_contains:
  274. cengshu = int(str(ks[0].replace("$folder", "").replace("[", "").replace("]", "")))
  275. for i in range(cengshu):
  276. folder = os.path.dirname(folder)
  277. df[v] = str(str(folder).split(os.sep)[-1]).strip()
  278. elif k.startswith("$sheet_name"):
  279. df[v] = df['sheet_name']
  280. if 'time_stamp' not in df.columns:
  281. cols_trans = [i for i in self.trans_param.cols_tran['time_stamp'].split('|')]
  282. cols_dict = dict()
  283. for col in cols_trans:
  284. cols_dict[col] = 'time_stamp'
  285. df.rename(columns=cols_dict, inplace=True)
  286. if 'wind_turbine_number' not in df.columns:
  287. cols_trans = [i for i in self.trans_param.cols_tran['wind_turbine_number'].split('|')]
  288. cols_dict = dict()
  289. for col in cols_trans:
  290. cols_dict[col] = 'wind_turbine_number'
  291. df.rename(columns=cols_dict, inplace=True)
  292. return df
  293. def run(self):
  294. trans_print("开始保存数据到临时文件")
  295. begin = datetime.datetime.now()
  296. self.read_file_and_save_tmp()
  297. update_trans_transfer_progress(self.pathsAndTable.batch_no, self.pathsAndTable.read_type, 50,
  298. self.pathsAndTable.save_db)
  299. trans_print("保存数据到临时文件结束,耗时:", datetime.datetime.now() - begin)