WindFarms.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2024/5/15
  3. # @Author : 魏志亮
  4. import datetime
  5. import multiprocessing
  6. import tempfile
  7. import traceback
  8. from etl.base.TranseParam import TranseParam
  9. from service.plt_service import get_all_wind, update_trans_status_error, update_trans_status_running, \
  10. update_trans_status_success, update_trans_transfer_progress
  11. from service.trans_service import creat_table_and_add_partition, save_file_to_db, drop_table
  12. from utils.df_utils.util import get_time_space
  13. from utils.file.trans_methods import *
  14. from utils.zip.unzip import unzip, unrar, get_desc_path
  15. class WindFarms(object):
  16. def __init__(self, batch_no=None, batch_name=None, field_code=None, params: TranseParam = None, wind_full_name=None,
  17. save_db=True, header=0):
  18. self.batch_no = batch_no
  19. self.batch_name = batch_name
  20. self.field_code = field_code
  21. self.wind_full_name = wind_full_name
  22. self.save_zip = False
  23. self.trans_param = params
  24. self.exist_wind_names = multiprocessing.Manager().list()
  25. self.wind_col_trans = get_all_wind(self.field_code)
  26. self.batch_count = 200000
  27. self.save_path = None
  28. self.save_db = save_db
  29. self.lock = multiprocessing.Manager().Lock()
  30. self.statistics_map = multiprocessing.Manager().dict()
  31. self.header = header
  32. def set_trans_param(self, params: TranseParam):
  33. self.trans_param = params
  34. read_path = str(params.read_path)
  35. if read_path.find(self.wind_full_name) == -1:
  36. message = "读取路径与配置路径不匹配:" + self.trans_param.read_path + ",配置文件为:" + self.wind_full_name
  37. update_trans_status_error(self.batch_no, self.trans_param.read_type, message, self.save_db)
  38. raise ValueError(message)
  39. self.save_path = os.path.join(read_path[0:read_path.find(self.wind_full_name)], self.wind_full_name, "清理数据")
  40. def params_valid(self, not_null_list=list()):
  41. for arg in not_null_list:
  42. if arg is None or arg == '':
  43. raise Exception("Invalid param set :" + arg)
  44. def get_save_path(self):
  45. return os.path.join(self.save_path, self.batch_no + "_" + self.batch_name, self.trans_param.read_type)
  46. def get_save_tmp_path(self):
  47. return os.path.join(tempfile.gettempdir(), self.wind_full_name, self.batch_no + "_" + self.batch_name,
  48. self.trans_param.read_type)
  49. def get_excel_tmp_path(self):
  50. return os.path.join(self.get_save_tmp_path(), 'excel_tmp' + os.sep)
  51. def get_read_tmp_path(self):
  52. return os.path.join(self.get_save_tmp_path(), 'read_tmp')
  53. def df_save_to_tmp_file(self, df=pd.DataFrame(), file=None):
  54. if self.trans_param.is_vertical_table:
  55. pass
  56. else:
  57. # 转换字段
  58. if self.trans_param.cols_tran:
  59. cols_tran = self.trans_param.cols_tran
  60. real_cols_trans = dict()
  61. for k, v in cols_tran.items():
  62. if v and not v.startswith("$"):
  63. real_cols_trans[v] = k
  64. trans_print("包含转换字段,开始处理转换字段")
  65. df.rename(columns=real_cols_trans, inplace=True)
  66. del_keys = set(df.columns) - set(cols_tran.keys())
  67. for key in del_keys:
  68. df.drop(key, axis=1, inplace=True)
  69. df = del_blank(df, ['wind_turbine_number'])
  70. df = df[df['time_stamp'].isna() == False]
  71. if self.trans_param.wind_name_exec:
  72. exec_str = f"df['wind_turbine_number'].apply(lambda wind_name: {self.trans_param.wind_name_exec} )"
  73. df['wind_turbine_number'] = eval(exec_str)
  74. self.save_to_tmp_csv(df, file)
  75. def get_and_remove(self, file, thead_local=None):
  76. to_path = self.get_excel_tmp_path()
  77. if str(file).endswith("zip"):
  78. if str(file).endswith("csv.zip"):
  79. copy_to_new(file, file.replace(self.trans_param.read_path, to_path).replace("csv.zip", 'csv.gz'))
  80. else:
  81. desc_path = file.replace(self.trans_param.read_path, to_path)
  82. is_success, e = unzip(file, get_desc_path(desc_path))
  83. self.trans_param.has_zip = True
  84. if not is_success:
  85. # raise e
  86. pass
  87. elif str(file).endswith("rar"):
  88. desc_path = file.replace(self.trans_param.read_path, to_path)
  89. is_success, e = unrar(file, get_desc_path(desc_path))
  90. self.trans_param.has_zip = True
  91. if not is_success:
  92. # raise e
  93. pass
  94. else:
  95. copy_to_new(file, file.replace(self.trans_param.read_path, to_path))
    def read_excel_to_df(self, file_path):
        """Read one staged source file into a DataFrame.

        Vertical (key/value) tables are filtered to the wanted keys and their
        key values translated to canonical names. Wide tables are read
        (optionally restricted to the mapped columns), column prefixes are
        resolved, and computed fields ("$file...", "$folder...",
        "col,$file_date[...]") are filled from the file/folder names.
        """
        # Source column names that map to canonical fields ("$..." entries are
        # computed, not read from the file).
        read_cols = [v.split(",")[0] for k, v in self.trans_param.cols_tran.items() if v and not v.startswith("$")]
        trans_dict = {}
        for k, v in self.trans_param.cols_tran.items():
            if v and not str(v).startswith("$"):
                trans_dict[v] = k
        if self.trans_param.is_vertical_table:
            # Keep only rows whose key column is one of the wanted source
            # fields, then translate keys (unknown keys kept as-is).
            vertical_cols = self.trans_param.vertical_cols
            df = read_file_to_df(file_path, vertical_cols, header=self.header)
            df = df[df[self.trans_param.vertical_key].isin(read_cols)]
            df.rename(columns={self.trans_param.cols_tran['wind_turbine_number']: 'wind_turbine_number',
                               self.trans_param.cols_tran['time_stamp']: 'time_stamp'}, inplace=True)
            df[self.trans_param.vertical_key] = df[self.trans_param.vertical_key].map(trans_dict).fillna(
                df[self.trans_param.vertical_key])
            return df
        else:
            # Rebuild trans_dict as spec-string -> canonical-column for the
            # computed fields only.
            trans_dict = dict()
            for k, v in self.trans_param.cols_tran.items():
                # NOTE(review): parses as (v and v.startswith("$")) or
                # v.find(",") > 0 — would raise AttributeError if v were None;
                # presumably v is always a string here. Confirm upstream.
                if v and v.startswith("$") or v.find(",") > 0:
                    trans_dict[v] = k
            if self.trans_param.merge_columns:
                df = read_file_to_df(file_path, header=self.header)
            else:
                if self.trans_param.need_valid_cols:
                    df = read_file_to_df(file_path, read_cols, header=self.header)
                else:
                    df = read_file_to_df(file_path, header=self.header)
            # Resolve column-name prefixes via the configured expression.
            if self.trans_param.resolve_col_prefix:
                columns_dict = dict()
                for column in df.columns:
                    # NOTE(review): eval of configured code ('column' is in
                    # scope for the expression); trusted config only.
                    columns_dict[column] = eval(self.trans_param.resolve_col_prefix)
                df.rename(columns=columns_dict, inplace=True)
            # Fill computed fields from the file name / folder structure.
            for k, v in trans_dict.items():
                if k.startswith("$file"):
                    # Base file name without its extension.
                    file = ".".join(os.path.basename(file_path).split(".")[0:-1])
                    if k == "$file":
                        df[v] = str(file)
                    elif k.startswith("$file["):
                        # "$file[a:b]" -> slice of the base name.
                        datas = str(k.replace("$file", "").replace("[", "").replace("]", "")).split(":")
                        if len(datas) != 2:
                            raise Exception("字段映射出现错误 :" + str(trans_dict))
                        df[v] = str(file[int(datas[0]):int(datas[1])]).strip()
                elif k.find("$file_date") > 0:
                    # "col,$file_date[a:b]": prepend a date sliced from the file
                    # name to the values of column 'col'.
                    datas = str(k.split(",")[1].replace("$file_date", "").replace("[", "").replace("]", "")).split(":")
                    if len(datas) != 2:
                        raise Exception("字段映射出现错误 :" + str(trans_dict))
                    # NOTE(review): `file` is only bound by an earlier
                    # "$file..." iteration of this loop; a lone "$file_date"
                    # spec would hit NameError here — confirm configs always
                    # pair it with a "$file" mapping.
                    date_str = str(file[int(datas[0]):int(datas[1])]).strip()
                    df[v] = df[k.split(",")[0]].apply(lambda x: date_str + " " + str(x))
                elif k.startswith("$folder"):
                    # "$folder[N]": take the folder name N levels above the file.
                    folder = file_path
                    cengshu = int(str(k.replace("$folder", "").replace("[", "").replace("]", "")))
                    for i in range(cengshu):
                        folder = os.path.dirname(folder)
                    df[v] = str(str(folder).split(os.sep)[-1]).strip()
            return df
  152. def _save_to_tmp_csv_by_name(self, df, name):
  153. save_name = str(name) + '.csv'
  154. save_path = os.path.join(self.get_read_tmp_path(), save_name)
  155. create_file_path(save_path, is_file_path=True)
  156. with self.lock:
  157. if name in self.exist_wind_names:
  158. contains_name = True
  159. else:
  160. contains_name = False
  161. self.exist_wind_names.append(name)
  162. if contains_name:
  163. df.to_csv(save_path, index=False, encoding='utf8', mode='a',
  164. header=False)
  165. else:
  166. df.to_csv(save_path, index=False, encoding='utf8')
  167. def save_to_tmp_csv(self, df, file):
  168. trans_print("开始保存", str(file), "到临时文件")
  169. names = set(df['wind_turbine_number'].values)
  170. with multiprocessing.Pool(6) as pool:
  171. pool.starmap(self._save_to_tmp_csv_by_name,
  172. [(df[df['wind_turbine_number'] == name], name) for name in names])
  173. del df
  174. trans_print("保存", str(names), "到临时文件成功, 风机数量", len(names))
  175. def set_statistics_data(self, df):
  176. if not df.empty:
  177. df['time_stamp'] = pd.to_datetime(df['time_stamp'])
  178. min_date = df['time_stamp'].min()
  179. max_date = df['time_stamp'].max()
  180. with self.lock:
  181. if 'min_date' in self.statistics_map.keys():
  182. if self.statistics_map['min_date'] > min_date:
  183. self.statistics_map['min_date'] = min_date
  184. else:
  185. self.statistics_map['min_date'] = min_date
  186. if 'max_date' in self.statistics_map.keys():
  187. if self.statistics_map['max_date'] < max_date:
  188. self.statistics_map['max_date'] = max_date
  189. else:
  190. self.statistics_map['max_date'] = max_date
  191. if 'total_count' in self.statistics_map.keys():
  192. self.statistics_map['total_count'] = self.statistics_map['total_count'] + df.shape[0]
  193. else:
  194. self.statistics_map['total_count'] = df.shape[0]
  195. if 'time_granularity' not in self.statistics_map.keys():
  196. self.statistics_map['time_granularity'] = get_time_space(df, 'time_stamp')
  197. def save_statistics_file(self):
  198. save_path = os.path.join(os.path.dirname(self.get_save_path()),
  199. self.trans_param.read_type + '_statistics.txt')
  200. create_file_path(save_path, is_file_path=True)
  201. with open(save_path, 'w', encoding='utf8') as f:
  202. f.write("总数据量:" + str(self.statistics_map['total_count']) + "\n")
  203. f.write("最小时间:" + str(self.statistics_map['min_date']) + "\n")
  204. f.write("最大时间:" + str(self.statistics_map['max_date']) + "\n")
  205. f.write("风机数量:" + str(len(read_excel_files(self.get_read_tmp_path()))) + "\n")
    def save_to_csv(self, filename):
        """Transform one per-turbine temp CSV into its final output file.

        Pivots vertical tables back to wide, maps the turbine name to its
        canonical number, filters malformed timestamps and '*'-contaminated
        rows, adds year/month/day columns, saves (optionally gzipped), and
        folds the frame into the batch statistics.
        """
        df = read_file_to_df(filename)
        if self.trans_param.is_vertical_table:
            # Pivot back to wide: one column per measured field; 'max' resolves
            # duplicated (time, turbine, field) entries.
            df = df.pivot_table(index=['time_stamp', 'wind_turbine_number'], columns=self.trans_param.vertical_key,
                                values=self.trans_param.vertical_value,
                                aggfunc='max')
            # Reset the index back to ordinary columns.
            df.reset_index(inplace=True)
        # Ensure the full canonical column set, in schema order.
        for k in self.trans_param.cols_tran.keys():
            if k not in df.columns:
                df[k] = None
        df = df[self.trans_param.cols_tran.keys()]
        # Map turbine display name to canonical number (unknown names kept as-is);
        # the display name is preserved in wind_turbine_name.
        trans_print("开始转化风机名称")
        df['wind_turbine_number'] = df['wind_turbine_number'].astype('str')
        df['wind_turbine_name'] = df['wind_turbine_number']
        df['wind_turbine_number'] = df['wind_turbine_number'].map(
            self.wind_col_trans).fillna(
            df['wind_turbine_number'])
        # Each temp file holds a single turbine, so the first value names it.
        wind_col_name = str(df['wind_turbine_number'].values[0])
        # Derive year/month/day from the timestamp.
        trans_print(wind_col_name, "包含时间字段,开始处理时间字段,添加年月日", filename)
        trans_print(wind_col_name, "时间原始大小:", df.shape[0])
        # Keep only timestamps containing both a date ('-') and a time (':').
        df = df[(df['time_stamp'].str.find('-') > 0) & (df['time_stamp'].str.find(':') > 0)]
        trans_print(wind_col_name, "去掉非法时间后大小:", df.shape[0])
        df['time_stamp'] = pd.to_datetime(df['time_stamp'])
        df['year'] = df['time_stamp'].dt.year
        df['month'] = df['time_stamp'].dt.month
        df['day'] = df['time_stamp'].dt.day
        df.sort_values(by='time_stamp', inplace=True)
        df['time_stamp'] = df['time_stamp'].apply(
            lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
        trans_print("处理时间字段结束")
        # Drop any row containing '*' in a string cell — presumably an invalid
        # sensor-reading marker; confirm with the data owners.
        trans_print(wind_col_name, "过滤星号前大小:", df.shape[0])
        mask = ~df.applymap(lambda x: isinstance(x, str) and '*' in x).any(axis=1)
        df = df[mask]
        trans_print(wind_col_name, "过滤星号后大小:", df.shape[0])
        trans_print(wind_col_name, "转化风机名称结束")
        if self.save_zip:
            save_path = os.path.join(self.get_save_path(), str(wind_col_name) + '.csv.gz')
        else:
            save_path = os.path.join(self.get_save_path(), str(wind_col_name) + '.csv')
        create_file_path(save_path, is_file_path=True)
        if self.save_zip:
            df.to_csv(save_path, compression='gzip', index=False, encoding='utf-8')
        else:
            df.to_csv(save_path, index=False, encoding='utf-8')
        self.set_statistics_data(df)
        del df
        trans_print("保存" + str(filename) + ".csv成功")
  260. def remove_file_to_tmp_path(self):
  261. # 读取文件
  262. try:
  263. if os.path.isfile(self.trans_param.read_path):
  264. all_files = [self.trans_param.read_path]
  265. else:
  266. all_files = read_files(self.trans_param.read_path)
  267. with multiprocessing.Pool(6) as pool:
  268. pool.starmap(self.get_and_remove, [(i,) for i in all_files])
  269. all_files = read_excel_files(self.get_excel_tmp_path())
  270. trans_print('读取文件数量:', len(all_files))
  271. except Exception as e:
  272. trans_print(traceback.format_exc())
  273. message = "读取文件列表错误:" + self.trans_param.read_path + ",系统返回错误:" + str(e)
  274. raise ValueError(message)
  275. return all_files
    def read_file_and_save_tmp(self):
        """Read all staged files and normalize them into per-turbine temp CSVs.

        Two modes:
        * merge_columns: files carry different column subsets; frames with the
          same column signature are stacked, de-duplicated, then the groups are
          joined side-by-side on (time_stamp, turbine) before saving once.
        * otherwise: files are read in parallel batches of 6 and each frame is
          saved independently.

        :raises ValueError: when merging or saving a frame fails.
        """
        all_files = read_excel_files(self.get_excel_tmp_path())
        if self.trans_param.merge_columns:
            dfs_list = list()
            index_keys = [self.trans_param.cols_tran['time_stamp']]
            wind_col = self.trans_param.cols_tran['wind_turbine_number']
            if str(wind_col).startswith("$"):
                # Computed turbine column is materialized under the canonical
                # name by read_excel_to_df.
                wind_col = 'wind_turbine_number'
            index_keys.append(wind_col)
            df_map = dict()
            with multiprocessing.Pool(6) as pool:
                dfs = pool.starmap(self.read_excel_to_df, [(file,) for file in all_files])
            # Group frames by their exact column signature.
            for df in dfs:
                key = '-'.join(df.columns)
                if key in df_map.keys():
                    df_map[key] = pd.concat([df_map[key], df])
                else:
                    df_map[key] = df
            for k, df in df_map.items():
                df.drop_duplicates(inplace=True)
                df.set_index(keys=index_keys, inplace=True)
                # Keep the first row for duplicated (time, turbine) pairs so the
                # side-by-side concat below aligns one row per pair.
                df = df[~df.index.duplicated(keep='first')]
                dfs_list.append(df)
            df = pd.concat(dfs_list, axis=1)
            df.reset_index(inplace=True)
            try:
                self.df_save_to_tmp_file(df, "")
            except Exception as e:
                trans_print(traceback.format_exc())
                message = "合并列出现错误:" + str(e)
                raise ValueError(message)
        else:
            # Read in batches of split_count files to bound memory usage.
            split_count = 6
            all_arrays = split_array(all_files, split_count)
            for arr in all_arrays:
                with multiprocessing.Pool(split_count) as pool:
                    dfs = pool.starmap(self.read_excel_to_df, [(ar,) for ar in arr])
                try:
                    for df in dfs:
                        self.df_save_to_tmp_file(df)
                except Exception as e:
                    trans_print(traceback.format_exc())
                    message = "整理临时文件,系统返回错误:" + str(e)
                    raise ValueError(message)
  320. def mutiprocessing_to_save_file(self):
  321. # 开始保存到正式文件
  322. trans_print("开始保存到excel文件")
  323. all_tmp_files = read_excel_files(self.get_read_tmp_path())
  324. try:
  325. with multiprocessing.Pool(6) as pool:
  326. pool.starmap(self.save_to_csv, [(file,) for file in all_tmp_files])
  327. except Exception as e:
  328. trans_print(traceback.format_exc())
  329. message = "保存文件错误,系统返回错误:" + str(e)
  330. raise ValueError(message)
  331. trans_print("结束保存到excel文件")
  332. def mutiprocessing_to_save_db(self):
  333. # 开始保存到SQL文件
  334. trans_print("开始保存到数据库文件")
  335. all_saved_files = read_excel_files(self.get_save_path())
  336. table_name = self.batch_no + "_" + self.trans_param.read_type
  337. creat_table_and_add_partition(table_name, len(all_saved_files), self.trans_param.read_type)
  338. try:
  339. with multiprocessing.Pool(6) as pool:
  340. pool.starmap(save_file_to_db,
  341. [(table_name, file, self.batch_count) for file in all_saved_files])
  342. except Exception as e:
  343. trans_print(traceback.format_exc())
  344. message = "保存到数据库错误,系统返回错误:" + str(e)
  345. raise ValueError(message)
  346. trans_print("结束保存到数据库文件")
  347. def _rename_file(self):
  348. save_path = self.get_save_path()
  349. files = os.listdir(save_path)
  350. files.sort(key=lambda x: int(str(x).split(os.sep)[-1].split(".")[0][1:]))
  351. for index, file in enumerate(files):
  352. file_path = os.path.join(save_path, 'F' + str(index + 1).zfill(3) + ".csv.gz")
  353. os.rename(os.path.join(save_path, file), file_path)
  354. def delete_batch_files(self):
  355. trans_print("开始删除已存在的批次文件夹")
  356. if os.path.exists(self.get_save_path()):
  357. shutil.rmtree(self.get_save_path())
  358. trans_print("删除已存在的批次文件夹")
  359. def delete_tmp_files(self):
  360. trans_print("开始删除临时文件夹")
  361. if os.path.exists(self.get_save_tmp_path()):
  362. shutil.rmtree(self.get_save_tmp_path())
  363. trans_print("删除临时文件夹删除成功")
  364. def delete_batch_db(self):
  365. if self.save_db:
  366. table_name = "_".join([self.batch_no, self.trans_param.read_type])
  367. renamed_table_name = "del_" + table_name + "_" + datetime.datetime.now().strftime('%Y%m%d%H%M%S')
  368. # rename_table(table_name, renamed_table_name, self.save_db)
  369. drop_table(table_name, self.save_db)
    def run(self, step=0, end=3):
        """Execute the transfer pipeline from stage *step* through stage *end*.

        Stages: 0 cleanup & parameter validation, 1 stage raw files to the
        scratch dir, 2 normalize into per-turbine temp CSVs, 3 write final
        files & statistics, 4 bulk-load into the database (when save_db).
        A stage runs when step <= stage <= end; end == 0 means validate only,
        so the batch is not marked successful.

        :param step: first stage to run (default 0)
        :param end: last stage to run (default 3; DB load at 4 is opt-in)
        """
        begin = datetime.datetime.now()
        trans_print("开始执行")
        update_trans_status_running(self.batch_no, self.trans_param.read_type, self.save_db)
        if step <= 0 and end >= 0:
            # Stage 0: wipe previous outputs and validate required parameters.
            tmp_begin = datetime.datetime.now()
            trans_print("开始初始化字段")
            self.delete_batch_files()
            self.delete_batch_db()
            self.params_valid([self.batch_no, self.field_code, self.save_path, self.trans_param.read_type,
                               self.trans_param.read_path, self.wind_full_name])
            update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 5, self.save_db)
            trans_print("初始化字段结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
                        str(datetime.datetime.now() - begin))
        if step <= 1 and end >= 1:
            # Stage 1: stage raw files (and unpack archives) into scratch.
            tmp_begin = datetime.datetime.now()
            self.delete_tmp_files()
            trans_print("开始保存到临时路径")
            self.remove_file_to_tmp_path()
            update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 20, self.save_db)
            trans_print("保存到临时路径结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
                        str(datetime.datetime.now() - begin))
        if step <= 2 and end >= 2:
            # Stage 2: normalize staged files into per-turbine temp CSVs.
            tmp_begin = datetime.datetime.now()
            trans_print("开始保存到临时文件")
            self.read_file_and_save_tmp()
            update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 50, self.save_db)
            trans_print("保存到临时文件结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
                        str(datetime.datetime.now() - begin))
        if step <= 3 and end >= 3:
            # Stage 3: write final per-turbine files and the statistics summary.
            tmp_begin = datetime.datetime.now()
            trans_print("开始保存到文件")
            self.mutiprocessing_to_save_file()
            self.save_statistics_file()
            update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 70, self.save_db)
            trans_print("保存到文件结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
                        str(datetime.datetime.now() - begin))
        if step <= 4 and end >= 4:
            # Stage 4: optional database load.
            if self.save_db:
                trans_print("开始保存到数据库")
                tmp_begin = datetime.datetime.now()
                self.mutiprocessing_to_save_db()
                trans_print("保存到数据库结束,耗时:", str(datetime.datetime.now() - tmp_begin), ",总耗时:",
                            str(datetime.datetime.now() - begin))
        update_trans_transfer_progress(self.batch_no, self.trans_param.read_type, 100, self.save_db)
        # end == 0 means validation-only: do not mark the batch successful.
        if end != 0:
            update_trans_status_success(self.batch_no, self.trans_param.read_type,
                                        len(read_excel_files(self.get_read_tmp_path())),
                                        self.statistics_map['time_granularity'], self.save_db)
        trans_print("结束执行", self.trans_param.read_type, ",总耗时:",
                    str(datetime.datetime.now() - begin))