WindFarms.py

# -*- coding: utf-8 -*-
# @Time : 2024/5/15
# @Author : 魏志亮
import copy
import datetime
import multiprocessing
# os, shutil and pandas are used throughout this module; imported explicitly
# instead of relying only on the wildcard import from utils.trans_methods.
import os
import shutil
import sys
import tempfile

import pandas as pd

from base.TranseParam import TranseParam
from utils.db.trans_mysql import creat_table_and_add_partition, update_trans_status, get_all_wind, \
    rename_table, read_excel_and_save_to_db
from utils.log.trans_log import logger
from utils.trans_methods import *
from utils.zip.unzip import unzip, unrar


class WindFarms(object):
    """Reads the raw data of one wind farm batch, cleans it, and saves it to per-turbine CSV files and the database."""

    def __init__(self, name, batch_no=None, field_code=None, params: TranseParam = None, wind_full_name=None):
        self.name = name
        self.batch_no = batch_no
        self.field_code = field_code
        self.wind_full_name = wind_full_name
        self.begin = datetime.datetime.now()
        self.save_zip = False
        self.trans_param = params
        self.__exist_wind_names = set()
        self.wind_col_trans = get_all_wind(self.field_code)
        self.batch_count = 50000
        self.save_path = None

    def set_trans_param(self, params: TranseParam):
        self.trans_param = params
        read_path = str(params.read_path)
        if read_path.find(self.wind_full_name) == -1:
            message = "读取路径与配置路径不匹配:" + self.trans_param.read_path + ",配置文件为:" + self.wind_full_name
            update_trans_status(self.batch_no, self.trans_param.read_type, "error", message)
            raise ValueError(message)
        self.save_path = os.path.join(read_path[0:read_path.find(self.wind_full_name)], self.wind_full_name, "清理数据")

    def __params_valid(self, not_null_list=()):
        for arg in not_null_list:
            if arg is None or arg == '':
                raise Exception("Invalid param set: " + str(arg))

    def __get_save_path(self):
        return os.path.join(self.save_path, self.batch_no, self.trans_param.read_type)

    def __get_save_tmp_path(self):
        return os.path.join(tempfile.gettempdir(), self.wind_full_name, self.batch_no, self.trans_param.read_type)

    def __get_excel_tmp_path(self):
        return os.path.join(self.__get_save_tmp_path(), 'excel_tmp' + os.sep)

    def __get_read_tmp_path(self):
        return os.path.join(self.__get_save_tmp_path(), 'read_tmp')

    def __df_save_to_tmp_file(self, df, file=None):
        if self.trans_param.is_vertical_table:
            pass
        else:
            # Rename the raw columns to the standard column names
            if self.trans_param.cols_tran:
                cols_tran = self.trans_param.cols_tran
                real_cols_trans = dict()
                for k, v in cols_tran.items():
                    if v and not v.startswith("$"):
                        real_cols_trans[v] = k
                trans_print("包含转换字段,开始处理转换字段")
                df.rename(columns=real_cols_trans, inplace=True)
                if self.trans_param.wind_col in real_cols_trans.keys():
                    self.trans_param.wind_col = real_cols_trans[self.trans_param.wind_col]
                # Drop any column that is not part of the configured mapping
                del_keys = set(df.columns) - set(cols_tran.keys())
                for key in del_keys:
                    df.drop(key, axis=1, inplace=True)
        df = del_blank(df, ['wind_turbine_number'])
        self.__save_to_tmp_csv(df, file)

    def __get_excel_files(self):
        if os.path.isfile(self.trans_param.read_path):
            all_files = [self.trans_param.read_path]
        else:
            all_files = read_files(self.trans_param.read_path)
        to_path = self.__get_excel_tmp_path()
        for file in all_files:
            if str(file).endswith("zip"):
                if str(file).endswith("csv.zip"):
                    copy_to_new(file, file.replace(self.trans_param.read_path, to_path).replace("csv.zip", 'csv.gz'))
                else:
                    is_success, e = unzip(file, file.replace(self.trans_param.read_path, to_path).split(".")[0])
                    self.trans_param.has_zip = True
                    if not is_success:
                        raise e
            elif str(file).endswith("rar"):
                is_success, e = unrar(file, file.replace(self.trans_param.read_path, to_path).split(".")[0])
                self.trans_param.has_zip = True
                if not is_success:
                    raise e
            else:
                copy_to_new(file, file.replace(self.trans_param.read_path, to_path))
        return read_excel_files(to_path)

    def __read_excel_to_df(self, file):
        read_cols = [v for k, v in self.trans_param.cols_tran.items() if v and not v.startswith("$")]
        trans_dict = {}
        for k, v in self.trans_param.cols_tran.items():
            if v and not str(v).startswith("$"):
                trans_dict[v] = k

        if self.trans_param.is_vertical_table:
            vertical_cols = self.trans_param.vertical_cols
            df = read_file_to_df(file, vertical_cols)
            df = df[df[self.trans_param.vertical_key].isin(read_cols)]
            df.rename(columns={self.trans_param.cols_tran['wind_turbine_number']: 'wind_turbine_number',
                               self.trans_param.cols_tran['time_stamp']: 'time_stamp'}, inplace=True)
            df[self.trans_param.vertical_key] = df[self.trans_param.vertical_key].map(trans_dict).fillna(
                df[self.trans_param.vertical_key])
            return df
        else:
            # Mapping of "$file"/"$folder" placeholders to their target column names
            trans_dict = dict()
            for k, v in self.trans_param.cols_tran.items():
                if v and v.startswith("$"):
                    trans_dict[v] = k
            if self.trans_param.merge_columns:
                df = read_file_to_df(file)
            else:
                if self.trans_param.need_valid_cols:
                    df = read_file_to_df(file, read_cols)
                else:
                    df = read_file_to_df(file)
            # Handle column-name prefixes via the configured expression
            if self.trans_param.trans_col_exec:
                columns_dict = dict()
                for column in df.columns:
                    columns_dict[column] = eval(self.trans_param.trans_col_exec)
                df.rename(columns=columns_dict, inplace=True)
            # Fill columns whose values come from the file name or a parent folder name
            for k, v in trans_dict.items():
                if k.startswith("$file"):
                    file_name = ".".join(os.path.basename(file).split(".")[0:-1])
                    if k == "$file":
                        df[v] = str(file_name)
                    else:
                        datas = str(k.replace("$file", "").replace("[", "").replace("]", "")).split(":")
                        if len(datas) != 2:
                            raise Exception("字段映射出现错误 :" + str(trans_dict))
                        df[v] = str(file_name[int(datas[0]):int(datas[1])]).strip()
                elif k.startswith("$folder"):
                    folder = file
                    cengshu = int(str(k.replace("$folder", "").replace("[", "").replace("]", "")))
                    for i in range(cengshu):
                        folder = os.path.dirname(folder)
                    df[v] = str(str(folder).split(os.sep)[-1]).strip()
            return df

    def __save_to_tmp_csv(self, df, file):
        trans_print("开始保存", str(file), "到临时文件")
        names = set(df['wind_turbine_number'].values)
        for name in names:
            save_name = str(name) + '.csv'
            save_path = os.path.join(self.__get_read_tmp_path(), save_name)
            create_file_path(save_path, is_file_path=True)
            if name in self.__exist_wind_names:
                # Append to the turbine's temp file without repeating the header
                df[df[self.trans_param.wind_col] == name].to_csv(save_path, index=False, encoding='utf8', mode='a',
                                                                 header=False)
            else:
                self.__exist_wind_names.add(name)
                df[df[self.trans_param.wind_col] == name].to_csv(save_path, index=False, encoding='utf8')
        del df
        trans_print("保存", str(names), "到临时文件成功, 风机数量", len(names))

    def save_to_csv(self, filename):
        df = read_file_to_df(filename)
        if self.trans_param.is_vertical_table:
            df = df.pivot_table(index=['time_stamp', 'wind_turbine_number'], columns=self.trans_param.vertical_key,
                                values=self.trans_param.vertical_value,
                                aggfunc='max')
            # Reset the index to get plain columns back
            df.reset_index(inplace=True)

        for k in self.trans_param.cols_tran.keys():
            if k not in df.columns:
                df[k] = None
        df = df[self.trans_param.cols_tran.keys()]

        # Add year/month/day columns
        if self.trans_param.time_col:
            trans_print("包含时间字段,开始处理时间字段,添加年月日", filename)
            df[self.trans_param.time_col] = pd.to_datetime(df[self.trans_param.time_col])
            df['year'] = df[self.trans_param.time_col].dt.year
            df['month'] = df[self.trans_param.time_col].dt.month
            df['day'] = df[self.trans_param.time_col].dt.day
            df.sort_values(by=self.trans_param.time_col, inplace=True)
            df[self.trans_param.time_col] = df[self.trans_param.time_col].apply(
                lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
            trans_print("处理时间字段结束")

        # Normalize wind turbine names
        trans_print("开始转化风机名称")
        if self.trans_param.wind_name_exec:
            exec_str = f"df[self.trans_param.wind_col].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
            df[self.trans_param.wind_col] = eval(exec_str)
        df[self.trans_param.wind_col] = df[self.trans_param.wind_col].map(self.wind_col_trans).fillna(
            df[self.trans_param.wind_col])
        trans_print("转化风机名称结束")

        wind_col_name = str(df[self.trans_param.wind_col].values[0])
        if self.save_zip:
            save_path = os.path.join(self.__get_save_path(), str(wind_col_name) + '.csv.gz')
        else:
            save_path = os.path.join(self.__get_save_path(), str(wind_col_name) + '.csv')
        create_file_path(save_path, is_file_path=True)
        if self.save_zip:
            df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8')
        else:
            df.to_csv(save_path, index=False, encoding='utf-8')
        del df
        trans_print("保存" + str(filename) + ".csv成功")

    def read_all_files(self):
        # Build the list of files to read
        try:
            all_files = self.__get_excel_files()
            trans_print('读取文件数量:', len(all_files))
        except Exception as e:
            logger.exception(e)
            message = "读取文件列表错误:" + self.trans_param.read_path + ",系统返回错误:" + str(e)
            update_trans_status(self.batch_no, self.trans_param.read_type, "error", message)
            raise e
        return all_files

    def __read_file_and_save_tmp(self):
        all_files = self.read_all_files()
        if self.trans_param.merge_columns:
            # with multiprocessing.Pool(6) as pool:
            #     dfs = pool.starmap(self.__read_excel_to_df, [(file,) for file in all_files])
            # Group files by column signature, merge within each group, then join the groups on (time, turbine)
            dfs = list()
            index_keys = [self.trans_param.cols_tran['time_stamp']]
            wind_col = self.trans_param.cols_tran['wind_turbine_number']
            if str(wind_col).startswith("$"):
                wind_col = 'wind_turbine_number'
            index_keys.append(wind_col)
            df_map = dict()
            for file in all_files:
                df = self.__read_excel_to_df(file)
                key = '-'.join(df.columns)
                if key in df_map.keys():
                    df_map[key] = pd.concat([df_map[key], df])
                else:
                    df_map[key] = df
            for k, df in df_map.items():
                df.drop_duplicates(inplace=True)
                df.set_index(keys=index_keys, inplace=True)
                df = df[~df.index.duplicated(keep='first')]
                dfs.append(df)
            df = pd.concat(dfs, axis=1)
            df.reset_index(inplace=True)
            names = set(df[wind_col].values)
            try:
                for name in names:
                    self.__df_save_to_tmp_file(df[df[wind_col] == name], "")
            except Exception as e:
                logger.exception(e)
                message = "合并列出现错误:" + str(e)
                update_trans_status(self.batch_no, self.trans_param.read_type, "error", message)
                raise e
        else:
            for file in all_files:
                try:
                    self.__df_save_to_tmp_file(self.__read_excel_to_df(file), file)
                except Exception as e:
                    logger.exception(e)
                    message = "读取文件错误:" + file + ",系统返回错误:" + str(e)
                    update_trans_status(self.batch_no, self.trans_param.read_type, "error", message)
                    raise e

    def mutiprocessing_to_save_file(self):
        # Save the per-turbine temp files to the final output files in parallel
        trans_print("开始保存到excel文件")
        all_tmp_files = read_excel_files(self.__get_read_tmp_path())
        try:
            with multiprocessing.Pool(6) as pool:
                pool.starmap(self.save_to_csv, [(file,) for file in all_tmp_files])
        except Exception as e:
            logger.exception(e)
            message = "保存文件错误,系统返回错误:" + str(e)
            update_trans_status(self.batch_no, self.trans_param.read_type, "error", message)
            raise e
        trans_print("结束保存到excel文件")

    def mutiprocessing_to_save_db(self):
        # Save the final files into the database in parallel
        trans_print("开始保存到数据库文件")
        all_saved_files = read_excel_files(self.__get_save_path())
        table_name = self.batch_no + "_" + self.trans_param.read_type
        creat_table_and_add_partition(table_name, len(all_saved_files), self.trans_param.read_type)
        try:
            with multiprocessing.Pool(6) as pool:
                pool.starmap(read_excel_and_save_to_db,
                             [(table_name, file, self.batch_count) for file in all_saved_files])
        except Exception as e:
            logger.exception(e)
            message = "保存到数据库错误,系统返回错误:" + str(e)
            update_trans_status(self.batch_no, self.trans_param.read_type, "error", message)
            raise e
        trans_print("结束保存到数据库文件")

    def __rename_file(self):
        save_path = self.__get_save_path()
        files = os.listdir(save_path)
        files.sort(key=lambda x: int(str(x).split(os.sep)[-1].split(".")[0][1:]))
        for index, file in enumerate(files):
            file_path = os.path.join(save_path, 'F' + str(index + 1).zfill(3) + ".csv.gz")
            os.rename(os.path.join(save_path, file), file_path)

    def delete_batch_files(self):
        trans_print("开始删除已存在的批次文件夹")
        if os.path.exists(self.__get_save_path()):
            shutil.rmtree(self.__get_save_path())
        trans_print("删除已存在的批次文件夹")

    def delete_tmp_files(self):
        trans_print("开始删除临时文件夹")
        if os.path.exists(self.__get_excel_tmp_path()):
            shutil.rmtree(self.__get_excel_tmp_path())
        if os.path.exists(self.__get_read_tmp_path()):
            shutil.rmtree(self.__get_read_tmp_path())
        if os.path.exists(self.__get_save_tmp_path()):
            shutil.rmtree(self.__get_save_tmp_path())
        trans_print("删除临时文件夹删除成功")

    def delete_batch_db(self):
        table_name = "_".join([self.batch_no, self.trans_param.read_type])
        renamed_table_name = "del_" + table_name + "_" + datetime.datetime.now().strftime('%Y%m%d%H%M%S')
        rename_table(table_name, renamed_table_name)

    def run(self):
        trans_print("开始执行", self.name, self.trans_param.read_type)
        self.delete_batch_files()
        self.delete_tmp_files()
        self.delete_batch_db()
        self.__params_valid([self.name, self.batch_no, self.field_code, self.save_path, self.trans_param.read_type,
                             self.trans_param.read_path,
                             self.trans_param.time_col, self.trans_param.wind_col, self.wind_full_name])
        # Mark the batch as running
        update_trans_status(self.batch_no, self.trans_param.read_type, "running", "")
        # Read the raw data and split it into per-turbine temp files
        self.__read_file_and_save_tmp()
        self.mutiprocessing_to_save_file()
        self.mutiprocessing_to_save_db()
        update_trans_status(self.batch_no, self.trans_param.read_type, "success", "",
                            wind_count=len(read_excel_files(self.__get_read_tmp_path())))
        self.delete_tmp_files()