WindFarms.py 16 KB


  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/5/15
  3. # @Author : 魏志亮
  4. import copy
  5. import datetime
  6. import multiprocessing
  7. import sys
  8. import tempfile
  9. from base.TranseParam import TranseParam
  10. from utils.db.trans_mysql import creat_table_and_add_partition, update_trans_status, get_all_wind, \
  11. rename_table, read_excel_and_save_to_db
  12. from utils.log.trans_log import logger
  13. from utils.trans_methods import *
  14. from utils.zip.unzip import unzip, unrar
  15. class WindFarms(object):
  16. def __init__(self, name, batch_no=None, field_code=None, params: TranseParam = None, wind_full_name=None):
  17. self.name = name
  18. self.batch_no = batch_no
  19. self.field_code = field_code
  20. self.wind_full_name = wind_full_name
  21. self.begin = datetime.datetime.now()
  22. self.save_zip = False
  23. self.trans_param = params
  24. self.__exist_wind_names = set()
  25. self.wind_col_trans = get_all_wind(self.field_code)
  26. self.batch_count = 50000
  27. self.save_path = None
  28. def set_trans_param(self, params: TranseParam):
  29. self.trans_param = params
  30. read_path = str(params.read_path)
  31. if read_path.find(self.wind_full_name) == -1:
  32. message = "读取路径与配置路径不匹配:" + self.trans_param.read_path + ",配置文件为:" + self.wind_full_name
  33. update_trans_status(self.batch_no, self.trans_param.read_type, "error", message)
  34. raise ValueError(message)
  35. self.save_path = os.path.join(read_path[0:read_path.find(self.wind_full_name)], self.wind_full_name, "清理数据")
  36. def __params_valid(self, not_null_list=list()):
  37. for arg in not_null_list:
  38. if arg is None or arg == '':
  39. raise Exception("Invalid param set :" + arg)
  40. def __get_save_path(self):
  41. return os.path.join(self.save_path, self.batch_no, self.trans_param.read_type)
  42. def __get_save_tmp_path(self):
  43. return os.path.join(tempfile.gettempdir(), self.wind_full_name, self.batch_no, self.trans_param.read_type)
  44. def __get_excel_tmp_path(self):
  45. return os.path.join(self.__get_save_tmp_path(), 'excel_tmp' + os.sep)
  46. def __get_read_tmp_path(self):
  47. return os.path.join(self.__get_save_tmp_path(), 'read_tmp')
  48. def __df_save_to_tmp_file(self, df=pd.DataFrame(), file=None):
  49. if self.trans_param.is_vertical_table:
  50. pass
  51. else:
  52. # 转换字段
  53. if self.trans_param.cols_tran:
  54. cols_tran = self.trans_param.cols_tran
  55. real_cols_trans = dict()
  56. for k, v in cols_tran.items():
  57. if v and not v.startswith("$"):
  58. real_cols_trans[v] = k
  59. trans_print("包含转换字段,开始处理转换字段")
  60. df.rename(columns=real_cols_trans, inplace=True)
  61. del_keys = set(df.columns) - set(cols_tran.keys())
  62. for key in del_keys:
  63. df.drop(key, axis=1, inplace=True)
  64. df = del_blank(df, ['wind_turbine_number'])
  65. self.__save_to_tmp_csv(df, file)
  66. def __get_excel_files(self):
  67. if os.path.isfile(self.trans_param.read_path):
  68. all_files = [self.trans_param.read_path]
  69. else:
  70. all_files = read_files(self.trans_param.read_path)
  71. to_path = self.__get_excel_tmp_path()
  72. for file in all_files:
  73. if str(file).endswith("zip"):
  74. if str(file).endswith("csv.zip"):
  75. copy_to_new(file, file.replace(self.trans_param.read_path, to_path).replace("csv.zip", 'csv.gz'))
  76. else:
  77. is_success, e = unzip(file, file.replace(self.trans_param.read_path, to_path).split(".")[0])
  78. self.trans_param.has_zip = True
  79. if not is_success:
  80. raise e
  81. elif str(file).endswith("rar"):
  82. is_success, e = unrar(file, file.replace(self.trans_param.read_path, to_path).split(".")[0])
  83. self.trans_param.has_zip = True
  84. if not is_success:
  85. raise e
  86. else:
  87. copy_to_new(file, file.replace(self.trans_param.read_path, to_path))
  88. return read_excel_files(to_path)
  89. def __read_excel_to_df(self, file):
  90. read_cols = [v for k, v in self.trans_param.cols_tran.items() if v and not v.startswith("$")]
  91. trans_dict = {}
  92. for k, v in self.trans_param.cols_tran.items():
  93. if v and not str(v).startswith("$"):
  94. trans_dict[v] = k
  95. if self.trans_param.is_vertical_table:
  96. vertical_cols = self.trans_param.vertical_cols
  97. df = read_file_to_df(file, vertical_cols)
  98. df = df[df[self.trans_param.vertical_key].isin(read_cols)]
  99. df.rename(columns={self.trans_param.cols_tran['wind_turbine_number']: 'wind_turbine_number',
  100. self.trans_param.cols_tran['time_stamp']: 'time_stamp'}, inplace=True)
  101. df[self.trans_param.vertical_key] = df[self.trans_param.vertical_key].map(trans_dict).fillna(
  102. df[self.trans_param.vertical_key])
  103. return df
  104. else:
  105. trans_dict = dict()
  106. for k, v in self.trans_param.cols_tran.items():
  107. if v and v.startswith("$"):
  108. trans_dict[v] = k
  109. if self.trans_param.merge_columns:
  110. df = read_file_to_df(file)
  111. else:
  112. if self.trans_param.need_valid_cols:
  113. df = read_file_to_df(file, read_cols)
  114. else:
  115. df = read_file_to_df(file)
  116. # 处理列名前缀问题
  117. if self.trans_param.resolve_col_prefix:
  118. columns_dict = dict()
  119. for column in df.columns:
  120. columns_dict[column] = eval(self.trans_param.resolve_col_prefix)
  121. df.rename(columns=columns_dict, inplace=True)
  122. for k, v in trans_dict.items():
  123. if k.startswith("$file"):
  124. file_name = ".".join(os.path.basename(file).split(".")[0:-1])
  125. if k == "$file":
  126. df[v] = str(file_name)
  127. else:
  128. datas = str(k.replace("$file", "").replace("[", "").replace("]", "")).split(":")
  129. if len(datas) != 2:
  130. raise Exception("字段映射出现错误 :" + str(trans_dict))
  131. df[v] = str(file_name[int(datas[0]):int(datas[1])]).strip()
  132. elif k.startswith("$folder"):
  133. folder = file
  134. cengshu = int(str(k.replace("$folder", "").replace("[", "").replace("]", "")))
  135. for i in range(cengshu):
  136. folder = os.path.dirname(folder)
  137. df[v] = str(str(folder).split(os.sep)[-1]).strip()
  138. return df
  139. def __save_to_tmp_csv(self, df, file):
  140. trans_print("开始保存", str(file), "到临时文件成功")
  141. names = set(df['wind_turbine_number'].values)
  142. for name in names:
  143. save_name = str(name) + '.csv'
  144. save_path = os.path.join(self.__get_read_tmp_path(), save_name)
  145. create_file_path(save_path, is_file_path=True)
  146. if name in self.__exist_wind_names:
  147. df[df['wind_turbine_number'] == name].to_csv(save_path, index=False, encoding='utf8', mode='a',
  148. header=False)
  149. else:
  150. self.__exist_wind_names.add(name)
  151. df[df['wind_turbine_number'] == name].to_csv(save_path, index=False, encoding='utf8')
  152. del df
  153. trans_print("保存", str(names), "到临时文件成功, 风机数量", len(names))
  154. def save_to_csv(self, filename):
  155. df = read_file_to_df(filename)
  156. if self.trans_param.is_vertical_table:
  157. df = df.pivot_table(index=['time_stamp', 'wind_turbine_number'], columns=self.trans_param.vertical_key,
  158. values=self.trans_param.vertical_value,
  159. aggfunc='max')
  160. # 重置索引以得到普通的列
  161. df.reset_index(inplace=True)
  162. for k in self.trans_param.cols_tran.keys():
  163. if k not in df.columns:
  164. df[k] = None
  165. df = df[self.trans_param.cols_tran.keys()]
  166. # 添加年月日
  167. trans_print("包含时间字段,开始处理时间字段,添加年月日", filename)
  168. df['time_stamp'] = pd.to_datetime(df['time_stamp'])
  169. df['year'] = df['time_stamp'].dt.year
  170. df['month'] = df['time_stamp'].dt.month
  171. df['day'] = df['time_stamp'].dt.day
  172. df.sort_values(by='time_stamp', inplace=True)
  173. df['time_stamp'] = df['time_stamp'].apply(
  174. lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
  175. trans_print("处理时间字段结束")
  176. # 转化风机名称
  177. trans_print("开始转化风机名称")
  178. if self.trans_param.wind_name_exec:
  179. exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
  180. df['wind_turbine_number'] = eval(exec_str)
  181. df['wind_turbine_number'] = df['wind_turbine_number'].map(
  182. self.wind_col_trans).fillna(
  183. df['wind_turbine_number'])
  184. trans_print("转化风机名称结束")
  185. wind_col_name = str(df['wind_turbine_number'].values[0])
  186. if self.save_zip:
  187. save_path = os.path.join(self.__get_save_path(), str(wind_col_name) + '.csv.gz')
  188. else:
  189. save_path = os.path.join(self.__get_save_path(), str(wind_col_name) + '.csv')
  190. create_file_path(save_path, is_file_path=True)
  191. if self.save_zip:
  192. df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8')
  193. else:
  194. df.to_csv(save_path, index=False, encoding='utf-8')
  195. del df
  196. trans_print("保存" + str(filename) + ".csv成功")
  197. def read_all_files(self):
  198. # 读取文件
  199. try:
  200. all_files = self.__get_excel_files()
  201. trans_print('读取文件数量:', len(all_files))
  202. except Exception as e:
  203. logger.exception(e)
  204. message = "读取文件列表错误:" + self.trans_param.read_path + ",系统返回错误:" + str(e)
  205. update_trans_status(self.batch_no, self.trans_param.read_type, "error", message)
  206. raise e
  207. return all_files
  208. def __read_file_and_save_tmp(self):
  209. all_files = self.read_all_files()
  210. if self.trans_param.merge_columns:
  211. # with multiprocessing.Pool(6) as pool:
  212. # dfs = pool.starmap(self.__read_excel_to_df, [(file,) for file in all_files])
  213. dfs = list()
  214. index_keys = [self.trans_param.cols_tran['time_stamp']]
  215. wind_col = self.trans_param.cols_tran['wind_turbine_number']
  216. if str(wind_col).startswith("$"):
  217. wind_col = 'wind_turbine_number'
  218. index_keys.append(wind_col)
  219. df_map = dict()
  220. for file in all_files:
  221. df = self.__read_excel_to_df(file)
  222. key = '-'.join(df.columns)
  223. if key in df_map.keys():
  224. df_map[key] = pd.concat([df_map[key], df])
  225. else:
  226. df_map[key] = df
  227. for k, df in df_map.items():
  228. df.drop_duplicates(inplace=True)
  229. df.set_index(keys=index_keys, inplace=True)
  230. df = df[~df.index.duplicated(keep='first')]
  231. dfs.append(df)
  232. df = pd.concat(dfs, axis=1)
  233. df.reset_index(inplace=True)
  234. names = set(df[wind_col].values)
  235. try:
  236. for name in names:
  237. self.__df_save_to_tmp_file(df[df[wind_col] == name], "")
  238. except Exception as e:
  239. logger.exception(e)
  240. message = "合并列出现错误:" + str(e)
  241. update_trans_status(self.batch_no, self.trans_param.read_type, "error", message)
  242. raise e
  243. else:
  244. for file in all_files:
  245. try:
  246. self.__df_save_to_tmp_file(self.__read_excel_to_df(file), file)
  247. except Exception as e:
  248. logger.exception(e)
  249. message = "读取文件错误:" + file + ",系统返回错误:" + str(e)
  250. update_trans_status(self.batch_no, self.trans_param.read_type, "error", message)
  251. raise e
  252. def mutiprocessing_to_save_file(self):
  253. # 开始保存到正式文件
  254. trans_print("开始保存到excel文件")
  255. all_tmp_files = read_excel_files(self.__get_read_tmp_path())
  256. try:
  257. with multiprocessing.Pool(6) as pool:
  258. pool.starmap(self.save_to_csv, [(file,) for file in all_tmp_files])
  259. except Exception as e:
  260. logger.exception(e)
  261. message = "保存文件错误,系统返回错误:" + str(e)
  262. update_trans_status(self.batch_no, self.trans_param.read_type, "error", message)
  263. raise e
  264. trans_print("结束保存到excel文件")
  265. def mutiprocessing_to_save_db(self):
  266. # 开始保存到SQL文件
  267. trans_print("开始保存到数据库文件")
  268. all_saved_files = read_excel_files(self.__get_save_path())
  269. table_name = self.batch_no + "_" + self.trans_param.read_type
  270. creat_table_and_add_partition(table_name, len(all_saved_files), self.trans_param.read_type)
  271. try:
  272. with multiprocessing.Pool(6) as pool:
  273. pool.starmap(read_excel_and_save_to_db,
  274. [(table_name, file, self.batch_count) for file in all_saved_files])
  275. except Exception as e:
  276. logger.exception(e)
  277. message = "保存到数据库错误,系统返回错误:" + str(e)
  278. update_trans_status(self.batch_no, self.trans_param.read_type, "error", message)
  279. raise e
  280. trans_print("结束保存到数据库文件")
  281. def __rename_file(self):
  282. save_path = self.__get_save_path()
  283. files = os.listdir(save_path)
  284. files.sort(key=lambda x: int(str(x).split(os.sep)[-1].split(".")[0][1:]))
  285. for index, file in enumerate(files):
  286. file_path = os.path.join(save_path, 'F' + str(index + 1).zfill(3) + ".csv.gz")
  287. os.rename(os.path.join(save_path, file), file_path)
  288. def delete_batch_files(self):
  289. trans_print("开始删除已存在的批次文件夹")
  290. if os.path.exists(self.__get_save_path()):
  291. shutil.rmtree(self.__get_save_path())
  292. trans_print("删除已存在的批次文件夹")
  293. def delete_tmp_files(self):
  294. trans_print("开始删除临时文件夹")
  295. if os.path.exists(self.__get_excel_tmp_path()):
  296. shutil.rmtree(self.__get_excel_tmp_path())
  297. if os.path.exists(self.__get_read_tmp_path()):
  298. shutil.rmtree(self.__get_read_tmp_path())
  299. if os.path.exists(self.__get_save_tmp_path()):
  300. shutil.rmtree(self.__get_save_tmp_path())
  301. trans_print("删除临时文件夹删除成功")
  302. def delete_batch_db(self):
  303. table_name = "_".join([self.batch_no, self.trans_param.read_type])
  304. renamed_table_name = "del_" + table_name + "_" + datetime.datetime.now().strftime('%Y%m%d%H%M%S')
  305. rename_table(table_name, renamed_table_name)
  306. def run(self):
  307. trans_print("开始执行", self.name, self.trans_param.read_type)
  308. self.delete_batch_files()
  309. self.delete_tmp_files()
  310. self.delete_batch_db()
  311. self.__params_valid([self.name, self.batch_no, self.field_code, self.save_path, self.trans_param.read_type,
  312. self.trans_param.read_path, 'wind_turbine_number', self.wind_full_name])
  313. # 更新运行状态到运行中
  314. update_trans_status(self.batch_no, self.trans_param.read_type, "running", "")
  315. # 开始读取数据并分类保存临时文件
  316. self.__read_file_and_save_tmp()
  317. self.mutiprocessing_to_save_file()
  318. # self.mutiprocessing_to_save_db()
  319. update_trans_status(self.batch_no, self.trans_param.read_type, "success", "",
  320. wind_count=len(read_excel_files(self.__get_read_tmp_path())))
  321. self.delete_tmp_files()