# ReadAndSaveTmp.py
  1. import datetime
  2. import multiprocessing
  3. import traceback
  4. from os import *
  5. import pandas as pd
  6. from etl.common.PathsAndTable import PathsAndTable
  7. from etl.wind_power.min_sec import TransParam
  8. from service.trans_conf_service import update_trans_transfer_progress
  9. from utils.file.trans_methods import read_excel_files, split_array, del_blank, \
  10. create_file_path, read_file_to_df, valid_eval
  11. from utils.log.trans_log import trans_print
  12. from utils.systeminfo.sysinfo import use_files_get_max_cpu_count, get_dir_size
  13. class ReadAndSaveTmp(object):
  14. def __init__(self, pathsAndTable: PathsAndTable, trans_param: TransParam):
  15. self.pathsAndTable = pathsAndTable
  16. self.trans_param = trans_param
  17. self.exist_wind_names = multiprocessing.Manager().list()
  18. self.lock = multiprocessing.Manager().Lock()
  19. self.file_lock = multiprocessing.Manager().dict()
  20. def _save_to_tmp_csv_by_name(self, df, name):
  21. save_name = str(name) + '.csv'
  22. save_path = path.join(self.pathsAndTable.get_read_tmp_path(), save_name)
  23. create_file_path(save_path, is_file_path=True)
  24. with self.lock:
  25. if name in self.exist_wind_names:
  26. contains_name = True
  27. else:
  28. contains_name = False
  29. self.exist_wind_names.append(name)
  30. if contains_name:
  31. df.to_csv(save_path, index=False, encoding='utf8', mode='a',
  32. header=False)
  33. else:
  34. df.to_csv(save_path, index=False, encoding='utf8')
  35. def save_merge_data(self, file_path):
  36. df = self.read_excel_to_df(file_path)
  37. if self.trans_param.wind_name_exec:
  38. if valid_eval(self.trans_param.wind_name_exec):
  39. exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
  40. df['wind_turbine_number'] = eval(exec_str)
  41. df = self.trans_df_cols(df)
  42. wind_names = set(df['wind_turbine_number'].values)
  43. cols = list(df.columns)
  44. cols.sort()
  45. for wind_name in wind_names:
  46. for col in df.columns:
  47. if col not in ['wind_turbine_number', 'time_stamp']:
  48. csv_name = str(col) + ".csv"
  49. exist_name = wind_name + '-' + csv_name
  50. merge_path = self.pathsAndTable.get_merge_tmp_path(wind_name)
  51. create_file_path(merge_path)
  52. with self.lock:
  53. if exist_name in self.exist_wind_names:
  54. contains_name = True
  55. else:
  56. contains_name = False
  57. self.exist_wind_names.append(exist_name)
  58. save_path = path.join(merge_path, csv_name)
  59. now_df = df[df['wind_turbine_number'] == wind_name][['time_stamp', col]]
  60. if contains_name:
  61. now_df.to_csv(save_path, index=False, encoding='utf-8', mode='a',
  62. header=False)
  63. else:
  64. now_df.to_csv(save_path, index=False, encoding='utf-8')
  65. def trans_df_cols(self, df):
  66. if self.trans_param.is_vertical_table:
  67. pass
  68. else:
  69. # 转换字段
  70. same_col = {}
  71. if self.trans_param.cols_tran:
  72. cols_tran = self.trans_param.cols_tran
  73. real_cols_trans = dict()
  74. for k, v in cols_tran.items():
  75. if v and not v.startswith("$"):
  76. if v not in real_cols_trans.keys():
  77. real_cols_trans[v] = k
  78. else:
  79. value = real_cols_trans[v]
  80. if value in same_col.keys():
  81. same_col[value].append(k)
  82. else:
  83. same_col[value] = [k]
  84. if v and v.find('|') > -1:
  85. vs = v.split('|')
  86. for s in vs:
  87. if s not in real_cols_trans.keys():
  88. real_cols_trans[s] = k
  89. else:
  90. value = real_cols_trans[s]
  91. if value in same_col.keys():
  92. same_col[value].append(k)
  93. else:
  94. same_col[value] = [k]
  95. df.rename(columns=real_cols_trans, inplace=True)
  96. # 添加使用同一个excel字段的值
  97. for key in same_col.keys():
  98. for col in same_col[key]:
  99. df[col] = df[key]
  100. del_keys = set(df.columns) - set(cols_tran.keys())
  101. for key in del_keys:
  102. df.drop(key, axis=1, inplace=True)
  103. return df
  104. def df_save_to_tmp_file(self, df=pd.DataFrame()):
  105. df = self.trans_df_cols(df)
  106. df = del_blank(df, ['wind_turbine_number'])
  107. df = df[df['time_stamp'].isna() == False]
  108. if self.trans_param.wind_name_exec and not self.trans_param.merge_columns:
  109. if valid_eval(self.trans_param.wind_name_exec):
  110. exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
  111. df['wind_turbine_number'] = eval(exec_str)
  112. self.save_to_tmp_csv(df)
  113. def save_to_tmp_csv(self, df):
  114. names = set(df['wind_turbine_number'].values)
  115. if names:
  116. trans_print("开始保存", str(names), "到临时文件", df.shape)
  117. for name in names:
  118. self._save_to_tmp_csv_by_name(df[df['wind_turbine_number'] == name], name)
  119. del df
  120. trans_print("保存", str(names), "到临时文件成功, 风机数量", len(names))
  121. def merge_df(self, dir_path):
  122. all_files = read_excel_files(dir_path)
  123. wind_turbine_number = path.basename(dir_path)
  124. df = pd.DataFrame()
  125. for file in all_files:
  126. now_df = read_file_to_df(file)
  127. now_df['wind_turbine_number'] = wind_turbine_number
  128. now_df.dropna(subset=['time_stamp'], inplace=True)
  129. now_df.drop_duplicates(subset=['time_stamp'], inplace=True)
  130. now_df.set_index(keys=['time_stamp', 'wind_turbine_number'], inplace=True)
  131. df = pd.concat([df, now_df], axis=1)
  132. df.reset_index(inplace=True)
  133. self.df_save_to_tmp_file(df)
  134. return df
  135. def read_file_and_save_tmp(self):
  136. all_files = read_excel_files(self.pathsAndTable.get_excel_tmp_path())
  137. split_count = use_files_get_max_cpu_count(all_files)
  138. all_arrays = split_array(all_files, split_count)
  139. if self.trans_param.merge_columns:
  140. for index, arr in enumerate(all_arrays):
  141. try:
  142. with multiprocessing.Pool(split_count) as pool:
  143. pool.starmap(self.save_merge_data, [(ar,) for ar in arr])
  144. except Exception as e:
  145. trans_print(traceback.format_exc())
  146. message = "整理临时文件,系统返回错误:" + str(e)
  147. raise ValueError(message)
  148. update_trans_transfer_progress(self.pathsAndTable.id,
  149. round(20 + 20 * (index + 1) / len(all_arrays), 2),
  150. self.pathsAndTable.save_db)
  151. dirs = [path.join(self.pathsAndTable.get_merge_tmp_path(), dir_name) for dir_name in
  152. listdir(self.pathsAndTable.get_merge_tmp_path())]
  153. dir_total_size = get_dir_size(dirs[0])
  154. # split_count = max_file_size_get_max_cpu_count(dir_total_size, memory_percent=1 / 12, cpu_percent=1 / 10)
  155. split_count = 2
  156. all_arrays = split_array(dirs, split_count)
  157. for index, arr in enumerate(all_arrays):
  158. try:
  159. with multiprocessing.Pool(split_count) as pool:
  160. pool.starmap(self.merge_df, [(ar,) for ar in arr])
  161. except Exception as e:
  162. trans_print(traceback.format_exc())
  163. message = "整理临时文件,系统返回错误:" + str(e)
  164. raise ValueError(message)
  165. update_trans_transfer_progress(self.pathsAndTable.id,
  166. round(20 + 30 * (index + 1) / len(all_arrays), 2),
  167. self.pathsAndTable.save_db)
  168. else:
  169. for index, arr in enumerate(all_arrays):
  170. try:
  171. with multiprocessing.Pool(split_count) as pool:
  172. dfs = pool.starmap(self.read_excel_to_df, [(ar,) for ar in arr])
  173. for df in dfs:
  174. self.df_save_to_tmp_file(df)
  175. except Exception as e:
  176. trans_print(traceback.format_exc())
  177. message = "整理临时文件,系统返回错误:" + str(e)
  178. raise ValueError(message)
  179. update_trans_transfer_progress(self.pathsAndTable.id,
  180. round(20 + 30 * (index + 1) / len(all_arrays), 2),
  181. self.pathsAndTable.save_db)
    def read_excel_to_df(self, file_path):
        """Read one raw source file into a DataFrame with standard column names.

        Handles two layouts: vertical (key/value) tables and horizontal tables,
        including '$'-placeholder mappings that derive values from the file
        name, folder name or sheet name.

        :param file_path: path of the excel/csv file to read
        :return: DataFrame using the standard field names
        :raises Exception: when a ``$file[...]`` / ``$file_date`` mapping is malformed
        """
        # source columns that map directly (no "$" placeholder); a ',' in the
        # value carries extra options, only the part before it is the column
        read_cols = [v.split(",")[0] for k, v in self.trans_param.cols_tran.items() if v and not v.startswith("$")]
        trans_dict = {}
        for k, v in self.trans_param.cols_tran.items():
            if v and not str(v).startswith("$"):
                trans_dict[v] = k
        if self.trans_param.is_vertical_table:
            # vertical layout: keep only configured keys, then translate the
            # key column through trans_dict (unknown keys kept as-is)
            vertical_cols = self.trans_param.vertical_cols
            df = read_file_to_df(file_path, vertical_cols, trans_cols=self.trans_param.vertical_cols)
            df = df[df[self.trans_param.vertical_key].isin(read_cols)]
            df.rename(columns={self.trans_param.cols_tran['wind_turbine_number']: 'wind_turbine_number',
                               self.trans_param.cols_tran['time_stamp']: 'time_stamp'}, inplace=True)
            df[self.trans_param.vertical_key] = df[self.trans_param.vertical_key].map(trans_dict).fillna(
                df[self.trans_param.vertical_key])
            return df
        else:
            # horizontal layout: trans_dict is rebuilt to map placeholder or
            # compound values ("$file...", "col,$file_date[...]") to std names
            trans_dict = dict()
            trans_cols = []
            for k, v in self.trans_param.cols_tran.items():
                if v and v.startswith("$") or v.find(",") > 0:
                    trans_dict[v] = k
                # NOTE(review): assumes every mapping value is a non-None
                # string here (earlier loops guard with `if v`) — TODO confirm
                if v.find("|") > -1:
                    # '|' lists alternative source column names
                    vs = v.split("|")
                    trans_cols.extend(vs)
                else:
                    trans_cols.append(v)
            trans_cols = list(set(trans_cols))
            if self.trans_param.merge_columns:
                df = read_file_to_df(file_path, trans_cols=trans_cols, not_find_header='ignore',
                                     resolve_col_prefix=self.trans_param.resolve_col_prefix)
            else:
                if self.trans_param.need_valid_cols:
                    if self.trans_param.resolve_col_prefix:
                        df = read_file_to_df(file_path, trans_cols=trans_cols,
                                             resolve_col_prefix=self.trans_param.resolve_col_prefix)
                    else:
                        df = read_file_to_df(file_path, read_cols, trans_cols=trans_cols,
                                             resolve_col_prefix=self.trans_param.resolve_col_prefix)
                else:
                    df = read_file_to_df(file_path, trans_cols=trans_cols,
                                         resolve_col_prefix=self.trans_param.resolve_col_prefix)
            # resolve configured column-name prefixes via a per-column expression
            if self.trans_param.resolve_col_prefix:
                columns_dict = dict()
                if valid_eval(self.trans_param.resolve_col_prefix):
                    for column in df.columns:
                        # NOTE(review): eval of configured code; valid_eval()
                        # is the only guard
                        columns_dict[column] = eval(self.trans_param.resolve_col_prefix)
                df.rename(columns=columns_dict, inplace=True)
            if self.trans_param.merge_columns:
                # keep only the id/timestamp columns (raw and standard names)
                # plus the configured data columns
                select_cols = [self.trans_param.cols_tran['wind_turbine_number'],
                               self.trans_param.cols_tran['time_stamp'],
                               'wind_turbine_number', 'time_stamp']
                select_cols.extend(trans_cols)
                rename_dict = dict()
                wind_turbine_number_col = self.trans_param.cols_tran['wind_turbine_number']
                if wind_turbine_number_col.find("|") > -1:
                    cols = wind_turbine_number_col.split("|")
                    for col in cols:
                        rename_dict[col] = 'wind_turbine_number'
                time_stamp_col = self.trans_param.cols_tran['time_stamp']
                if time_stamp_col.find("|") > -1:
                    cols = time_stamp_col.split("|")
                    for col in cols:
                        rename_dict[col] = 'time_stamp'
                df.rename(columns=rename_dict, inplace=True)
                for col in df.columns:
                    if col not in select_cols:
                        del df[col]
            # fill placeholder-mapped columns; each branch only writes when
            # neither the placeholder key nor the target column already exists
            for k, v in trans_dict.items():
                if k.startswith("$file"):
                    # file name without its extension
                    file = ".".join(path.basename(file_path).split(".")[0:-1])
                    if k == "$file":
                        ks = k.split("|")
                        bool_contains = False
                        for k_data in ks:
                            if k_data in df.columns or v in df.columns:
                                bool_contains = True
                        if not bool_contains:
                            df[v] = str(file)
                    elif k.startswith("$file["):
                        # "$file[a:b]" -> slice of the file name
                        ks = k.split("|")
                        bool_contains = False
                        for k_data in ks:
                            if k_data in df.columns or v in df.columns:
                                bool_contains = True
                        if not bool_contains:
                            datas = str(ks[0].replace("$file", "").replace("[", "").replace("]", "")).split(":")
                            if len(datas) != 2:
                                raise Exception("字段映射出现错误 :" + str(trans_dict))
                            df[v] = str(file[int(datas[0]):int(datas[1])]).strip()
                    elif k.startswith("$file.split"):
                        # "$file.split(sep,i)" -> i-th piece of the file name
                        ks = k.split("|")
                        bool_contains = False
                        for k_data in ks:
                            if k_data in df.columns or v in df.columns:
                                bool_contains = True
                        if not bool_contains:
                            datas = str(ks[0]).replace("$file.split(", "").replace(")", "").split(",")
                            split_str = str(datas[0])
                            split_index = int(datas[1])
                            df[v] = str(file.split(split_str)[split_index])
                elif k.find("$file_date") > 0:
                    # "col,$file_date[a:b]" -> prefix col values with a date
                    # sliced out of the file name
                    datas = str(k.split(",")[1].replace("$file_date", "").replace("[", "").replace("]", "")).split(":")
                    if len(datas) != 2:
                        raise Exception("字段映射出现错误 :" + str(trans_dict))
                    file = ".".join(path.basename(file_path).split(".")[0:-1])
                    date_str = str(file[int(datas[0]):int(datas[1])]).strip()
                    df[v] = df[k.split(",")[0]].apply(lambda x: date_str + " " + str(x))
                elif k.startswith("$folder"):
                    # "$folder[n]" -> name of the n-th parent directory
                    folder = file_path
                    ks = k.split("|")
                    bool_contains = False
                    for k_data in ks:
                        if k_data in df.columns or v in df.columns:
                            bool_contains = True
                    if not bool_contains:
                        cengshu = int(str(ks[0].replace("$folder", "").replace("[", "").replace("]", "")))
                        for i in range(cengshu):
                            folder = path.dirname(folder)
                        df[v] = str(str(folder).split(sep)[-1]).strip()
                elif k.startswith("$sheet_name"):
                    df[v] = df['sheet_name']
            # fall back to '|'-alternative renames if the standard columns are
            # still missing after the mappings above
            if 'time_stamp' not in df.columns:
                cols_trans = [i for i in self.trans_param.cols_tran['time_stamp'].split('|')]
                cols_dict = dict()
                for col in cols_trans:
                    cols_dict[col] = 'time_stamp'
                df.rename(columns=cols_dict, inplace=True)
            if 'wind_turbine_number' not in df.columns:
                cols_trans = [i for i in self.trans_param.cols_tran['wind_turbine_number'].split('|')]
                cols_dict = dict()
                for col in cols_trans:
                    cols_dict[col] = 'wind_turbine_number'
                df.rename(columns=cols_dict, inplace=True)
            return df
  317. def run(self):
  318. trans_print("开始保存数据到临时文件")
  319. begin = datetime.datetime.now()
  320. self.read_file_and_save_tmp()
  321. update_trans_transfer_progress(self.pathsAndTable.id, 50,
  322. self.pathsAndTable.save_db)
  323. trans_print("保存数据到临时文件结束,耗时:", datetime.datetime.now() - begin)