# ReadAndSaveTmp.py
import datetime
import multiprocessing
import os
import traceback

import pandas as pd

from conf.constants import ParallelProcessing
from etl.common.PathsAndTable import PathsAndTable
from etl.wind_power.min_sec import TransParam
from service.trans_conf_service import update_trans_transfer_progress
from utils.file.trans_methods import read_excel_files, split_array, del_blank, \
    create_file_path, read_file_to_df, valid_eval
from utils.log.trans_log import info, debug, error
from utils.systeminfo.sysinfo import use_files_get_max_cpu_count, get_dir_size
  14. class ReadAndSaveTmp(object):
  15. """读取并保存临时文件类"""
  16. def __init__(self, pathsAndTable: PathsAndTable, trans_param: TransParam):
  17. """
  18. 初始化读取并保存临时文件类
  19. Args:
  20. pathsAndTable: 路径和表对象
  21. trans_param: 转换参数对象
  22. """
  23. self.pathsAndTable = pathsAndTable
  24. self.trans_param = trans_param
  25. self.exist_wind_names = multiprocessing.Manager().list()
  26. self.lock = multiprocessing.Manager().Lock()
  27. self.file_lock = multiprocessing.Manager().dict()
  28. def _save_to_tmp_csv_by_name(self, df: pd.DataFrame, name: str):
  29. """
  30. 根据风机名称保存到临时CSV文件
  31. Args:
  32. df: 数据帧
  33. name: 风机名称
  34. """
  35. save_name = str(name) + '.csv'
  36. save_path = os.path.join(self.pathsAndTable.get_read_tmp_path(), save_name)
  37. create_file_path(save_path, is_file_path=True)
  38. with self.lock:
  39. if name in self.exist_wind_names:
  40. contains_name = True
  41. else:
  42. contains_name = False
  43. self.exist_wind_names.append(name)
  44. if contains_name:
  45. df.to_csv(save_path, index=False, encoding='utf8', mode='a',
  46. header=False)
  47. else:
  48. df.to_csv(save_path, index=False, encoding='utf8')
  49. def save_merge_data(self, file_path: str):
  50. """
  51. 保存合并数据
  52. Args:
  53. file_path: 文件路径
  54. """
  55. df = self.read_excel_to_df(file_path)
  56. if self.trans_param.wind_name_exec:
  57. if valid_eval(self.trans_param.wind_name_exec):
  58. exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
  59. df['wind_turbine_number'] = eval(exec_str)
  60. df = self.trans_df_cols(df)
  61. wind_names = set(df['wind_turbine_number'].values)
  62. cols = list(df.columns)
  63. cols.sort()
  64. for wind_name in wind_names:
  65. for col in df.columns:
  66. if col not in ['wind_turbine_number', 'time_stamp']:
  67. csv_name = str(col) + ".csv"
  68. exist_name = wind_name + '-' + csv_name
  69. merge_path = self.pathsAndTable.get_merge_tmp_path(wind_name)
  70. create_file_path(merge_path)
  71. with self.lock:
  72. if exist_name in self.exist_wind_names:
  73. contains_name = True
  74. else:
  75. contains_name = False
  76. self.exist_wind_names.append(exist_name)
  77. save_path = os.path.join(merge_path, csv_name)
  78. now_df = df[df['wind_turbine_number'] == wind_name][['time_stamp', col]]
  79. if contains_name:
  80. now_df.to_csv(save_path, index=False, encoding='utf-8', mode='a',
  81. header=False)
  82. else:
  83. now_df.to_csv(save_path, index=False, encoding='utf-8')
  84. def trans_df_cols(self, df: pd.DataFrame) -> pd.DataFrame:
  85. """
  86. 转换数据帧列名
  87. Args:
  88. df: 数据帧
  89. Returns:
  90. 转换后的数据帧
  91. """
  92. if self.trans_param.is_vertical_table:
  93. pass
  94. else:
  95. # 转换字段
  96. same_col = {}
  97. if self.trans_param.cols_tran:
  98. cols_tran = self.trans_param.cols_tran
  99. real_cols_trans = dict()
  100. for k, v in cols_tran.items():
  101. if v and not v.startswith("$"):
  102. if v not in real_cols_trans.keys():
  103. real_cols_trans[v] = k
  104. else:
  105. value = real_cols_trans[v]
  106. if value in same_col.keys():
  107. same_col[value].append(k)
  108. else:
  109. same_col[value] = [k]
  110. if v and v.find('|') > -1:
  111. vs = v.split('|')
  112. for s in vs:
  113. if s not in real_cols_trans.keys():
  114. real_cols_trans[s] = k
  115. else:
  116. value = real_cols_trans[s]
  117. if value in same_col.keys():
  118. same_col[value].append(k)
  119. else:
  120. same_col[value] = [k]
  121. df.rename(columns=real_cols_trans, inplace=True)
  122. # 添加使用同一个excel字段的值
  123. for key in same_col.keys():
  124. for col in same_col[key]:
  125. df[col] = df[key]
  126. del_keys = set(df.columns) - set(cols_tran.keys())
  127. for key in del_keys:
  128. df.drop(key, axis=1, inplace=True)
  129. return df
  130. def df_save_to_tmp_file(self, df: pd.DataFrame = pd.DataFrame()):
  131. """
  132. 保存数据帧到临时文件
  133. Args:
  134. df: 数据帧
  135. """
  136. df = self.trans_df_cols(df)
  137. df = del_blank(df, ['wind_turbine_number'])
  138. df = df[df['time_stamp'].isna() == False]
  139. if self.trans_param.wind_name_exec and not self.trans_param.merge_columns:
  140. if valid_eval(self.trans_param.wind_name_exec):
  141. exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
  142. df['wind_turbine_number'] = eval(exec_str)
  143. self.save_to_tmp_csv(df)
  144. def save_to_tmp_csv(self, df: pd.DataFrame):
  145. """
  146. 保存到临时CSV文件
  147. Args:
  148. df: 数据帧
  149. """
  150. names = set(df['wind_turbine_number'].values)
  151. if names:
  152. debug("开始保存", str(names), "到临时文件", df.shape)
  153. for name in names:
  154. self._save_to_tmp_csv_by_name(df[df['wind_turbine_number'] == name], name)
  155. del df
  156. debug("保存", str(names), "到临时文件成功, 风机数量", len(names))
  157. def merge_df(self, dir_path: str) -> pd.DataFrame:
  158. """
  159. 合并数据帧
  160. Args:
  161. dir_path: 目录路径
  162. Returns:
  163. 合并后的数据帧
  164. """
  165. all_files = read_excel_files(dir_path)
  166. wind_turbine_number = os.path.basename(dir_path)
  167. df = pd.DataFrame()
  168. for file in all_files:
  169. now_df = read_file_to_df(file)
  170. now_df['wind_turbine_number'] = wind_turbine_number
  171. now_df.dropna(subset=['time_stamp'], inplace=True)
  172. now_df.drop_duplicates(subset=['time_stamp'], inplace=True)
  173. now_df.set_index(keys=['time_stamp', 'wind_turbine_number'], inplace=True)
  174. df = pd.concat([df, now_df], axis=1)
  175. df.reset_index(inplace=True)
  176. self.df_save_to_tmp_file(df)
  177. return df
  178. def read_file_and_save_tmp(self):
  179. """
  180. 读取文件并保存到临时文件
  181. """
  182. all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())
  183. split_count = use_files_get_max_cpu_count(all_files)
  184. # 限制最大进程数
  185. split_count = min(split_count, ParallelProcessing.MAX_PROCESSES)
  186. all_arrays = split_array(all_files, split_count)
  187. if self.trans_param.merge_columns:
  188. for index, arr in enumerate(all_arrays):
  189. try:
  190. with multiprocessing.Pool(split_count) as pool:
  191. pool.starmap(self.save_merge_data, [(ar,) for ar in arr])
  192. except Exception as e:
  193. error(traceback.format_exc())
  194. message = "整理临时文件,系统返回错误:" + str(e)
  195. raise ValueError(message)
  196. update_trans_transfer_progress(self.pathsAndTable.id,
  197. round(20 + 20 * (index + 1) / len(all_arrays), 2),
  198. self.pathsAndTable.save_db)
  199. dirs = [os.path.join(self.pathsAndTable.get_merge_tmp_path(), dir_name) for dir_name in
  200. os.listdir(self.pathsAndTable.get_merge_tmp_path())]
  201. if dirs:
  202. dir_total_size = get_dir_size(dirs[0])
  203. # 限制最大进程数
  204. split_count = min(dir_total_size, ParallelProcessing.MAX_PROCESSES)
  205. all_arrays = split_array(dirs, split_count)
  206. for index, arr in enumerate(all_arrays):
  207. try:
  208. with multiprocessing.Pool(split_count) as pool:
  209. pool.starmap(self.merge_df, [(ar,) for ar in arr])
  210. except Exception as e:
  211. error(traceback.format_exc())
  212. message = "整理临时文件,系统返回错误:" + str(e)
  213. raise ValueError(message)
  214. update_trans_transfer_progress(self.pathsAndTable.id,
  215. round(20 + 30 * (index + 1) / len(all_arrays), 2),
  216. self.pathsAndTable.save_db)
  217. else:
  218. for index, arr in enumerate(all_arrays):
  219. try:
  220. with multiprocessing.Pool(split_count) as pool:
  221. dfs = pool.starmap(self.read_excel_to_df, [(ar,) for ar in arr])
  222. for df in dfs:
  223. self.df_save_to_tmp_file(df)
  224. except Exception as e:
  225. error(traceback.format_exc())
  226. message = "整理临时文件,系统返回错误:" + str(e)
  227. raise ValueError(message)
  228. update_trans_transfer_progress(self.pathsAndTable.id,
  229. round(20 + 30 * (index + 1) / len(all_arrays), 2),
  230. self.pathsAndTable.save_db)
  231. def read_excel_to_df(self, file_path: str) -> pd.DataFrame:
  232. """
  233. 读取Excel文件到数据帧
  234. Args:
  235. file_path: 文件路径
  236. Returns:
  237. 数据帧
  238. """
  239. read_cols = [v.split(",")[0] for k, v in self.trans_param.cols_tran.items() if v and not v.startswith("$")]
  240. trans_dict = {}
  241. for k, v in self.trans_param.cols_tran.items():
  242. if v and not str(v).startswith("$"):
  243. trans_dict[v] = k
  244. if self.trans_param.is_vertical_table:
  245. vertical_cols = self.trans_param.vertical_cols
  246. df = read_file_to_df(file_path, vertical_cols, trans_cols=self.trans_param.vertical_cols)
  247. df = df[df[self.trans_param.vertical_key].isin(read_cols)]
  248. df.rename(columns={self.trans_param.cols_tran['wind_turbine_number']: 'wind_turbine_number',
  249. self.trans_param.cols_tran['time_stamp']: 'time_stamp'}, inplace=True)
  250. df[self.trans_param.vertical_key] = df[self.trans_param.vertical_key].map(trans_dict).fillna(
  251. df[self.trans_param.vertical_key])
  252. return df
  253. else:
  254. trans_dict = dict()
  255. trans_cols = []
  256. for k, v in self.trans_param.cols_tran.items():
  257. if v and v.startswith("$") or v.find(",") > 0:
  258. trans_dict[v] = k
  259. if v.find("|") > -1:
  260. vs = v.split("|")
  261. trans_cols.extend(vs)
  262. else:
  263. trans_cols.append(v)
  264. trans_cols = list(set(trans_cols))
  265. if self.trans_param.merge_columns:
  266. df = read_file_to_df(file_path, trans_cols=trans_cols, not_find_header='ignore',
  267. resolve_col_prefix=self.trans_param.resolve_col_prefix)
  268. else:
  269. if self.trans_param.need_valid_cols:
  270. if self.trans_param.resolve_col_prefix:
  271. df = read_file_to_df(file_path, trans_cols=trans_cols,
  272. resolve_col_prefix=self.trans_param.resolve_col_prefix)
  273. else:
  274. df = read_file_to_df(file_path, read_cols, trans_cols=trans_cols,
  275. resolve_col_prefix=self.trans_param.resolve_col_prefix)
  276. else:
  277. df = read_file_to_df(file_path, trans_cols=trans_cols,
  278. resolve_col_prefix=self.trans_param.resolve_col_prefix)
  279. # 处理列名前缀问题
  280. if self.trans_param.resolve_col_prefix:
  281. columns_dict = dict()
  282. if valid_eval(self.trans_param.resolve_col_prefix):
  283. for column in df.columns:
  284. columns_dict[column] = eval(self.trans_param.resolve_col_prefix)
  285. df.rename(columns=columns_dict, inplace=True)
  286. if self.trans_param.merge_columns:
  287. select_cols = [self.trans_param.cols_tran['wind_turbine_number'],
  288. self.trans_param.cols_tran['time_stamp'],
  289. 'wind_turbine_number', 'time_stamp']
  290. select_cols.extend(trans_cols)
  291. rename_dict = dict()
  292. wind_turbine_number_col = self.trans_param.cols_tran['wind_turbine_number']
  293. if wind_turbine_number_col.find("|") > -1:
  294. cols = wind_turbine_number_col.split("|")
  295. for col in cols:
  296. rename_dict[col] = 'wind_turbine_number'
  297. time_stamp_col = self.trans_param.cols_tran['time_stamp']
  298. if time_stamp_col.find("|") > -1:
  299. cols = time_stamp_col.split("|")
  300. for col in cols:
  301. rename_dict[col] = 'time_stamp'
  302. df.rename(columns=rename_dict, inplace=True)
  303. for col in df.columns:
  304. if col not in select_cols:
  305. del df[col]
  306. for k, v in trans_dict.items():
  307. if k.startswith("$file"):
  308. file = ".".join(os.path.basename(file_path).split(".")[0:-1])
  309. if k == "$file":
  310. ks = k.split("|")
  311. bool_contains = False
  312. for k_data in ks:
  313. if k_data in df.columns or v in df.columns:
  314. bool_contains = True
  315. if not bool_contains:
  316. df[v] = str(file)
  317. elif k.startswith("$file["):
  318. ks = k.split("|")
  319. bool_contains = False
  320. for k_data in ks:
  321. if k_data in df.columns or v in df.columns:
  322. bool_contains = True
  323. if not bool_contains:
  324. datas = str(ks[0].replace("$file", "").replace("[", "").replace("]", "")).split(":")
  325. if len(datas) != 2:
  326. raise Exception("字段映射出现错误 :" + str(trans_dict))
  327. df[v] = str(file[int(datas[0]):int(datas[1])]).strip()
  328. elif k.startswith("$file.split"):
  329. ks = k.split("|")
  330. bool_contains = False
  331. for k_data in ks:
  332. if k_data in df.columns or v in df.columns:
  333. bool_contains = True
  334. if not bool_contains:
  335. datas = str(ks[0]).replace("$file.split(", "").replace(")", "").split(",")
  336. split_str = str(datas[0])
  337. split_index = int(datas[1])
  338. df[v] = str(file.split(split_str)[split_index])
  339. elif k.find("$file_date") > 0:
  340. datas = str(k.split(",")[1].replace("$file_date", "").replace("[", "").replace("]", "")).split(":")
  341. if len(datas) != 2:
  342. raise Exception("字段映射出现错误 :" + str(trans_dict))
  343. file = ".".join(os.path.basename(file_path).split(".")[0:-1])
  344. date_str = str(file[int(datas[0]):int(datas[1])]).strip()
  345. df[v] = df[k.split(",")[0]].apply(lambda x: date_str + " " + str(x))
  346. elif k.startswith("$folder"):
  347. folder = file_path
  348. ks = k.split("|")
  349. bool_contains = False
  350. for k_data in ks:
  351. if k_data in df.columns or v in df.columns:
  352. bool_contains = True
  353. if not bool_contains:
  354. cengshu = int(str(ks[0].replace("$folder", "").replace("[", "").replace("]", "")))
  355. for i in range(cengshu):
  356. folder = os.path.dirname(folder)
  357. df[v] = str(str(folder).split(os.sep)[-1]).strip()
  358. elif k.startswith("$sheet_name"):
  359. df[v] = df['sheet_name']
  360. if 'time_stamp' not in df.columns:
  361. cols_trans = [i for i in self.trans_param.cols_tran['time_stamp'].split('|')]
  362. cols_dict = dict()
  363. for col in cols_trans:
  364. cols_dict[col] = 'time_stamp'
  365. df.rename(columns=cols_dict, inplace=True)
  366. if 'wind_turbine_number' not in df.columns:
  367. cols_trans = [i for i in self.trans_param.cols_tran['wind_turbine_number'].split('|')]
  368. cols_dict = dict()
  369. for col in cols_trans:
  370. cols_dict[col] = 'wind_turbine_number'
  371. df.rename(columns=cols_dict, inplace=True)
  372. return df
  373. def run(self):
  374. """
  375. """
  376. info("开始保存数据到临时文件")
  377. begin = datetime.datetime.now()
  378. self.read_file_and_save_tmp()
  379. update_trans_transfer_progress(self.pathsAndTable.id, 50,
  380. self.pathsAndTable.save_db)
  381. info("保存数据到临时文件结束,耗时:", datetime.datetime.now() - begin)